diff --git a/real-tests/usyd_stage.py b/real-tests/usyd_stage.py
new file mode 100644
index 0000000..39e4cf6
--- /dev/null
+++ b/real-tests/usyd_stage.py
@@ -0,0 +1,26 @@
+from click.testing import CliRunner
+from xnat_ingest.cli import stage
+from xnat_ingest.utils import show_cli_trace
+
+runner = CliRunner()
+
+
+result = runner.invoke(
+    stage,
+    [],
+    env={
+        "XNAT_INGEST_STAGE_DICOMS_PATH": "/vol/vmhost/kubernetes/////**/*.IMA",
+        "XNAT_INGEST_STAGE_DIR": "/vol/vmhost/usyd-data-export/STAGING",
+        "XNAT_INGEST_STAGE_PROJECT": "ProtocolName",
+        "XNAT_INGEST_STAGE_SUBJECT": "PatientID",
+        "XNAT_INGEST_STAGE_VISIT": "AccessionNumber",
+        "XNAT_INGEST_STAGE_ASSOCIATED": '"/vol/vmhost/usyd-data-export/RAW-DATA-EXPORT/{PatientName.family_name}_{PatientName.given_name}/.ptd","./[^\\.]+.[^\\.]+.[^\\.]+.(?P\\d+).[A-Z]+_(?P[^\\.]+)."',
+        "XNAT_INGEST_STAGE_DELETE": "0",
+        "XNAT_INGEST_STAGE_LOGFILE": ",INFO",
+        "XNAT_INGEST_STAGE_DEIDENTIFY": "1",
+    },
+    catch_exceptions=False,
+)
+
+
+assert result.exit_code == 0, show_cli_trace(result)
diff --git a/real-tests/usyd_transfer.py b/real-tests/usyd_transfer.py
new file mode 100644
index 0000000..5afe624
--- /dev/null
+++ b/real-tests/usyd_transfer.py
@@ -0,0 +1,20 @@
+import os
+from click.testing import CliRunner
+from xnat_ingest.cli import transfer
+from xnat_ingest.utils import show_cli_trace
+
+runner = CliRunner()
+
+
+result = runner.invoke(
+    transfer,
+    [],
+    env={
+        "XNAT_INGEST_STAGE_DIR": "/Users/tclose/Data/testing/staging-test/",
+        "XNAT_INGEST_TRANSFER_LOGFILE": "/Users/tclose/Desktop/test-log.log,INFO",
+        "XNAT_INGEST_TRANSFER_DELETE": "0",
+    },
+    catch_exceptions=False,
+)
+
+assert result.exit_code == 0, show_cli_trace(result)
diff --git a/real-tests/usyd_upload.py b/real-tests/usyd_upload.py
new file mode 100644
index 0000000..2a498fa
--- /dev/null
+++ b/real-tests/usyd_upload.py
@@ -0,0 +1,26 @@
+from click.testing import CliRunner
+from xnat_ingest.cli import upload
+from xnat_ingest.utils import show_cli_trace
+
+runner = CliRunner()
+
+result = runner.invoke(
+    upload,
+    [],
+    env={
+        "XNAT_INGEST_UPLOAD_STAGED": "",
+        "XNAT_INGEST_UPLOAD_HOST": "https://xnat.sydney.edu.au",
+        "XNAT_INGEST_UPLOAD_USER": "",
+        "XNAT_INGEST_UPLOAD_PASS": "",
+        "XNAT_INGEST_UPLOAD_ALWAYSINCLUDE": "medimage/dicom-series",
+        "XNAT_INGEST_UPLOAD_STORE_CREDENTIALS": ",",
+        "XNAT_INGEST_UPLOAD_LOGFILE": ",INFO",
+        "XNAT_INGEST_UPLOAD_DELETE": "0",
+        "XNAT_INGEST_UPLOAD_TEMPDIR": "",
+        "XNAT_INGEST_UPLOAD_REQUIRE_MANIFEST": "1",
+        "XNAT_INGEST_UPLOAD_CLEANUP_OLDER_THAN": "30",
+    },
+    catch_exceptions=False,
+)
+
+assert result.exit_code == 0, show_cli_trace(result)
diff --git a/xnat_ingest/cli/stage.py b/xnat_ingest/cli/stage.py
index 2540639..1ae024f 100644
--- a/xnat_ingest/cli/stage.py
+++ b/xnat_ingest/cli/stage.py
@@ -99,8 +99,10 @@
 )
 @click.option(
     "--log-file",
+    "log_files",
     default=None,
     type=LogFile.cli_type,
+    multiple=True,
     nargs=2,
     metavar=" ",
     envvar="XNAT_INGEST_STAGE_LOGFILE",
@@ -151,19 +153,24 @@
 def stage(
     dicoms_path: str,
     staging_dir: Path,
     associated_files: AssociatedFiles,
-    project_field: str,
-    subject_field: str,
-    visit_field: str,
+    project_field: DicomField,
+    subject_field: DicomField,
+    visit_field: DicomField,
     project_id: str | None,
     delete: bool,
     log_level: str,
-    log_file: LogFile | None,
+    log_files: ty.List[LogFile],
     log_emails: ty.List[LogEmail],
     mail_server: MailServer,
     raise_errors: bool,
     deidentify: bool,
 ):
-    set_logger_handling(log_level, log_emails, log_file, mail_server)
+    set_logger_handling(
+        log_level=log_level,
+        log_emails=log_emails,
+        log_files=log_files,
+        mail_server=mail_server,
+    )
 
     msg = f"Loading DICOM sessions from '{dicoms_path}'"
diff --git a/xnat_ingest/cli/transfer.py b/xnat_ingest/cli/transfer.py
index 61ea412..a516d57 100644
--- a/xnat_ingest/cli/transfer.py
+++ b/xnat_ingest/cli/transfer.py
@@ -10,6 +10,8 @@
     logger,
     LogFile,
     LogEmail,
+    StoreCredentials,
+    XnatLogin,
     MailServer,
     set_logger_handling,
 )
