Source code for bidshandler.session

import os
import os.path as op
from collections import OrderedDict
import re
import shutil

import xml.etree.ElementTree as ET

import pandas as pd
from datetime import datetime

from .utils import (_get_bids_params, _copyfiles, _realize_paths, _combine_tsv,
                    _multi_replace, _fix_folderless, _file_list,
                    _reformat_fname)
from .bidserrors import MappingError, AssociationError, NoScanError
from .scan import Scan
from .querymixin import QueryMixin
from .constants import _RAW_FILETYPES, _SIDECAR_MAP


class Session(QueryMixin):
    """Session-level object.

    Parameters
    ----------
    id_ : str
        Id of the session. This is the sequence of characters after `'ses-'`.
    subject : :class:`bidshandler.Subject`
        Parent Subject object containing this Session.
    initialize : bool, optional
        Whether to parse the folder and load any child structures.
    no_folder : bool, optional
        Whether or not the session is contained within a `ses-XX` folder.
        For experiments with multiple sessions each folder will correspond to
        a Session object, however if there is only a single session this can
        be omitted and the Subject folder is in fact the Session folder.
    """

    def __init__(self, id_, subject, initialize=True, no_folder=False):
        """Create the Session and, if requested, parse its folder.

        When `initialize` is True the folder is scanned for recordings and a
        :class:`MappingError` is raised if no scan is found.
        """
        super(Session, self).__init__()
        self._id = id_
        self.subject = subject
        # File name of the scans.tsv relative to the session folder (or None).
        self._scans_tsv = None
        self._scans = []
        self.recording_types = []
        self._queryable_types = ('session', 'scan')
        self.has_no_folder = no_folder
        # list of folders that contain extra associated data for the session
        self.extra_data = []

        if initialize:
            self._add_scans()
            self._check()

    #region public methods

    def add(self, other, copier=_copyfiles):
        """.. # noqa

        Add another Scan or Session to this object.

        Parameters
        ----------
        other : Instance of :class:`bidshandler.Scan` or :class:`bidshandler.Session`
            Object to be added to this Session.
            The added object must already exist in the same context as this
            object.
        copier : function, optional
            A function to facilitate the copying of any applicable data.
            This function must have the call signature
            `function(src_files: list, dst_files: list)`
            Where src_files is the list of files to be moved and dst_files is
            the list of corresponding destinations.
            This will default to using utils._copyfiles which simply implements
            :py:func:`shutil.copy` and creates any directories that do not
            already exist.

        Raises
        ------
        ValueError
            If `other` is a Session with a different ID.
        AssociationError
            If `other` is a Scan whose project, subject or session ID does not
            match this Session.
        TypeError
            If `other` is neither a Scan nor a Session.
        """
        if isinstance(other, Session):
            if self._id != other._id:
                raise ValueError("Added session must have same ID.")
            # Copy over all the contained scans.
            for scan in other.scans:
                self.add(scan, copier)
            # Also copy over all the extra files.
            extra_files = []
            for fname in other.extra_data:
                extra_files.extend(
                    _file_list(_realize_paths(other, fname)))
                # Record the folder only once so that repeated merges do not
                # pile up duplicate entries.
                if fname not in self.extra_data:
                    self.extra_data.append(fname)
            # now that we have the full list, we just need the names
            # relative to this session's path
            extra_files_rel = [op.relpath(fname, other.path)
                               for fname in extra_files]
            copier(extra_files, _realize_paths(self, extra_files_rel))
        elif isinstance(other, Scan):
            # TODO-LT: handle other modalities
            # We need to make sure that the scan is of the same person/session:
            if not (self._id == other.session._id and
                    self.subject._id == other.subject._id and
                    self.project._id == other.project._id):
                raise AssociationError("scan", "project, subject and session")
            # Handle merging the scans.tsv file.
            if other in self:
                # We don't want to add it if it is already in this session.
                # TODO: add overwrite argument to allow it to still be
                # added.
                return
            other_scan_df = pd.DataFrame(
                OrderedDict([
                    ('filename', [_reformat_fname(other.raw_file_relative)]),
                    ('acq_time', [other.acq_time])]),
                columns=['filename', 'acq_time'])
            # Combine the new data into the original tsv.
            _combine_tsv(self.scans_tsv, other_scan_df, 'filename')
            # Assign as a set to avoid any potential doubling of the raw
            # file path.
            files = set(other.associated_files.values())
            files.add(other._sidecar)
            files.add(other._raw_file)
            # Copy the files over. Both sides iterate the same unmodified set
            # so that sources and destinations stay paired.
            fl_left = _realize_paths(other, files)
            fl_right = [op.join(self.path, other._path, fpath)
                        for fpath in files]
            copier(fl_left, fl_right)
            # Add the scan object to our scans list.
            scan = Scan(other.raw_file_relative, self,
                        acq_time=other.acq_time)
            self._scans.append(scan)
            # finally, check to see if the scan had an associated empty
            # room file. If so, make sure it comes along too
            if other.emptyroom is not None:
                self.project.add(other.emptyroom)
        else:
            raise TypeError("Cannot add a {0} object to a Session".format(
                type(other).__name__))

    def contained_files(self):
        """Get the set of contained files.

        Returns
        -------
        file_list : set
            Set with paths to all contained files relating to the BIDS
            structure. The scans.tsv file is only included when the session
            has one.
        """
        file_list = set()
        scans_tsv = self.scans_tsv
        if scans_tsv is not None:
            file_list.add(scans_tsv)
        for scan in self.scans:
            file_list.update(scan.contained_files())
        return file_list

    def delete(self):
        """Delete the session information."""
        # Iterate over a copy: deleting a scan removes it from this session's
        # scan list.
        for scan in self.scans[:]:
            scan.delete()
        if self.scans_tsv is not None:
            os.remove(self.scans_tsv)
        # Only remove the folder once no files are left in it.
        if not list(_file_list(self.path)):
            shutil.rmtree(self.path)
        # Remove this session from the session list in the subject and delete.
        del self.subject._sessions[self._id]

    def rename(self, id_):
        """Change the sessions' id.

        Parameters
        ----------
        id_ : str
            New id for the session object.
        """
        self._rename(self.subject._id, id_)

    def scan(self, task='.', acq='.', run='.', return_all=False):
        """Return the contained Scan(s) corresponding to the provided values.

        Parameters
        ----------
        task : str
            Value of `task` in the BIDS filename.
        acq : str
            Value of `acq` in the BIDS filename.
        run : str
            Value of `run` in the BIDS filename.
        return_all : bool
            Whether to return every scan in the session that matches the
            provided values or not.

        Returns
        -------
        scan : :class:`bidshandler.Scan` or list(:class:`bidshandler.Scan`)
            The single matching Scan, or the list of all matching Scans if
            `return_all` is True (the list may be empty).

        Raises
        ------
        Exception
            If more than one scan matches and `return_all` is False.
        NoScanError
            If no scan matches and `return_all` is False.

        Notes
        -----
        The `task`, `acq` and `run` arguments may all have regular expressions
        passed to them. A value of `None` matches anything.
        """
        # First process any regular expressions passed:
        tsk_re = re.compile(task) if task is not None else re.compile('.')
        acq_re = re.compile(acq) if acq is not None else re.compile('.')
        run_re = re.compile(run) if run is not None else re.compile('.')
        valid_scans = []
        for scan in self.scans:
            # A scan without a given entity is compared using '.' in its
            # place, which the default pattern matches.
            _task = scan.task if scan.task is not None else '.'
            _acq = scan.acq if scan.acq is not None else '.'
            _run = scan.run if scan.run is not None else '.'
            if (tsk_re.match(_task) and
                    acq_re.match(_acq) and
                    run_re.match(_run)):
                valid_scans.append(scan)
        if return_all:
            return valid_scans
        if len(valid_scans) > 1:
            raise Exception("Multiple scans found for {0}. To get the "
                            "list set `return_all=True`".format(
                                self.subject.ID))
        if not valid_scans:
            raise NoScanError
        return valid_scans[0]

    #region private methods

    def _add_scans(self):
        """Parse the session folder to find what recordings are included.

        Raises
        ------
        MappingError
            If a scans.tsv file is found that has no 'filename' column.
        """
        for fname in os.listdir(self.path):
            full_path = op.join(self.path, fname)
            # Each sub-directory is considered a separate type of recording.
            if op.isdir(full_path):
                if fname in _SIDECAR_MAP:
                    self.recording_types.append(fname)
                else:
                    self.extra_data.append(fname)
            # The only other non-folder should be the scans tsv.
            else:
                filename_data = _get_bids_params(fname)
                if filename_data.get('file', None) == 'scans':
                    # Store the path and extract the paths of the scans.
                    self._scans_tsv = fname
                    scans = pd.read_csv(_realize_paths(self, self._scans_tsv),
                                        sep='\t')
                    if 'filename' not in scans.columns:
                        raise MappingError(
                            "{0} contains no 'filename' column".format(
                                self.scans_tsv))
                    for i in range(len(scans)):
                        row = scans.iloc[i]
                        # Every remaining column is handed to the Scan as a
                        # keyword argument.
                        scan_fname = row.pop('filename')
                        self._scans.append(
                            Scan(scan_fname, self, **dict(row)))

        # if we haven't found a scans.tsv file then we need to add all the
        # scans in a different way.
        if self._scans_tsv is None:
            # for now do just MRI stuff which is any .nii.gz file I think?
            #TODO: have a switch for each folder name?
            for rec_type in self.recording_types:
                if rec_type not in ('anat', 'dwi'):
                    rec_path = _realize_paths(self, rec_type)
                    if rec_type == 'fmap':  # fieldmap sequence
                        # The files with `file` = `magnitude1` are not raw
                        # scans.
                        # NOTE(review): `fname` is not bound by a loop over
                        # `rec_path` at this point; it still holds the value
                        # left behind by the most recent loop. This looks
                        # unintended -- confirm the intended structure before
                        # changing it.
                        filename_data = _get_bids_params(fname)
                        if ((filename_data['file'] not in
                             ('magnitude1', 'magnitude2')) and
                                'nii' in fname):
                            self._scans.append(
                                Scan(op.join(rec_type, fname), self))
                    for fname in os.listdir(rec_path):
                        for ext in _RAW_FILETYPES:
                            if ext in fname:
                                self._scans.append(
                                    Scan(op.join(rec_type, fname), self))

    def _check(self):
        """Check that there is at least one included scan.

        Raises
        ------
        MappingError
            If the session contains no scans.
        """
        if len(self._scans) == 0:
            raise MappingError("No scans found in {0}/{1}/{2}.".format(
                self.project.ID, self.subject.ID, self.ID))

    @staticmethod
    def _clone_into_subject(subject, other):
        """Create a copy of the Session with a new parent Subject.

        Parameters
        ----------
        subject : :class:`bidshandler.Subject`
            New parent Subject.
        other : :class:`BIDSHandler.Session`
            Original Session instance to clone.

        Returns
        -------
        new_session : :class:`bidshandler.Session`
            New uninitialized Session cloned from `other` to be a child of
            `subject`.
        """
        # set the directory to be the same as the parent.
        os.makedirs(_realize_paths(subject, other.ID), exist_ok=True)

        # Create a new empty session object. It is not initialized, so the
        # folder is not parsed and the "at least one scan" check is skipped.
        new_session = Session(other._id, subject, initialize=False)
        new_session._create_empty_scan_tsv()
        return new_session

    def _create_empty_scan_tsv(self):
        """Create an empty scans.tsv file for this session.

        An existing file with the same name is left untouched.
        """
        self._scans_tsv = '{0}_{1}_scans.tsv'.format(self.subject.ID, self.ID)
        full_path = _realize_paths(self, self._scans_tsv)
        if not op.exists(full_path):
            df = pd.DataFrame(OrderedDict([('filename', [])]),
                              columns=['filename'])
            df.to_csv(full_path, sep='\t', index=False, na_rep='n/a',
                      encoding='utf-8')

    def _generate_map(self):
        """Generate a map of the Session.

        Returns
        -------
        root : :py:class:`xml.etree.ElementTree.Element`
            Xml element containing session information.
        """
        root = ET.Element('Session', attrib={'ID': str(self._id)})
        for scan in self.scans:
            root.append(scan._generate_map())
        return root

    def _rename(self, subj_id, sess_id):
        """Change the session id for all contained files.

        Parameters
        ----------
        subj_id : str
            Raw subject ID value. Ie. *without* `sub-`.
        sess_id : str
            Raw session ID value. Ie. *without* `ses-`.
        """
        # cache current values and generate new ones for use
        old_subj_id = self.subject.ID
        new_subj_id = 'sub-{0}'.format(subj_id)
        old_sess_id = self.ID
        new_sess_id = 'ses-{0}'.format(sess_id)

        # The old scan folders are removed at the end of this method. When a
        # session that lives in its own folder is "renamed" to the IDs it
        # already has, those would be the folders still holding the data, so
        # treat that case as a no-op.
        if (not self.has_no_folder and
                new_subj_id == old_subj_id and
                new_sess_id == old_sess_id):
            return

        old_scans_tsv = self.scans_tsv
        old_path = self.path
        if self.has_no_folder:
            new_path = op.join(self.subject.path, new_sess_id)
        else:
            new_path = _multi_replace(old_path,
                                      [old_subj_id, old_sess_id],
                                      [new_subj_id, new_sess_id])
        if not op.exists(new_path):
            os.makedirs(new_path)

        scan_delete_paths = set()
        # call rename on each of the contained Scan objects
        for scan in self.scans:
            scan_delete_paths.add(scan.path)
            scan._rename(subj_id, sess_id)

        if old_scans_tsv is not None:
            tsv_on_disk = op.exists(old_scans_tsv)
            # update the row data to point to the new scan locations
            if tsv_on_disk:
                df = pd.read_csv(old_scans_tsv, sep='\t')
                for idx, row in enumerate(df['filename']):
                    row = _fix_folderless(self, row, old_sess_id, old_subj_id)
                    row = _reformat_fname(row)
                    df.at[idx, 'filename'] = _multi_replace(
                        row, [old_subj_id, old_sess_id],
                        [new_subj_id, new_sess_id])
                df.to_csv(old_scans_tsv, sep='\t', index=False, na_rep='n/a',
                          encoding='utf-8')
            self._scans_tsv = _fix_folderless(self, self._scans_tsv,
                                              old_sess_id, old_subj_id)
            self._scans_tsv = _multi_replace(self._scans_tsv,
                                             [old_subj_id, old_sess_id],
                                             [new_subj_id, new_sess_id])
            # rename the scans.tsv file; only possible when it is on disk
            if tsv_on_disk:
                os.rename(old_scans_tsv,
                          op.join(self.project.path, new_subj_id,
                                  new_sess_id, self._scans_tsv))

        # remove the old path
        # TODO: check to see if the folders are empty.
        for fpath in scan_delete_paths:
            shutil.rmtree(fpath)

        # change the internal id. self.ID -> new_sess_id
        old_id = self._id
        self._id = sess_id

        # update the parent subject dictionary
        if old_id != self._id:
            self.subject._sessions[self._id] = self
            del self.subject._sessions[old_id]
        if self._id != 'none':
            self.has_no_folder = False

    #region properties

    @property
    def bids_tree(self):
        """Parent :class:`bidshandler.BIDSTree` object."""
        return self.project.bids_tree

    @property
    def date(self):
        """The recording date of the session.

        Returns
        -------
        known_date : :func:`datetime.date`
            Specific date of the year the session occurred on. This is `None`
            if no scan has an acquisition time or if the scans do not all
            share the same date.
        """
        known_date = None
        for scan in self.scans:
            # if the scan has an acquisition date, load it into a datetime.date
            # object and compare
            if scan.acq_time is not None:
                try:
                    compare_date = datetime.strptime(scan.acq_time,
                                                     '%Y-%m-%d')
                except ValueError:
                    compare_date = datetime.strptime(scan.acq_time,
                                                     '%Y-%m-%dT%H:%M:%S')
                compare_date = compare_date.date()
                if known_date is None:
                    known_date = compare_date
                elif compare_date != known_date:
                    # Scans recorded on different days: no single date.
                    known_date = None
                    break
        return known_date

    @property
    def ID(self):
        """ID with 'ses' prefix."""
        return 'ses-{0}'.format(self._id)

    @property
    def inheritable_files(self):
        """List of files that are able to be inherited by child objects."""
        # TODO: make private?
        files = self.subject.inheritable_files
        for fname in os.listdir(self.path):
            abs_path = _realize_paths(self, fname)
            if op.isfile(abs_path):
                files.append(abs_path)
        return files

    @property
    def path(self):
        """Path to Session folder."""
        # Without a dedicated folder the Subject folder is the Session folder.
        if self.has_no_folder:
            return self.subject.path
        return _realize_paths(self.subject, self.ID)

    @property
    def project(self):
        """Parent :class:`bidshandler.Project` object."""
        return self.subject.project

    @property
    def scans(self):
        """List of all contained :class:`bidshandler.Scan`'s.

        Returns
        -------
        list of :class:`bidshandler.Scan`
            All Scans within this Session.
        """
        return self._scans

    @property
    def scans_tsv(self):
        """Path of associated scans.tsv file if there is one."""
        _path = None
        if self._scans_tsv is not None:
            _path = _realize_paths(self, self._scans_tsv)
        return _path

    #region class methods

    def __contains__(self, other):
        """Determine whether the Session object contains a scan.

        Parameters
        ----------
        other : :class:`bidshandler.Scan`
            Object to test whether it is contained in this Session.

        Returns
        -------
        bool
            Returns True if the object is contained within this Session.

        Raises
        ------
        TypeError
            If `other` is not a Scan.
        """
        if isinstance(other, Scan):
            return any(scan == other for scan in self._scans)
        raise TypeError("Can only determine if a Scan is contained.")

    def __iter__(self):
        """Iterable of the contained Scan objects."""
        return iter(self._scans)

    def __repr__(self):
        return '<Session, ID: {0}, {1} scan{2}, @ {3}>'.format(
            self.ID,
            len(self.scans),
            ('s' if len(self.scans) != 1 else ''),
            self.path)

    def __str__(self):
        output = []
        output.append('ID: {0}'.format(self.ID))
        output.append('Number of scans: {0}'.format(len(self.scans)))
        return '\n'.join(output)