diff --git a/dax/processors_v3.py b/dax/processors_v3.py index 269f6f9a..fa1ee1fb 100644 --- a/dax/processors_v3.py +++ b/dax/processors_v3.py @@ -1407,6 +1407,7 @@ def is_first_mr_session(self, session, project_data): def _map_inputs(self, session, project_data): inputs = self.proc_inputs artefacts_by_input = {k: [] for k in inputs} + artefact_ids_by_input = {k: [] for k in inputs} # Get lists for scans/assrs for this session LOGGER.debug('prepping session data') @@ -1468,8 +1469,39 @@ def _map_inputs(self, session, project_data): # Break if the scan matches so we don't find it again comparing # vs a different requested type artefacts_by_input[i].append(cscan['full_path']) + artefact_ids_by_input[i].append(scanid) break + # If requested, check for multiple matching scans in the list and only keep + # the one selected by keep_multis. Sort lowercase by alpha, on scan ID. + if len(artefacts_by_input[i]) > 0 and iv['keep_multis'] != 'all': + scan_info = zip( + artefacts_by_input[i], + artefact_ids_by_input[i], + ) + sorted_info = sorted(scan_info, key=lambda x: str(x[1]).lower()) + num_scans = len(sorted_info) + if iv['keep_multis'] == 'first': + idx_multi = 1 + elif iv['keep_multis'] == 'last': + idx_multi = num_scans + else: + try: + idx_multi = int(iv['keep_multis']) + except (TypeError, ValueError): + msg = f'For {i}, keep_multis must be first, last, or index 1,2,3,...'
+ LOGGER.error(msg) + raise AutoProcessorError(msg) + if idx_multi > num_scans: + msg = f'Requested {idx_multi}th scan for {i}, but only {num_scans} found' + LOGGER.error(msg) + raise AutoProcessorError(msg) + artefacts_by_input[i] = [sorted_info[idx_multi-1][0]] + LOGGER.info( + f'Keeping only the {idx_multi}th scan found for ' + f'{i}: {sorted_info[idx_multi-1][0]}' + ) + elif iv['artefact_type'] == 'assessor': for cassr in assrs: proctype = cassr.get('PROCTYPE') diff --git a/dax/rcq/analysislauncher.py b/dax/rcq/analysislauncher.py index b6c85c2c..2c0f77e9 100644 --- a/dax/rcq/analysislauncher.py +++ b/dax/rcq/analysislauncher.py @@ -359,22 +359,52 @@ def get_session_inputs(self, spec, info, subject, session): for scan_spec in spec.get('scans', []): logger.debug(f'scan_spec={scan_spec}') + # Get list of resources to download from this scan + resources = scan_spec.get('resources', []) + + # Check for nifti tag + if 'nifti' in scan_spec: + # Add a NIFTI resource using value as fdest + resources.append({ + 'resource': 'NIFTI', + 'fdest': scan_spec['nifti'] + }) + scan_types = scan_spec['types'].split(',') logger.debug(f'scan_types={scan_types}') - for scan in [x for x in scans if x['SCANTYPE'] in scan_types]: + input_scans = [x for x in scans if x['SCANTYPE'] in scan_types] - # Get list of resources to download from this scan - resources = scan_spec.get('resources', []) + if len(input_scans) > 0 and scan_spec.get('keep_multis', 'all') != 'all': + # Sort by id + input_scans = sorted(input_scans, key=lambda x: x['SCANID']) + num_scans = len(input_scans) - # Check for nifti tag - if 'nifti' in scan_spec: - # Add a NIFTI resource using value as fdest - resources.append({ - 'resource': 'NIFTI', - 'fdest': scan_spec['nifti'] - }) + # Apply keep_multis filter + if scan_spec['keep_multis'] == 'first': + idx_multi = 1 + elif scan_spec['keep_multis'] == 'last': + idx_multi = num_scans + else: + try: + idx_multi = int(scan_spec['keep_multis']) + except (TypeError, ValueError): + msg = 'keep_multis must be first, last, or index 1,2,3,...'
+ logger.error(msg) + raise Exception(msg) + + if idx_multi > num_scans: + msg = f'{idx_multi} index exceeds found {num_scans}' + logger.error(msg) + raise Exception(msg) + + # Get a list of only the requested scan + input_scans = [input_scans[idx_multi-1]] + + # Get the file inputs for each input scan + for scan in input_scans: + scanid = scan['SCANID'] for res_spec in resources: try: @@ -384,7 +414,7 @@ def get_session_inputs(self, spec, info, subject, session): continue # Get the download destination subdir - ddest = f'{subject}/{session}' + ddest = f'{subject}/{session}/scans/{scanid}' if res_spec.get('ddest', False): ddest += '/' + res_spec.get('ddest') @@ -396,7 +426,7 @@ def get_session_inputs(self, spec, info, subject, session): scan['SCANID'], res ) - fpath = f'data/projects/{info["name"]}/subjects/{subject}/experiments/{session}/scans/{scan["SCANID"]}/resources/{res}/files/{_file}' + fpath = f'data/projects/{info["name"]}/subjects/{subject}/experiments/{session}/scans/{scanid}/resources/{res}/files/{_file}' inputs.append(self._input( fpath, 'FILE', @@ -438,11 +468,12 @@ def get_session_inputs(self, spec, info, subject, session): logger.debug(f'assr_types={assr_types}') for assr in [x for x in assessors if x['PROCTYPE'] in assr_types]: + assrlabel = assr["ASSR"] for res_spec in assr_spec['resources']: # Get the download destination subdir - ddest = f'{subject}/{session}' + ddest = f'{subject}/{session}/assessors/{assrlabel}' if res_spec.get('ddest', False): ddest += '/' + res_spec.get('ddest') @@ -454,7 +485,7 @@ def get_session_inputs(self, spec, info, subject, session): if 'fmatch' in res_spec: for fmatch in res_spec['fmatch'].split(','): - fpath = f'data/projects/{info["name"]}/subjects/{subject}/experiments/{session}/assessors/{assr["ASSR"]}/out/resources/{res}/files/{fmatch}' + fpath = f'data/projects/{info["name"]}/subjects/{subject}/experiments/{session}/assessors/{assrlabel}/out/resources/{res}/files/{fmatch}'
inputs.append(self._input( fpath, 'FILE', @@ -462,7 +493,7 @@ def get_session_inputs(self, spec, info, subject, session): ddest)) else: # whole resource - fpath = f'data/projects/{info["name"]}/subjects/{subject}/experiments/{session}/assessors/{assr["ASSR"]}/out/resources/{res}/files' + fpath = f'data/projects/{info["name"]}/subjects/{subject}/experiments/{session}/assessors/{assrlabel}/out/resources/{res}/files' inputs.append(self._input( fpath, 'DIR', @@ -481,6 +512,8 @@ def get_subject_inputs(self, spec, info, subject): sgp = [x for x in info['sgp'] if x['SUBJECT'] == subject] for assr in sgp: + assrlabel = assr['ASSR'] + for assr_spec in sgp_spec: logger.debug(f'assr_spec={assr_spec}') assr_types = assr_spec['types'].split(',') @@ -494,7 +527,7 @@ def get_subject_inputs(self, spec, info, subject): for res_spec in assr_spec['resources']: # Get the download destination subdir - ddest = f'{subject}' + ddest = f'{subject}/assessors/{assrlabel}' if res_spec.get('ddest', False): ddest += '/' + res_spec.get('ddest') @@ -508,7 +541,7 @@ def get_subject_inputs(self, spec, info, subject): if 'fmatch' in res_spec: # Add each file for fmatch in res_spec['fmatch'].split(','): - fpath = f'data/projects/{info["name"]}/subjects/{subject}/experiments/{assr["ASSR"]}/resources/{res}/files/{fmatch}' + fpath = f'data/projects/{info["name"]}/subjects/{subject}/experiments/{assrlabel}/resources/{res}/files/{fmatch}' inputs.append(self._input( fpath, 'FILE', @@ -517,7 +550,7 @@ def get_subject_inputs(self, spec, info, subject): )) else: # We want the whole resource as one download - fpath = f'data/projects/{info["name"]}/subjects/{subject}/experiments/{assr["ASSR"]}/resources/{res}/files' + fpath = f'data/projects/{info["name"]}/subjects/{subject}/experiments/{assrlabel}/resources/{res}/files' inputs.append(self._input( fpath, 'DIR',