import io
import json
import os
import os.path as op
import shutil
import xml.etree.ElementTree as ET
from warnings import warn

import pandas as pd

from .querymixin import QueryMixin
from .utils import (_get_bids_params, _realize_paths, _multi_replace,
                    _bids_params_are_subsets, _splitall, _fix_folderless,
                    _file_list, _reformat_fname)
from .bidserrors import NoScanError
from .constants import _SIDECAR_MAP
class Scan(QueryMixin):
    """Scan-level object.

    Parameters
    ----------
    fpath : str
        The path to the raw scan file.
    session : Instance of :class:`bidshandler.Session`
        Parent Session object containing this Scan.
    scan_params : dict, optional
        A dictionary containing any number of other scan parameters specified
        by scans.tsv. The ``acq_time`` entry, if present, is stored separately
        as the ``acq_time`` attribute.
    """

    def __init__(self, fpath, session, **scan_params):
        """Create the Scan and load its associated files and sidecar data.

        Raises
        ------
        ValueError
            If `fpath` does not contain at least a folder and a file name.
        """
        super(Scan, self).__init__()
        split_paths = _splitall(fpath)
        if len(split_paths) < 2:
            raise ValueError("Scan must be within a subfolder of the session.")
        # The first component is the folder holding the scan (it doubles as
        # the scan type); everything after it is the raw file location
        # relative to that folder.
        self._path = split_paths[0]
        self._raw_file = op.join(*split_paths[1:])
        self.acq_time = scan_params.pop('acq_time', None)
        self.scan_params = scan_params
        self.session = session
        self._get_params()
        self._sidecar = None
        self._queryable_types = ('scan',)
        self.associated_files = dict()
        self._assign_metadata()
        # Load information from the sidecar.
        self.info = dict()
        self._load_info()
        # Finally we do any manufacturer specific loading.
        self._load_extras()

    #region public methods

    def contained_files(self):
        """Get the set of contained files.

        Returns
        -------
        file_list : set
            Set with paths to the sidecar file (if there is one) and to every
            associated file relating to the BIDS structure.
        """
        file_list = set()
        sidecar = self.sidecar
        if sidecar is not None:
            file_list.add(sidecar)
        file_list.update(
            _realize_paths(self, list(self.associated_files.values())))
        return file_list

    def delete(self):
        """Delete all the scans' files.

        Removes every contained file that no other scan of the parent session
        uses, removes the raw file, drops the scan's entry from the session's
        scans.tsv (if there is one), removes the scan folder if it is left
        empty and finally detaches the scan from its parent session.
        """
        # Gather once every file the other scans of this session rely on so
        # that shared files are left untouched.
        in_use = set()
        for scan in self.session.scans:
            if scan != self:
                in_use.update(scan.contained_files())
        for fname in self.contained_files():
            # make sure we only delete files that are in the same directory
            # or lower, and that no other scan in the session uses.
            if not fname.startswith('..') and fname not in in_use:
                os.remove(fname)
        # remove the raw file
        os.remove(self.raw_file)
        # remove the scan information from the scans.tsv
        scans_tsv = self.session.scans_tsv
        if scans_tsv is not None:
            df = pd.read_csv(scans_tsv, sep='\t')
            keep = df['filename'] != _reformat_fname(self.raw_file_relative)
            # Filter with a mask so that a missing or duplicated entry cannot
            # abort the deletion half way through; the file is only rewritten
            # when at least one row is actually dropped.
            if not keep.all():
                df[keep].to_csv(scans_tsv, sep='\t', index=False,
                                na_rep='n/a', encoding='utf-8')
        # if the directory is empty remove it
        if not list(_file_list(self.path)):
            shutil.rmtree(self.path)
        # remove the scan from the parent session
        self.session._scans.remove(self)

    #region private methods

    def _assign_metadata(self):
        """Associate any files that are related to this raw file.

        Files in the scan folder whose BIDS parameters match those of the raw
        file are stored as the sidecar or in ``associated_files``. If no
        sidecar is found locally, the files inherited from the parent session
        are searched as well.
        """
        sidecar_name = _SIDECAR_MAP.get(self._path, None)
        filename_data = _get_bids_params(op.basename(self._raw_file))
        for fname in os.listdir(self.path):
            bids_params = _get_bids_params(fname)
            part = bids_params.pop('part', None)
            if not _bids_params_are_subsets(filename_data, bids_params):
                continue
            if (bids_params['file'] == sidecar_name and
                    bids_params['ext'] == '.json'):
                self._sidecar = fname
                continue
            # TODO: this will not work for .ds folders...
            if op.isdir(_realize_paths(self, fname)):
                continue
            if part is None:
                if fname == self._raw_file:
                    # Don't add the raw file name to the list.
                    continue
                if bids_params['file'] in self.associated_files:
                    # The plain key is already taken: disambiguate using the
                    # file extension.
                    new_key = bids_params['file'] + bids_params['ext']
                    self.associated_files[new_key] = fname
                else:
                    self.associated_files[bids_params['file']] = fname
            elif part == '01':
                # Assign the correct raw file name.
                self._raw_file = fname
            else:
                # Give a unique key to avoid conflict if there
                # are lots of parts for some reason...
                key = str(bids_params['file']) + '_' + part
                self.associated_files[key] = fname
        # If we have no sidecar file associated from the local folder, go over
        # the files that this folder inherits.
        if self._sidecar is None:
            filename_data = _get_bids_params(op.basename(self._raw_file))
            for fname in self.session.inheritable_files:
                bids_params = _get_bids_params(op.basename(fname))
                if not _bids_params_are_subsets(filename_data, bids_params):
                    continue
                # NOTE(review): only inherited .json files are considered
                # here; confirm that other inherited file types are meant to
                # be ignored.
                if bids_params['ext'] != '.json':
                    continue
                rel_path = op.relpath(fname, self.path)
                if bids_params['file'] == sidecar_name:
                    self._sidecar = rel_path
                else:
                    self.associated_files[bids_params['file']] = rel_path
        # If there is still no sidecar file then it probably doesn't have one.

    def _generate_map(self):
        """Generate a map of the Scan.

        Returns
        -------
        :py:class:`xml.etree.ElementTree.Element`
            Xml element whose ``path`` attribute is the raw file path relative
            to the parent Session.
        """
        return ET.Element('Scan', attrib={'path': self.raw_file_relative})

    def _get_params(self):
        """Find the scan parameters from the file name."""
        filename_data = _get_bids_params(op.basename(self._raw_file))
        self.task = filename_data.get('task', None)
        self.run = filename_data.get('run', None)
        self.acquisition = self.acq = filename_data.get('acq', None)
        self.proc = filename_data.get('proc', None)

    def _load_extras(self):
        """Load any extra files on a manufacturer-by-manufacturer basis."""
        if self.info.get('Manufacturer', None) != 'KIT/Yokogawa':
            return
        # Need to load the marker files.
        # These will be in the same folder as the raw data.
        filename_data = _get_bids_params(op.basename(self._raw_file))
        raw_folder = op.dirname(self._raw_file)
        for fname in os.listdir(op.join(self.path, raw_folder)):
            bids_params = _get_bids_params(fname)
            if not _bids_params_are_subsets(filename_data, bids_params):
                continue
            if bids_params['file'] != 'markers':
                continue
            # Marker files with an `acq` value get a key suffixed with it so
            # that several marker files can coexist.
            if bids_params.get('acq', None) is not None:
                acq = '-' + bids_params['acq']
            else:
                acq = ''
            self.associated_files['markers{0}'.format(acq)] = op.join(
                raw_folder, fname)

    def _load_info(self):
        """Read the sidecar.json and load the information into self.info."""
        if self._sidecar is None:
            return
        # Read the sidecar explicitly as UTF-8 instead of relying on the
        # platform default encoding. `io.open` accepts the `encoding`
        # argument on both Python 2 and Python 3.
        with io.open(self.sidecar, 'r', encoding='utf-8') as sidecar:
            self.info = json.load(sidecar)

    def _rename(self, subj_id, sess_id):
        """Rename all the files contained by the scan.

        The files are moved on disk first, then the internally stored file
        names are updated to match.

        Parameters
        ----------
        subj_id : str
            Raw subject ID value. Ie. *without* `sub-`.
        sess_id : str
            Raw session ID value. Ie. *without* `ses-`.
        """
        # TODO: handle moving of anat data
        old_subj_id = self.subject.ID
        new_subj_id = 'sub-{0}'.format(subj_id)
        old_sess_id = self.session.ID
        new_sess_id = 'ses-{0}'.format(sess_id)

        def _renamed(path):
            # Map a path using the old IDs onto the path using the new IDs.
            path = _fix_folderless(self.session, path, old_sess_id,
                                   old_subj_id)
            return _multi_replace(path, [old_subj_id, old_sess_id],
                                  [new_subj_id, new_sess_id])

        def _move(src):
            # Move a file to its renamed location, creating the destination
            # folder if required.
            dst = _renamed(src)
            dst_dir = op.dirname(dst)
            if not op.exists(dst_dir):
                os.makedirs(dst_dir)
            os.rename(src, dst)

        # rename all the contained files
        for fname in self.contained_files():
            # make sure we only rename files that are in the same directory or
            # lower.
            if not fname.startswith('..'):
                _move(fname)
        # rename the raw file
        _move(self.raw_file)
        self._raw_file = _renamed(self._raw_file)
        # rename all the internal file names
        if self._sidecar is not None:
            self._sidecar = _renamed(self._sidecar)
        # Iterate over a snapshot of the keys as the values are reassigned.
        for key in list(self.associated_files):
            self.associated_files[key] = _renamed(self.associated_files[key])

    #region properties

    @property
    def bids_tree(self):
        """Parent :class:`bidshandler.BIDSTree` object."""
        return self.project.bids_tree

    @property
    def channels_tsv(self):
        """Path to the associated channels.tsv file if there is one."""
        _path = None
        channels_path = self.associated_files.get('channels')
        if channels_path is not None:
            _path = _realize_paths(self, channels_path)
        return _path

    @property
    def coordsystem_json(self):
        """Path to the associated coordsystem.json file if there is one."""
        _path = None
        coordsystem_path = self.associated_files.get('coordsystem')
        if coordsystem_path is not None:
            _path = _realize_paths(self, coordsystem_path)
        return _path

    @property
    def emptyroom(self):
        """Associated emptyroom Scan.

        Returns
        -------
        :class:`bidshandler.Scan`
            Associated emptyroom Scan object, or None if the scan is not a
            MEG scan, has no `AssociatedEmptyRoom` entry in its sidecar, or
            the referenced scan cannot be found (a warning is issued in the
            latter case).

        Note
        ----
        Only for MEG scans.
        """
        _path = None
        if self.scan_type == 'meg':
            emptyroom = self.info.get('AssociatedEmptyRoom')
            if emptyroom is not None:
                fname = op.basename(emptyroom)
                bids_params = _get_bids_params(fname)
                try:
                    _path = self.project.subject(
                        bids_params['sub']).session(
                            bids_params['ses']).scan(
                                task=bids_params.get('task'),
                                acq=bids_params.get('acq'),
                                run=bids_params.get('run'))
                except (KeyError, NoScanError):
                    msg = 'Associated empty room file for {0} cannot be found'
                    warn(msg.format(str(self)))
                    _path = None
        return _path

    @property
    def events_tsv(self):
        """Absolute path to the associated events.tsv file."""
        _path = None
        events_path = self.associated_files.get('events')
        if events_path is not None:
            _path = _realize_paths(self, events_path)
        return _path

    @property
    def path(self):
        """Path of folder containing Scan."""
        return op.join(self.session.path, self._path)

    @property
    def project(self):
        """Parent :class:`bidshandler.Project` object."""
        return self.subject.project

    @property
    def raw_file(self):
        """Path of associated raw file."""
        return _realize_paths(self, self._raw_file)

    @property
    def raw_file_relative(self):
        """Path of associated raw file relative to parent Session."""
        return op.join(self._path, self._raw_file)

    @property
    def scan_type(self):
        """Type of Scan.

        This will be the name of the folder the Scan resides in.
        Eg. `meg` for MEG data, `func` for fMRI data.
        """
        return self._path

    @property
    def sidecar(self):
        """Path of associated sidecar file if there is one."""
        _path = None
        if self._sidecar is not None:
            _path = _realize_paths(self, self._sidecar)
        return _path

    @property
    def subject(self):
        """Parent :class:`bidshandler.Subject` object."""
        return self.session.subject

    #region class methods

    def __eq__(self, other):
        """Implements self == other

        Returns True if each instance has the same set of parameters

        Raises
        ------
        TypeError
            If `other` is not a Scan.
        """
        if not isinstance(other, Scan):
            raise TypeError("Can only compare two Scan objects.")
        return (self.acq == other.acq and
                self.task == other.task and
                self.run == other.run and
                self.proc == other.proc and
                self.session._id == other.session._id and
                self.subject._id == other.subject._id and
                self.project._id == other.project._id)

    def __repr__(self):
        return '<Scan, @ {0}>'.format(self.raw_file)

    def __str__(self):
        return self.raw_file_relative