From 435a0ff0706314e0cc5c8f6401922e732ed5adff Mon Sep 17 00:00:00 2001 From: Tom Close Date: Fri, 9 Feb 2024 17:09:19 +1100 Subject: [PATCH] updated staging and upload so that they should work with AWS --- pyproject.toml | 1 + scripts/run_upload.py | 21 ++++++ xnat_ingest/cli/stage.py | 10 ++- xnat_ingest/cli/upload.py | 99 +++++++++++++++++++------ xnat_ingest/session.py | 149 ++++++++++++++++++++++++++------------ 5 files changed, 211 insertions(+), 69 deletions(-) create mode 100644 scripts/run_upload.py diff --git a/pyproject.toml b/pyproject.toml index 4fb7843..01af490 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,6 +12,7 @@ dependencies = [ "fileformats-medimage-extras", "pydicom >=2.3.1", "tqdm >=4.64.1", + "boto3", "xnat", "arcana", "arcana-xnat", diff --git a/scripts/run_upload.py b/scripts/run_upload.py new file mode 100644 index 0000000..32a44ee --- /dev/null +++ b/scripts/run_upload.py @@ -0,0 +1,21 @@ +from xnat_ingest.cli import upload +from xnat_ingest.utils import show_cli_trace +from click.testing import CliRunner + +PATTERN = "{PatientName.given_name}_{PatientName.family_name}_{SeriesDate}.*" + +runner = CliRunner() + +result = runner.invoke( + upload, + [ + "s3://ais-s3-tbp-s3bucket-1afz0bzdw5jd6/staging", + ], + env={ + "XNAT_INGEST_HOST": "https://xnat.sydney.edu.au", + + }, + catch_exceptions=False, +) + +assert result.exit_code == 0, show_cli_trace(result) diff --git a/xnat_ingest/cli/stage.py b/xnat_ingest/cli/stage.py index 17dd934..8941fd8 100644 --- a/xnat_ingest/cli/stage.py +++ b/xnat_ingest/cli/stage.py @@ -181,7 +181,7 @@ def stage( for session in tqdm(sessions, f"Staging DICOM sessions found in '{dicoms_path}'"): try: - session_staging_dir = staging_dir / session.name + session_staging_dir = staging_dir.joinpath(session.staging_relpath) if session_staging_dir.exists(): logger.info( "Skipping %s session as staging directory %s already exists", @@ -189,7 +189,7 @@ def stage( str(session_staging_dir), ) continue - 
session_staging_dir.mkdir(exist_ok=True) + session_staging_dir.mkdir(exist_ok=True, parents=True) # Deidentify files and save them to the staging directory staged_session = session.stage( session_staging_dir, associated_files=associated_files, @@ -206,3 +206,9 @@ def stage( continue else: raise + else: + if delete: + session.delete() + logger.info("Staged and deleted %s session", session.name) + else: + logger.info("Staged %s session to %s", session.name, str(session_staging_dir)) diff --git a/xnat_ingest/cli/upload.py b/xnat_ingest/cli/upload.py index d31075e..1d9aed5 100644 --- a/xnat_ingest/cli/upload.py +++ b/xnat_ingest/cli/upload.py @@ -2,6 +2,7 @@ import shutil import traceback import typing as ty +from collections import defaultdict import tempfile import click from tqdm import tqdm @@ -110,8 +111,9 @@ ) @click.option( "--aws-creds", - type=ty.Tuple[str, str], + type=str, metavar=" ", + envvar="XNAT_INGEST_AWS_CREDS", default=None, nargs=2, help="AWS credentials to use for access of data stored S3 of DICOMs", @@ -137,29 +139,80 @@ def upload( server=server, user=user, password=password, cache_dir=Path(tempfile.mkdtemp()) ) - if aws_creds: - # List sessions stored in s3 bucket - s3 = boto3.resource("s3", aws_access_key_id=aws_creds[0], aws_secret_access_key=aws_creds[1]) - bucket_name, prefix = staged.split("/", 1) - bucket = s3.Bucket(bucket_name) - staged = Path(tempfile.mkdtemp()) - for obj in bucket.objects.filter(Prefix=prefix): - if obj.key.endswith("/"): - continue - session_dir = staged / obj.key.split("/", 1)[0] - session_dir.mkdir(parents=True, exist_ok=True) - with open(session_dir / obj.key.split("/")[-1], "wb") as f: - bucket.download_fileobj(obj.key, f) - else: - def list_staged_sessions(staging_dir): - for session_dir in staging_dir.iterdir(): - if session_dir.is_dir(): - yield session_dir - with xnat_repo.connection: + + def xnat_session_exists(project_id, subject_id, session_id): + try: + xnat_repo.connection.projects[project_id].subjects[ + 
subject_id + ].experiments[session_id] + except KeyError: + return False + else: + logger.info( + "Skipping session '%s-%s-%s' as it already exists on XNAT", + project_id, + subject_id, + session_id, + ) + return True + + if staged.startswith("s3://"): + # List sessions stored in s3 bucket + s3 = boto3.resource( + "s3", aws_access_key_id=aws_creds[0], aws_secret_access_key=aws_creds[1] + ) + bucket_name, prefix = staged[5:].split("/", 1) + bucket = s3.Bucket(bucket_name) + all_objects = bucket.objects.filter(Prefix=prefix) + session_objs = defaultdict(list) + for obj in all_objects: + if obj.key.endswith("/"): + continue + path_parts = obj.key.split("/") + session_ids = tuple(path_parts[:3]) + session_objs[session_ids].append((path_parts[3:-1], obj)) + + session_objs = { + (ids, objs) + for ids, objs in session_objs.items() + if not xnat_session_exists(*ids) + } + + num_sessions = len(session_objs) + + tmp_download_dir = Path(tempfile.mkdtemp()) + + def iter_staged_sessions(): + for ids, objs in session_objs.items(): + # Just in case the manifest file is not included in the list of objects + # we recreate the project/subject/sesssion directory structure + session_tmp_dir = tmp_download_dir.joinpath(*ids) + session_tmp_dir.mkdir(parents=True, exist_ok=True) + for relpath, obj in objs: + with open(session_tmp_dir.joinpath(relpath), "wb") as f: + bucket.download_fileobj(obj.key, f) + yield session_tmp_dir + shutil.rmtree( + session_tmp_dir + ) # Delete the tmp session after the upload + + sessions = iter_staged_sessions() + else: + sessions = [] + for project_dir in Path(staged).iterdir(): + for subject_dir in project_dir.iterdir(): + for session_dir in subject_dir.iterdir(): + if not xnat_session_exists( + project_dir.name, subject_dir.name, session_dir.name + ): + sessions.append(session_dir) + num_sessions = len(sessions) + for session_staging_dir in tqdm( - list(list_staged_sessions(staged)), - f"Processing staged sessions found in '{staged}'", + sessions, + 
total=num_sessions, + desc=f"Processing staged sessions found in '{staged}'", ): session = ImagingSession.load(session_staging_dir) try: @@ -304,3 +357,5 @@ def list_staged_sessions(staging_dir): continue else: raise + + shutil.rmtree(tmp_download_dir) diff --git a/xnat_ingest/session.py b/xnat_ingest/session.py index e46a809..2dd6d39 100644 --- a/xnat_ingest/session.py +++ b/xnat_ingest/session.py @@ -4,6 +4,7 @@ import logging import os.path import subprocess as sp +from copy import deepcopy from functools import cached_property import shutil import yaml @@ -62,6 +63,8 @@ class ImagingSession: validator=attrs.validators.instance_of(dict), ) + id_escape_re = re.compile(r"[^a-zA-Z0-9_]+") + def __getitem__(self, fieldname: str) -> ty.Any: return self.metadata[fieldname] @@ -69,6 +72,10 @@ def __getitem__(self, fieldname: str) -> ty.Any: def name(self): return f"{self.project_id}-{self.subject_id}-{self.session_id}" + @property + def staging_relpath(self): + return [self.project_id, self.subject_id, self.session_id] + @cached_property def modalities(self) -> ty.Set[str]: modalities = self["Modality"] @@ -264,7 +271,7 @@ def get_id(field): f"Multiple values for '{field}' tag found within scans in session: " f"{session_dicom_series}" ) - id_ = id_.replace(" ", "_") + id_ = cls.id_escape_re("", id_) return id_ scans = [] @@ -289,62 +296,111 @@ def get_id(field): return sessions @classmethod - def load(cls, save_dir: Path): + def load(cls, save_dir: Path) -> "ImagingSession": """Override IDs extracted from DICOM metadata with manually specified IDs loaded from a YAML Parameters ---------- - yaml_file : Path - name of the file to load the manually specified IDs from (YAML format) + save_dir : Path + the path to the directory where the session is saved + + Returns + ------- + ImagingSession + the loaded session """ yaml_file = save_dir / cls.SAVE_FILENAME - try: - with open(yaml_file) as f: - dct = yaml.load(f, Loader=yaml.SafeLoader) - except Exception as e: - 
add_exc_note( - e, - f"Loading saved session from {yaml_file}, please check that it " - "is a valid YAML file", - ) - raise e - scans = [] - for scan_id, scan_dict in dct["scans"].items(): - scans.append( - ImagingScan( - id=scan_id, - type=scan_dict["type"], - resources={ - n: from_mime(d["datatype"])(d["fspaths"]) - for n, d in scan_dict["resources"].items() - }, + if yaml_file.exists(): + # Load session from YAML file metadata + try: + with open(yaml_file) as f: + dct = yaml.load(f, Loader=yaml.SafeLoader) + except Exception as e: + add_exc_note( + e, + f"Loading saved session from {yaml_file}, please check that it " + "is a valid YAML file", + ) + raise e + scans = [] + for scan_id, scan_dict in dct["scans"].items(): + scans.append( + ImagingScan( + id=scan_id, + type=scan_dict["type"], + resources={ + n: from_mime(d["datatype"])(d["fspaths"]) + for n, d in scan_dict["resources"].items() + }, + ) ) + dct["scans"] = scans + session = cls(**dct) + else: + # Load session based on directory structure + scans = [] + for scan_dir in save_dir.iterdir(): + scan_id, scan_type = scan_dir.name.split("-") + scan_resources = {} + for resource_dir in scan_dir.iterdir(): + scan_resources[resource_dir.name] = FileSet(resource_dir.iterdir()) + scans.append( + ImagingScan( + id=scan_id, + type=scan_type, + resources=scan_resources, + ) + ) + project_id = save_dir.parent.parent.name + subject_id = save_dir.parent.name + session_id = save_dir.name + session = cls( + scans=scans, + project_id=project_id, + subject_id=subject_id, + session_id=session_id, ) - dct["scans"] = scans - return cls(**dct) + return session - def save(self, save_dir: Path): + def save(self, save_dir: Path) -> "ImagingSession": """Save the project/subject/session IDs loaded from the session to a YAML file, so they can be manually overridden. 
Parameters ---------- - yaml_file : Path - name of the file to load the manually specified IDs from (YAML format) + save_dir: Path + the path to save the session metadata into (NB: the data is typically also + stored in the directory structure of the session, but this is not necessary) + + Returns + ------- + saved: ImagingSession + the saved session with the updated file-system paths """ dct = attrs.asdict(self, recurse=False) dct["scans"] = {} + scans_dir = save_dir / "scans" + if scans_dir.exists(): + shutil.rmtree(scans_dir) + scans_dir.mkdir() + saved = deepcopy(self) for scan in self.scans.values(): + resources_dict = {} + for resource_name, fileset in scan.resources.items(): + resource_dir = scans_dir / f"{scan.id}-{scan.type}" / resource_name + resource_dir.mkdir(parents=True) + fileset_copy = fileset.copy( + resource_dir, mode=fileset.CopyMode.hardlink_or_copy + ) + resources_dict[resource_name] = { + "datatype": to_mime(fileset, official=False), + "fspaths": [str(p) for p in fileset_copy.fspaths], + } + saved.scans[scan.id].resources[resource_name] = fileset_copy dct["scans"][scan.id] = { "type": scan.type, - "resources": { - n: { - "datatype": to_mime(f, official=False), - "fspaths": [str(p) for p in f.fspaths], - } - for n, f in scan.resources.items() - }, + "resources": resources_dict, } yaml_file = save_dir / self.SAVE_FILENAME with open(yaml_file, "w") as f: @@ -352,13 +408,14 @@ def save(self, save_dir: Path): dct, f, ) + return saved def stage( self, dest_dir: Path, associated_files: ty.Tuple[str, str], - delete_original: bool = False, - deidentify: bool = True + remove_original: bool = False, + deidentify: bool = True, ) -> "ImagingSession": r"""Stages and deidentifies files by removing the fields listed `FIELDS_TO_ANONYMISE` and replacing birth date with 01/01/ and returning new imaging session @@ -380,7 +437,7 @@ def stage( Used to extract the scan ID & type/resource from the associated filename. 
Should be a regular-expression (Python syntax) with named groups called 'id' and 'type', e.g. '[^\.]+\.[^\.]+\.(?P<id>\d+)\.(?P<type>\w+)\..*' - delete_original : bool + remove_original : bool delete original files after they have been staged, false by default deidentify : bool deidentify the scans in the staging process, true by default @@ -411,8 +468,10 @@ def stage( dicom, scan_dir / (dicom.metadata["SOPInstanceUID"] + dicom_ext), - delete_original=delete_original, + remove_original=remove_original, ) + elif remove_original: + staged_fspath = dicom.move(scan_dir) else: staged_fspath = dicom.copy(scan_dir) staged_dicom_paths.append(staged_fspath) @@ -459,9 +518,9 @@ def stage( dest_path = tmpdir / new.name if Dicom.matches(old): self.deidentify_dicom( - old, dest_path, delete_original=delete_original + old, dest_path, remove_original=remove_original ) - elif delete_original: + elif remove_original: old.rename(dest_path) else: shutil.copyfile(old, dest_path) @@ -525,7 +584,7 @@ def stage( ) os.rmdir(tmpdir) # Should be empty # Remove all references scans in original session as they have been deleted - if delete_original: + if remove_original: self.scans = {} return type(self)( scans=staged_scans, @@ -535,7 +594,7 @@ def stage( ) def deidentify_dicom( - self, dicom_file: Path, new_path: Path, delete_original: bool = False + self, dicom_file: Path, new_path: Path, remove_original: bool = False ) -> Path: if dcmedit_path: # Copy to new path @@ -559,7 +618,7 @@ def deidentify_dicom( else: elem.value = "" dcm.save_as(new_path) - if delete_original: + if remove_original: os.unlink(dicom_file) return new_path