Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENH] Allow datman to handle linked XNAT sessions #358

Merged
merged 10 commits into from
Oct 17, 2023
2 changes: 2 additions & 0 deletions bin/dm_redcap_scan_completed.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ def add_session_redcap(record, record_key):
return

session_date = record[get_setting('RedcapDate', default='date')]
# Strip off optional 'hours' field'
session_date = session_date.split(" ")[0]

try:
session = dashboard.get_session(ident, date=session_date, create=True)
Expand Down
19 changes: 15 additions & 4 deletions bin/dm_xnat_extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ def main():

session = datman.scan.Scan(ident, config, bids_root=args.bids_out)

if xnat_experiment.resource_files:
if xnat_experiment.resource_files and not xnat_experiment.is_shared():
export_resources(session.resource_path, xnat, xnat_experiment,
dry_run=args.dry_run)

Expand Down Expand Up @@ -611,7 +611,11 @@ def make_session_exporters(config, session, experiment, bids_opts=None,
list: Returns a list of :obj:`datman.exporters.Exporter` for the
desired session export formats.
"""
formats = get_session_formats(bids_opts=bids_opts, ignore_db=ignore_db)
formats = get_session_formats(
bids_opts=bids_opts,
shared=experiment.is_shared(),
ignore_db=ignore_db
)

exporters = []
for exp_format in formats:
Expand All @@ -623,21 +627,28 @@ def make_session_exporters(config, session, experiment, bids_opts=None,
return exporters


def get_session_formats(bids_opts=None, ignore_db=False):
def get_session_formats(bids_opts=None, shared=False, ignore_db=False):
"""Get the string identifiers for all session exporters that are needed.

Args:
bids_opts (:obj:`BidsOptions`, optional): dcm2bids settings to be
used if exporting to BIDS format. Defaults to None.
shared (bool, optional): Whether to treat the session as a
shared XNAT experiment. Defaults to False.
ignore_db (bool, optional): If True, datman's QC dashboard will not
be updated. Defaults to False.

Returns:
list: a list of string keys that should be used to make exporters.
"""
formats = []
if bids_opts:
if shared:
formats.append("shared")
elif bids_opts:
# Only do 'bids' format if not a shared session.
formats.append("bids")

if bids_opts:
formats.append("nii_link")
if not ignore_db:
formats.append("db")
Expand Down
279 changes: 268 additions & 11 deletions datman/exporters.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@

import pydicom as dicom

import datman.config
import datman.dashboard
from datman.exceptions import UndefinedSetting, DashboardException
from datman.scanid import (parse_bids_filename, ParseException, make_filename,
KCNIIdentifier)
import datman.scan
from datman.exceptions import (UndefinedSetting, DashboardException,
ExportException)
from datman.scanid import (parse, parse_bids_filename, ParseException,
parse_filename, make_filename, KCNIIdentifier)
from datman.utils import (run, make_temp_directory, get_extension,
filter_niftis, find_tech_notes, read_blacklist,
get_relative_source, read_json, write_json)
Expand Down Expand Up @@ -134,6 +137,8 @@ class SessionExporter(Exporter):

def __init__(self, config, session, experiment, dry_run=False, **kwargs):
self.experiment = experiment
self.config = config
self.session = session
self.dry_run = dry_run

def __repr__(self):
Expand Down Expand Up @@ -657,11 +662,11 @@ def __init__(self, config, session, experiment, **kwargs):
self.study_resource_path = study_resource_dir
self.resources_path = resources_dir
self.date = experiment.date
self.names = self.get_scan_names(session, experiment)
super().__init__(config, session, experiment, **kwargs)

def get_scan_names(self, session, experiment):
"""Gets list of datman-style scan names for a session.
@property
def names(self):
"""Gets list of valid datman-style scan names for a session.

Returns:
:obj:`dict`: A dictionary of datman style scan names mapped to
Expand All @@ -670,17 +675,17 @@ def get_scan_names(self, session, experiment):
"""
names = {}
# use experiment.scans, so dashboard can report scans that didnt export
for scan in experiment.scans:
for scan in self.experiment.scans:
for name in scan.names:
names[name] = self.get_bids_name(name, session)
names[name] = self.get_bids_name(name, self.session)

# Check the actual folder contents as well, in case symlinked scans
# exist that werent named on XNAT
for nii in session.niftis:
for nii in self.session.niftis:
fname = nii.file_name.replace(nii.ext, "")
if fname in names:
continue
names[fname] = self.get_bids_name(fname, session)
names[fname] = self.get_bids_name(fname, self.session)

return names

Expand Down Expand Up @@ -852,11 +857,36 @@ def make_scan(self, file_stem):
logger.error(f"Failed adding scan {file_stem} to dashboard "
f"with error: {exc}")
return

if self.experiment.is_shared():
self._make_linked(scan)
self._add_bids_scan_name(scan, file_stem)
self._add_side_car(scan, file_stem)
self._update_conversion_errors(scan, file_stem)

def _make_linked(self, scan):
try:
source_session = datman.dashboard.get_session(self.experiment.name)
except datman.dashboard.DashboardException as exc:
logger.error(
f"Failed to link shared scan {scan} to source "
f"{self.experiment.name}. Reason - {exc}"
)
return
matches = [
source_scan for source_scan in source_session.scans
if (source_scan.series == scan.series and
source_scan.tag == scan.tag)
]
if not matches or len(matches) > 1:
logger.error(
f"Failed to link shared scan {scan} to {self.experiment.name}."
" Reason - Unable to find source scan database record."
)
return

scan.source_id = matches[0].id
scan.save()

def _add_bids_scan_name(self, scan, dm_stem):
"""Add a bids format file name to a series in the QC database.

Expand Down Expand Up @@ -969,6 +999,233 @@ def errors_outdated(self, scan, fname):
return False


class SharedExporter(SessionExporter):
"""Export an XNAT 'shared' experiment.
"""

type = "shared"
ext = ".nii.gz"

def __init__(self, config, session, experiment, bids_opts=None, **kwargs):
if not experiment.is_shared():
raise ExportException(
f"Cannot make SharedExporter for {experiment}. "
"XNAT Experiment is not shared."
)

try:
self.source_session = self.find_source_session(config, experiment)
except (ParseException, datman.config.ConfigException) as exc:
raise ExportException(
f"Can't find source data for shared experiment {experiment}. "
f"Reason - {exc}"
)

# The datman-style directories to export
dm_dirs = ['qc_path', 'dcm_path', 'mnc_path', 'nrrd_path']
if not bids_opts:
dm_dirs.append('nii_path')

self.tags = config.get_tags(site=session.site)

super().__init__(config, session, experiment, **kwargs)

self.name_map = self.make_name_map(dm_dirs, use_bids=bids_opts)

def find_source_session(self, config, experiment):
"""Find the original data on the filesystem.

Args:
config (:obj:`datman.config.config`): The datman config object for
the study that the shared experiment belongs to.
experiment (:obj:`datman.xnat.XNATExperiment`): The experiment
object for the shared session on XNAT.

Returns:
:obj:`datman.scan.Scan`: The scan object for the source dataset
as previously exported to the filesystem.
"""
ident = parse(experiment.name)
study = config.map_xnat_archive_to_project(ident)
config = datman.config.config(study=study)
return datman.scan.Scan(ident, config)

def make_name_map(self, dm_dirs, use_bids=False):
"""Create a dictionary of source files to their 'shared' alias.

Args:
dm_dirs (:obj:`list`): A list of datman-style paths on the source
session's :obj:`datman.scan.Scan` object to search for files.
use_bids (any, optional): Whether or not to search for bids files.
Any input equivalent to boolean True will be taken as 'True'.
Default False.

Returns:
dict: A dictionary mapping the full path to each discovered source
file to the full path to the output alias name/symlink.
"""
if use_bids:
name_map = self.find_bids_files()
else:
name_map = {}

for dir_type in dm_dirs:
self.find_dm_files(dir_type, name_map)

self.find_resource_files(name_map)

return name_map

def find_bids_files(self, name_map=None):
"""Find all bids files that have been created for the source session.

Args:
name_map (dict, optional): A dictionary that may contain other
discovered files and their output names. Default None.

Returns:
dict: A dictionary of source session files that have been
found, mapped to their full path under the shared/alias ID.
"""
if name_map is None:
name_map = {}

for root, _, files in os.walk(self.source_session.bids_path):
dest_dir = root.replace(
self.source_session.bids_path,
self.session.bids_path
)

for item in files:
dest_name = item.replace(
self.source_session.bids_sub, self.session.bids_sub
).replace(
self.source_session.bids_ses, self.session.bids_ses
)
name_map[os.path.join(root, item)] = os.path.join(
dest_dir, dest_name
)

return name_map

def find_dm_files(self, dir_type, name_map=None):
"""Find datman-style source files in all listed directory types.

Args:
dir_type (list): A list of paths on the source session to search
through. All entries should be valid path types defined for
the `datman.scan.Scan` object.
name_map (dict, optional): A dictionary of other discovered
source files mapped to their aliases. Default None.

Returns:
dict: A dictionary of source session files that have been
found, mapped to their full path under the shared/alias ID.
"""
if name_map is None:
name_map = {}

source_dir = getattr(self.source_session, dir_type)
dest_dir = getattr(self.session, dir_type)

for item in glob(os.path.join(source_dir, "*")):
try:
_, tag, _, _ = parse_filename(item)
except ParseException:
logger.debug(
f"Ignoring invalid file name {item} in {source_dir}"
)
continue

if tag not in self.tags:
# Found a valid scan name but with a tag not used by dest study
continue

if dir_type == 'qc_path' and item.endswith('_manifest.json'):
# Filter out manifest files. These should be regenerated
# by dm_qc_report for the dest session.
continue

fname = os.path.basename(item).replace(
self.source_session.id_plus_session,
self.session.id_plus_session
)
name_map[item] = os.path.join(dest_dir, fname)

return name_map

def find_resource_files(self, name_map=None):
"""Find all source session resources files.

Args:
name_map (dict, optional): A dictionary of any previously found
source files mapped to their aliases.

Returns:
dict: A dictionary of source session files that have been
found, mapped to their full path under the shared/alias ID.
"""
if name_map is None:
name_map = {}

for root, _, files in os.walk(self.session.resource_path):
dest_path = root.replace(
self.source_session.resource_path,
self.session.resource_path
)
for item in files:
name_map[os.path.join(root, item)] = os.path.join(
dest_path, item
)

return name_map

def export(self, *args, **kwargs):
if self.dry_run:
logger.info(
"Dry run: Skipping export of shared session files "
f"{self.name_map}"
)
return

for source in self.name_map:
parent, _ = os.path.split(self.name_map[source])
try:
os.makedirs(parent)
except FileExistsError:
pass
except PermissionError:
logger.error(
f"Failed to make dir {parent} for session {self.session}. "
"Permission denied."
)
continue
rel_source = get_relative_source(source, self.name_map[source])
try:
os.symlink(rel_source, self.name_map[source])
except FileExistsError:
pass
except PermissionError:
logger.error(
f"Failed to create {self.name_map[source]}. "
"Permission denied."
)

def outputs_exist(self):
for source in self.name_map:
if not os.path.islink(self.name_map[source]):
# Check if there's a link, NOT whether the source exists.
return False
return True

@classmethod
def get_output_dir(cls, session):
return None

def needs_raw_data(self):
return False


class NiiExporter(SeriesExporter):
"""Export a series to nifti format with datman-style names.
"""
Expand Down
Loading
Loading