Source code for bidshandler.subject

import os
import os.path as op
from collections import OrderedDict
import xml.etree.ElementTree as ET
import shutil
from warnings import warn

import pandas as pd

from .bidserrors import MappingError, NoSessionError, AssociationError
from .session import Session
from .scan import Scan
from .querymixin import QueryMixin
from .utils import _copyfiles, _realize_paths, _file_list


[docs]class Subject(QueryMixin): """Subject-level object. Parameters ---------- id_ : str Id of the subject. This is the sequence of characters after `'sub-'`. project : :class:`bidshandler.Project` Parent Project object containing this Subject. initialize : bool, optional Whether to parse the folder and load any child structures. """
[docs] def __init__(self, id_, project, initialize=True): super(Subject, self).__init__() self._id = id_ self.project = project # Contained sessions self._sessions = dict() # All the various information about the subject from the # participants.tsv file. self.subject_data = OrderedDict() self._queryable_types = ('subject', 'session', 'scan') if initialize: self._load_subject_info() self._add_sessions() self._check()
#region public methods
[docs] def add(self, other, copier=_copyfiles): """.. # noqa Add another Scan, Session or Subject to this object. Parameters ---------- other : Instance of :class:`bidshandler.Scan`, :class:`bidshandler.Session` or :class:`bidshandler.Subject` Object to be added to this Subject. The added object must already exist in the same context as this object. copier : function, optional A function to facilitate the copying of any applicable data. This function must have the call signature `function(src_files: list, dst_files: list)` Where src_files is the list of files to be moved and dst_files is the list of corresponding destinations. This will default to using utils._copyfiles which simply implements :py:func:`shutil.copy` and creates any directories that do not already exist. """ if isinstance(other, Subject): # If the subject has the same ID, take all the child sessions and # merge into this project. if self._id == other._id: for session in other.sessions: self.add(session, copier) else: raise ValueError("Added subject must have same ID.") elif isinstance(other, Session): if not (self._id == other.subject._id and self.project._id == other.project._id): raise AssociationError("session", "project and subject") if other in self: # if the other session being added has the same ID, merge it # with the current session with that ID. self.session(other._id).add(other, copier) else: # Check to see if we have only one scan without a session # folder: if len(self.sessions) == 1: # If we have only one existing session, we want to # check whether the existing session has no actual # session folder. if self.sessions[0].has_no_folder: warn("Current Subject has only one session with no " "specified session id. Please set this " "sessions' id by renaming it using " "`self.sessions[0].rename(1)` (or other number). " "The session to be added will not be added.") return new_session = Session._clone_into_subject(self, other) new_session.add(other, copier) self._sessions[other._id] = new_session elif isinstance(other, Scan): if not (self._id == other.subject._id and self.project._id == other.project._id): raise AssociationError("scan", "project and subject") if other.session in self: self.session(other.session._id).add(other, copier) else: new_session = Session._clone_into_subject(self, other.session) new_session.add(other, copier) self._sessions[other.session._id] = new_session else: raise TypeError("Cannot add a {0} object to a Subject".format( type(other).__name__))
[docs] def contained_files(self): """Get the list of contained files. Returns ------- file_list : list List with paths to all contained files relating to the BIDS structure. """ file_list = set() for session in self.sessions: file_list.update(session.contained_files()) return file_list
[docs] def delete(self): """Delete the subject from the parent Project.""" for session in self.sessions[:]: session.delete() # remove the subject information from the participants.tsv if self.project.participants_tsv is not None: df = pd.read_csv(self.project.participants_tsv, sep='\t') row_idx = df[df['participant_id'] == self.ID].index.item() df = df.drop(row_idx) df.to_csv(self.project.participants_tsv, sep='\t', index=False, na_rep='n/a', encoding='utf-8') if len(list(_file_list(self.path))) == 0: shutil.rmtree(self.path) del self.project._subjects[self._id]
[docs] def rename(self, id_): """Change the subjects' id. Parameters ---------- id_ : str New id for the subject object. """ self._rename(id_)
[docs] def session(self, id_): """Return the Session corresponding to the provided id. Parameters ---------- id_ : str Id of the session to return. This doesn't need the `'ses'` prefix. Returns ------- :class:`bidshandler.Session` Contained Session with the specified `id_`. """ try: return self._sessions[str(id_)] except KeyError: raise NoSessionError( "Session {0} doesn't exist in subject '{1}'. " "Possible sessions: {2}".format(id_, self.ID, list(self._sessions.keys())))
#region private methods def _add_sessions(self): """Add all the sessions in the folder to the Subject.""" for fname in os.listdir(self.path): full_path = op.join(self.path, fname) if op.isdir(full_path) and 'ses' in fname: ses_id = fname.split('-')[1] self._sessions[ses_id] = Session(ses_id, self) # If we haven't found any sub-folders with 'ses' in their name try and # assume that the current folder is in fact the session folder (ie. # only one session). if len(self._sessions) == 0: self._sessions['none'] = Session('none', self, no_folder=True) def _check(self): """Check that there is at least one included session.""" if len(self._sessions) == 0: raise MappingError("No sessions found in {0}/{1}.".format( self.project.ID, self.ID)) @staticmethod def _clone_into_project(project, other): """Create a copy of the Subject with a new parent Project. Parameters ---------- project : :class:`bidshandler.Project` New parent Project. other : :class:`bidshandler.Subject` Original Subject instance to clone. Returns ------- new_subject : :class:`bidshandler.Subject` New uninitialized Subject cloned from `other` to be a child of `project`. """ os.makedirs(_realize_paths(project, other.ID), exist_ok=True) # Create a new empty subject object. new_subject = Subject(other._id, project, initialize=False) # Merge the subject data into the participants.tsv file. df = pd.read_csv(project.participants_tsv, sep='\t') data = [('participant_id', [other.ID])] for key, value in other.subject_data.items(): data.append((key, [value])) other_sub_df = pd.DataFrame( OrderedDict(data), columns=['participant_id', *other.subject_data.keys()]) df = df.append(other_sub_df, sort=False) df.to_csv(project.participants_tsv, sep='\t', index=False, na_rep='n/a', encoding='utf-8') # Check if the new parent has a participants.json file. # If not, give it the one with this subject if it has one. if project._participants_json is not None: if other.project._participants_json is not None: shutil.copy(other.project.participants_json, project.path) # can now safely get the subject info new_subject._load_subject_info() return new_subject def _load_subject_info(self): participant_path = op.join(op.dirname(self.path), 'participants.tsv') if not op.exists(participant_path): return participants = pd.read_csv(participant_path, sep='\t') column_names = set(participants.columns.values) if 'participant_id' not in column_names: # temporary error... This means the file is bad. raise MappingError column_names.remove('participant_id') row = participants.loc[participants['participant_id'] == self.ID] for col_name in column_names: val = row.get(col_name) if val is not None: if not val.empty: # Ignore empty rows. self.subject_data[col_name] = val.item() else: self.subject_data[col_name] = "n/a" def _generate_map(self): """Generate a map of the Subject. Returns ------- root : :py:class:`xml.etree.ElementTree.Element` Xml element containing subject information. """ attribs = {'ID': str(self._id)} attribs.update(zip(self.subject_data.keys(), [str(x) for x in self.subject_data.values()])) for key, value in attribs.items(): if value == 'n/a': attribs.pop(key) root = ET.Element('Subject', attrib=attribs) for session in self.sessions: root.append(session._generate_map()) return root def _rename(self, subj_id): """Change the session id for all contained files. Parameters ---------- subj_id : str Raw subject ID value. Ie. *without* `sub-`. """ # cache current values old_subj_id = self.ID new_subj_id = 'sub-{0}'.format(subj_id) old_path = self.path new_path = self.path.replace(old_subj_id, new_subj_id) if not op.exists(new_path): os.mkdir(new_path) # call rename on each of the contained Scan objects for session in self.sessions: session._rename(subj_id, session._id) if op.exists(self.project.participants_tsv): df = pd.read_csv(self.project.participants_tsv, sep='\t') for idx, row in enumerate(df['participant_id']): if row == old_subj_id: df.at[idx, 'participant_id'] = new_subj_id break df.to_csv(self.project.participants_tsv, sep='\t', index=False, na_rep='n/a', encoding='utf-8') # remove the old path if len(list(_file_list(old_path))) == 0: shutil.rmtree(old_path) else: warn_msg = "The following files haven't been moved correctly:\n{0}" warn(warn_msg.format( "\n".join( [_realize_paths(self, p) for p in os.listdir(old_path)]))) self._id = subj_id #region properties @property def bids_tree(self): """Parent :class:`bidshandler.BIDSTree` object.""" return self.project.bids_tree @property def ID(self): """ID with 'sub' prefix.""" return 'sub-{0}'.format(self._id) @property def inheritable_files(self): """List of files that are able to be inherited by child objects.""" # TODO: make private? files = self.project.inheritable_files for fname in os.listdir(self.path): abs_path = _realize_paths(self, fname) if op.isfile(abs_path): files.append(abs_path) return files @property def path(self): """Path of Subject folder.""" return op.join(self.project.path, self.ID) @property def scans(self): """List of all contained :class:`bidshandler.Scan`'s. Returns ------- list of :class:`bidshandler.Scan` All Scans within this Subject. """ scan_list = [] for session in self.sessions: scan_list.extend(session.scans) return scan_list @property def sessions(self): """List of all contained :class:`bidshandler.Session`'s. Returns ------- list of :class:`bidshandler.Session` All Sessions within this Subject. """ return list(self._sessions.values()) #region class methods
[docs] def __contains__(self, other): """.. # noqa Determine if the Subject contains a certain Scan or Session. Parameters ---------- other : Instance of :class:`bidshandler.Scan` or :class:`bidshandler.Session` Object to check whether it is contained in this Subject. Returns ------- bool Returns True if the object is contained within this Subject. """ if isinstance(other, Session): return other._id in self._sessions elif isinstance(other, Scan): for session in self.sessions: if other in session: return True return False raise TypeError("Can only determine if Scans or Sessions are " "contained.")
[docs] def __iter__(self): """Iterable of the contained Session objects.""" return iter(self._sessions.values())
[docs] def __getitem__(self, item): """ Return the child session with the corresponding name (if it exists). """ return self.session(item)
def __repr__(self): return '<Subject, ID: {0}, {1} session{2}, @ {3}>'.format( self.ID, len(self.sessions), ('s' if len(self.sessions) != 1 else ''), self.path) def __str__(self): output = [] output.append('ID: {0}'.format(self.ID)) for key, value in self.subject_data.items(): output.append('{0}: {1}'.format(key.title(), value)) output.append('Number of Sessions: {0}'.format(len(self.sessions))) return '\n'.join(output)