@@ -30,11 +32,13 @@
 an SSH server.
 """,
 )
-@click.argument("staging_dir", type=str, envvar="XNAT_INGEST_STAGE_DIR")
+@click.argument(
+    "staging_dir", type=click.Path(path_type=Path), envvar="XNAT_INGEST_STAGE_DIR"
+)
 @click.argument("remote_store", type=str, envvar="XNAT_INGEST_TRANSFER_REMOTE_STORE")
 @click.option(
     "--store-credentials",
-    type=click.Path(path_type=Path),
+    type=StoreCredentials.cli_type,
     metavar=" ",
     envvar="XNAT_INGEST_TRANSFER_STORE_CREDENTIALS",
     default=None,
@@ -50,9 +54,11 @@
 )
 @click.option(
     "--log-file",
+    "log_files",
     default=None,
     type=LogFile.cli_type,
     nargs=2,
+    multiple=True,
     metavar=" ",
     envvar="XNAT_INGEST_TRANSFER_LOGFILE",
     help=(
@@ -99,7 +105,7 @@
 @click.option(
     "--xnat-login",
     nargs=3,
-    type=str,
+    type=XnatLogin.cli_type,
     default=None,
     metavar=" ",
     help="The XNAT server to upload to plus the user and password to use",
@@ -108,20 +114,25 @@
 def transfer(
     staging_dir: Path,
     remote_store: str,
-    credentials: ty.Tuple[str, str],
-    log_file: LogFile,
+    store_credentials: ty.Optional[StoreCredentials],
+    log_files: ty.List[LogFile],
     log_level: str,
     log_emails: ty.List[LogEmail],
-    mail_server: ty.Tuple[MailServer],
+    mail_server: MailServer,
     delete: bool,
     raise_errors: bool,
-    xnat_login: ty.Optional[ty.Tuple[str, str, str]],
+    xnat_login: ty.Optional[XnatLogin],
 ):
     if not staging_dir.exists():
         raise ValueError(f"Staging directory '{staging_dir}' does not exist")
 
-    set_logger_handling(log_level, log_file, log_emails, mail_server)
+    set_logger_handling(
+        log_level=log_level,
+        log_files=log_files,
+        log_emails=log_emails,
+        mail_server=mail_server,
+    )
 
     if remote_store.startswith("s3://"):
         store_type = "s3"
@@ -134,11 +145,10 @@ def transfer(
     )
 
     if xnat_login is not None:
-        server, user, password = xnat_login
         xnat_repo = Xnat(
-            server=server,
-            user=user,
-            password=password,
+            server=xnat_login.host,
+            user=xnat_login.user,
+            password=xnat_login.password,
             cache_dir=Path(tempfile.mkdtemp()),
         )
     else:
@@ -204,16 +214,31 @@ def transfer(
                 logger.debug(
                     "Transferring %s to S3 (%s)", session_dir, remote_store
                 )
-                sp.check_call(
+                aws_cmd = (
+                    sp.check_output("which aws", shell=True).strip().decode("utf-8")
+                )
+                if store_credentials is None:
+                    raise ValueError(
+                        "No store credentials provided for S3 bucket transfer"
+                    )
+                process = sp.Popen(
                     [
-                        "aws",
+                        aws_cmd,
                         "s3",
                         "sync",
-                        "--quiet",
                         str(session_dir),
                         remote_path,
-                    ]
+                    ],
+                    env={
+                        "AWS_ACCESS_KEY_ID": store_credentials.access_key,
+                        "AWS_SECRET_ACCESS_KEY": store_credentials.access_secret,
+                    },
+                    stdout=sp.PIPE,
+                    stderr=sp.PIPE,
                 )
+                stdout, stderr = process.communicate()
+                if process.returncode != 0:
+                    raise RuntimeError("AWS sync failed: " + stderr.decode("utf-8"))
             elif store_type == "ssh":
                 logger.debug(
                     "Transferring %s to %s via SSH", session_dir, remote_store
diff --git a/xnat_ingest/cli/upload.py b/xnat_ingest/cli/upload.py
index 0282467..8f470a0 100644
--- a/xnat_ingest/cli/upload.py
+++ b/xnat_ingest/cli/upload.py
@@ -25,6 +25,7 @@
     set_logger_handling,
     get_checksums,
     calculate_checksums,
+    StoreCredentials,
 )
 
 
@@ -62,10 +63,12 @@
 )
 @click.option(
     "--log-file",
+    "log_files",
     default=None,
     type=LogFile.cli_type,
     nargs=2,
     metavar=" ",
+    multiple=True,
     envvar="XNAT_INGEST_UPLOAD_LOGFILE",
     help=(
         'Location to write the output logs to, defaults to "upload-logs" in the '
@@ -119,7 +122,7 @@
 )
 @click.option(
     "--store-credentials",
-    type=str,
+    type=StoreCredentials.cli_type,
     metavar=" ",
     envvar="XNAT_INGEST_UPLOAD_STORE_CREDENTIALS",
     default=None,
@@ -158,8 +161,8 @@ def upload(
     password: str,
     delete: bool,
     log_level: str,
-    log_file: Path,
-    log_emails: LogEmail,
+    log_files: ty.List[LogFile],
+    log_emails: ty.List[LogEmail],
     mail_server: MailServer,
     always_include: ty.Sequence[str],
     raise_errors: bool,
@@ -169,8 +172,12 @@ def upload(
     clean_up_older_than: int,
 ):
 
-    set_logger_handling(log_level, log_file, log_emails, mail_server)
-
+    set_logger_handling(
+        log_level=log_level,
+        log_emails=log_emails,
+        log_files=log_files,
+        mail_server=mail_server,
+    )
     if temp_dir:
         tempfile.tempdir = str(temp_dir)
 
@@ -221,11 +228,12 @@ def xnat_session_exists(project_id, subject_id, visit_id):
                 project_ids.add(session_ids[0])
                 session_objs[session_ids].append((path_parts[3:], obj))
 
-            session_objs = {
-                ids: objs
-                for ids, objs in session_objs.items()
-                if not xnat_session_exists(*ids)
-            }
+            for ids, objs in list(session_objs.items()):
+                if xnat_session_exists(*ids):
+                    logger.info(
+                        "Skipping session '%s' as it already exists on XNAT", ids
+                    )
+                    del session_objs[ids]
 
             num_sessions = len(session_objs)
 
diff --git a/xnat_ingest/utils.py b/xnat_ingest/utils.py
index e92f2cd..9946983 100644
--- a/xnat_ingest/utils.py
+++ b/xnat_ingest/utils.py
@@ -82,23 +82,20 @@ def __str__(self):
         return self.address
 
 
-def path_or_none_converter(path: str | Path | None):
-    if path is None:
-        return None
-    return Path(path)
-
-
 @attrs.define
 class LogFile(MultiCliTyped):
 
-    path: Path = attrs.field(converter=path_or_none_converter, default=None)
-    loglevel: ty.Optional[str] = None
+    path: Path = attrs.field(converter=Path)
+    loglevel: str
+
+    def __bool__(self):
+        return bool(self.path)
 
     def __str__(self):
         return str(self.path)
 
     def __fspath__(self):
-        return self.path
+        return str(self.path)
 
 
 @attrs.define
@@ -117,18 +114,31 @@ class AssociatedFiles(CliTyped):
     identity_pattern: str
 
 
+@attrs.define
+class XnatLogin(CliTyped):
+
+    host: str
+    user: str
+    password: str
+
+
+@attrs.define
+class StoreCredentials(CliTyped):
+
+    access_key: str
+    access_secret: str
+
+
 def set_logger_handling(
     log_level: str,
     log_emails: ty.List[LogEmail] | None,
-    log_file: LogFile | None,
+    log_files: ty.List[LogFile] | None,
     mail_server: MailServer,
 ):
     levels = [log_level]
-    if log_emails:
-        levels.extend(le.loglevel for le in log_emails)
-    if log_file and log_file.loglevel:
-        levels.append(log_file.loglevel)
+    levels.extend(le.loglevel for le in log_emails)
+    levels.extend(lf.loglevel for lf in log_files)
 
     min_log_level = min(getattr(logging, ll.upper()) for ll in levels)
     logger.setLevel(min_log_level)
@@ -154,7 +164,7 @@ def set_logger_handling(
         logger.addHandler(smtp_hdle)
 
     # Configure the file logger
-    if log_file:
+    for log_file in log_files:
         log_file.path.parent.mkdir(exist_ok=True)
         log_file_hdle = logging.FileHandler(log_file)
         if log_file.loglevel: