diff --git a/xnat_ingest/cli/stage.py b/xnat_ingest/cli/stage.py
index d62dd46..8b4850c 100644
--- a/xnat_ingest/cli/stage.py
+++ b/xnat_ingest/cli/stage.py
@@ -2,6 +2,8 @@
 import typing as ty
 import traceback
 import click
+import datetime
+import time
 import tempfile
 from tqdm import tqdm
 from fileformats.core import FileSet
@@ -18,6 +20,11 @@
     set_logger_handling,
 )
 
+PRE_STAGE_NAME_DEFAULT = "PRE-STAGE"
+STAGED_NAME_DEFAULT = "STAGED"
+INVALID_NAME_DEFAULT = "INVALID"
+DEIDENTIFIED_NAME_DEFAULT = "DEIDENTIFIED"
+
 
 @cli.command(
     help="""Stages DICOM and associated files found in the input directories into separate
@@ -242,6 +249,36 @@
     default=FileSet.CopyMode.hardlink_or_copy,
     help="The method to use for copying files",
 )
+@click.option(
+    "--loop",
+    type=int,
+    default=None,
+    help="Run the staging process continuously every LOOP seconds",
+)
+@click.option(
+    "--pre-stage-dir-name",
+    type=str,
+    default=PRE_STAGE_NAME_DEFAULT,
+    help="The name of the directory to use for pre-staging the files",
+)
+@click.option(
+    "--staged-dir-name",
+    type=str,
+    default=STAGED_NAME_DEFAULT,
+    help="The name of the directory to use for staging the files",
+)
+@click.option(
+    "--invalid-dir-name",
+    type=str,
+    default=INVALID_NAME_DEFAULT,
+    help="The name of the directory to use for invalid files",
+)
+@click.option(
+    "--deidentified-dir-name",
+    type=str,
+    default=DEIDENTIFIED_NAME_DEFAULT,
+    help="The name of the directory to use for deidentified files",
+)
 def stage(
     files_path: str,
     output_dir: Path,
@@ -266,6 +303,11 @@
     xnat_login: XnatLogin,
     spaces_to_underscores: bool,
     copy_mode: FileSet.CopyMode,
+    pre_stage_dir_name: str,
+    staged_dir_name: str,
+    invalid_dir_name: str,
+    deidentified_dir_name: str,
+    loop: int | None,
     work_dir: Path | None = None,
 ) -> None:
     set_logger_handling(
@@ -300,70 +342,89 @@
 
     logger.info(msg)
 
-    sessions = ImagingSession.from_paths(
-        files_path=files_path,
-        project_field=project_field,
-        subject_field=subject_field,
-        visit_field=visit_field,
-        session_field=session_field,
-        scan_id_field=scan_id_field,
-        scan_desc_field=scan_desc_field,
-        resource_field=resource_field,
-        project_id=project_id,
-    )
-
-    logger.info("Staging sessions to '%s'", str(output_dir))
-
     # Create sub-directories of the output directory for the different phases of the
     # staging process
-    prestage_dir = output_dir / "PRE-STAGE"
-    staged_dir = output_dir / "STAGED"
-    invalid_dir = output_dir / "INVALID"
+    prestage_dir = output_dir / pre_stage_dir_name
+    staged_dir = output_dir / staged_dir_name
+    invalid_dir = output_dir / invalid_dir_name
     prestage_dir.mkdir(parents=True, exist_ok=True)
     staged_dir.mkdir(parents=True, exist_ok=True)
     invalid_dir.mkdir(parents=True, exist_ok=True)
     if deidentify:
-        deidentified_dir = output_dir / "DEIDENTIFIED"
+        deidentified_dir = output_dir / deidentified_dir_name
        deidentified_dir.mkdir(parents=True, exist_ok=True)
 
-    for session in tqdm(sessions, f"Staging resources found in '{files_path}'"):
-        try:
-            if associated_files:
-                session.associate_files(
-                    associated_files,
-                    spaces_to_underscores=spaces_to_underscores,
-                )
-            if deidentify:
-                deidentified_session = session.deidentify(
-                    deidentified_dir,
+    def do_stage() -> None:
+        sessions = ImagingSession.from_paths(
+            files_path=files_path,
+            project_field=project_field,
+            subject_field=subject_field,
+            visit_field=visit_field,
+            session_field=session_field,
+            scan_id_field=scan_id_field,
+            scan_desc_field=scan_desc_field,
+            resource_field=resource_field,
+            project_id=project_id,
+        )
+
logger.info("Staging sessions to '%s'", str(output_dir)) + + for session in tqdm(sessions, f"Staging resources found in '{files_path}'"): + try: + if associated_files: + session.associate_files( + associated_files, + spaces_to_underscores=spaces_to_underscores, + ) + if deidentify: + deidentified_session = session.deidentify( + deidentified_dir, + copy_mode=copy_mode, + ) + if delete: + session.unlink() + session = deidentified_session + # We save the session into a temporary "pre-stage" directory first before + # moving them into the final "staged" directory. This is to prevent the + # files being transferred/deleted until the saved session is in a final state. + _, saved_dir = session.save( + prestage_dir, + available_projects=project_list, copy_mode=copy_mode, ) + if "INVALID" in saved_dir.name: + saved_dir.rename(invalid_dir / saved_dir.relative_to(prestage_dir)) + else: + saved_dir.rename(staged_dir / saved_dir.relative_to(prestage_dir)) if delete: session.unlink() - session = deidentified_session - # We save the session into a temporary "pre-stage" directory first before - # moving them into the final "staged" directory. This is to prevent the - # files being transferred/deleted until the saved session is in a final state. - _, saved_dir = session.save( - prestage_dir.joinpath(*session.staging_relpath), - available_projects=project_list, - copy_mode=copy_mode, + except Exception as e: + if not raise_errors: + logger.error( + f"Skipping '{session.name}' session due to error in staging: \"{e}\"" + f"\n{traceback.format_exc()}\n\n" + ) + continue + else: + raise + + if loop: + while True: + start_time = datetime.datetime.now() + do_stage() + end_time = datetime.datetime.now() + elapsed_seconds = (end_time - start_time).total_seconds() + sleep_time = loop - elapsed_seconds + logger.info( + "Stage took %s seconds, waiting another %s seconds before running " + "again (loop every %s seconds)", + elapsed_seconds, + sleep_time, + loop, ) - if "INVALID" in saved_dir.name: - invalid_dir.rename(staged_dir.joinpath(*session.staging_relpath)) - else: - saved_dir.rename(staged_dir.joinpath(*session.staging_relpath)) - if delete: - session.unlink() - except Exception as e: - if not raise_errors: - logger.error( - f"Skipping '{session.name}' session due to error in staging: \"{e}\"" - f"\n{traceback.format_exc()}\n\n" - ) - continue - else: - raise + time.sleep(loop) + else: + do_stage() if __name__ == "__main__": diff --git a/xnat_ingest/cli/upload.py b/xnat_ingest/cli/upload.py index af6da31..5cdb23d 100644 --- a/xnat_ingest/cli/upload.py +++ b/xnat_ingest/cli/upload.py @@ -192,12 +192,19 @@ @click.option( "--wait-period", type=int, - default=60, + default=0, help=( - "The number of seconds to wait between checking for new sessions in the S3 bucket. 
" - "This is to avoid hitting the S3 API rate limit" + "The number of seconds to wait since the last file modification in sessions " + "in the S3 bucket or source file-system directory before uploading them to " + "avoid uploading partial sessions" ), ) +@click.option( + "--loop", + type=int, + default=None, + help="Run the staging process continuously every LOOP seconds", +) def upload( staged: str, server: str, @@ -218,6 +225,7 @@ def upload( use_curl_jsession: bool, method: str, wait_period: int, + loop: int | None, ) -> None: set_logger_handling( @@ -240,185 +248,208 @@ def upload( verify_ssl=verify_ssl, ) - if use_curl_jsession: - jsession = sp.check_output( - [ - "curl", - "-X", - "PUT", - "-d", - f"username={user}&password={password}", - f"{server}/data/services/auth", - ] - ).decode("utf-8") - xnat_repo.connection.depth = 1 - xnat_repo.connection.session = xnat.connect( - server, user=user, jsession=jsession - ) - - with xnat_repo.connection: - - num_sessions: int - sessions: ty.Iterable[Path] - if staged.startswith("s3://"): - sessions = iterate_s3_sessions( - staged, store_credentials, temp_dir, wait_period=wait_period - ) - # bit of a hack: number of sessions is the first item in the iterator - num_sessions = next(sessions) # type: ignore[assignment] - else: - sessions = [] - for project_dir in Path(staged).iterdir(): - for subject_dir in project_dir.iterdir(): - for session_dir in subject_dir.iterdir(): - if dir_older_than(session_dir, wait_period): - sessions.append(session_dir) - else: - logger.info( - "Skipping '%s' session as it has been modified recently", - session_dir, - ) - num_sessions = len(sessions) - logger.info( - "Found %d sessions in staging directory to stage'%s'", - num_sessions, - staged, + def do_upload() -> None: + if use_curl_jsession: + jsession = sp.check_output( + [ + "curl", + "-X", + "PUT", + "-d", + f"username={user}&password={password}", + f"{server}/data/services/auth", + ] + ).decode("utf-8") + xnat_repo.connection.depth = 1 + xnat_repo.connection.session = xnat.connect( + server, user=user, jsession=jsession ) - framesets: dict[str, FrameSet] = {} + with xnat_repo.connection: + + num_sessions: int + sessions: ty.Iterable[Path] + if staged.startswith("s3://"): + sessions = iterate_s3_sessions( + staged, store_credentials, temp_dir, wait_period=wait_period + ) + # bit of a hack: number of sessions is the first item in the iterator + num_sessions = next(sessions) # type: ignore[assignment] + else: + sessions = [] + for session_dir in Path(staged).iterdir(): + if dir_older_than(session_dir, wait_period): + sessions.append(session_dir) + else: + logger.info( + "Skipping '%s' session as it has been modified recently", + session_dir, + ) + num_sessions = len(sessions) + logger.info( + "Found %d sessions in staging directory to stage'%s'", + num_sessions, + staged, + ) - for session_staging_dir in tqdm( - sessions, - total=num_sessions, - desc=f"Processing staged sessions found in '{staged}'", - ): + framesets: dict[str, FrameSet] = {} - session = ImagingSession.load( - session_staging_dir, - require_manifest=require_manifest, - ) - try: - # Create corresponding session on XNAT - xproject = xnat_repo.connection.projects[session.project_id] + for session_staging_dir in tqdm( + sessions, + total=num_sessions, + desc=f"Processing staged sessions found in '{staged}'", + ): - # Access Arcana frameset associated with project + session = ImagingSession.load( + session_staging_dir, + require_manifest=require_manifest, + ) try: - frameset = 
-                except KeyError:
+                    # Create corresponding session on XNAT
+                    xproject = xnat_repo.connection.projects[session.project_id]
+
+                    # Access Arcana frameset associated with project
                     try:
-                        frameset = FrameSet.load(session.project_id, xnat_repo)
-                    except Exception as e:
-                        if not always_include:
-                            logger.error(
-                                "Did not load frameset definition (%s) from %s project "
-                                "on %s. Either '--always-include' flag must be used or "
-                                "the frameset must be defined on XNAT using the `frametree` "
-                                "command line tool (see https://arcanaframework.github.io/frametree/).",
-                                e,
-                                session.project_id,
-                                xnat_repo.server,
-                            )
-                            continue
-                        else:
-                            frameset = None
-                    framesets[session.project_id] = frameset
+                        frameset = framesets[session.project_id]
+                    except KeyError:
+                        try:
+                            frameset = FrameSet.load(session.project_id, xnat_repo)
+                        except Exception as e:
+                            if not always_include:
+                                logger.error(
+                                    "Did not load frameset definition (%s) from %s project "
+                                    "on %s. Either '--always-include' flag must be used or "
+                                    "the frameset must be defined on XNAT using the `frametree` "
+                                    "command line tool (see https://arcanaframework.github.io/frametree/).",
+                                    e,
+                                    session.project_id,
+                                    xnat_repo.server,
+                                )
+                                continue
+                            else:
+                                frameset = None
+                        framesets[session.project_id] = frameset
 
-                xsession = get_xnat_session(session, xproject)
+                    xsession = get_xnat_session(session, xproject)
 
-                # Anonymise DICOMs and save to directory prior to upload
-                if always_include:
-                    logger.info(
-                        f"Including {always_include} scans/files in upload from '{session.name}' to "
-                        f"{session.path} regardless of whether they are explicitly specified"
-                    )
+                    # Anonymise DICOMs and save to directory prior to upload
+                    if always_include:
+                        logger.info(
+                            f"Including {always_include} scans/files in upload from '{session.name}' to "
+                            f"{session.path} regardless of whether they are explicitly specified"
+                        )
 
-                for resource in tqdm(
-                    sorted(
-                        session.select_resources(
-                            frameset, always_include=always_include
+                    for resource in tqdm(
+                        sorted(
+                            session.select_resources(
+                                frameset, always_include=always_include
+                            )
+                        ),
+                        f"Uploading resources found in {session.name}",
+                    ):
+                        xresource = get_xnat_resource(resource, xsession)
+                        if xresource is None:
+                            logger.info(
+                                "Skipping '%s' resource as it is already uploaded",
+                                resource.path,
+                            )
+                            continue  # skipping as resource already exists
+                        if isinstance(resource.fileset, File):
+                            for fspath in resource.fileset.fspaths:
+                                xresource.upload(str(fspath), fspath.name)
+                        else:
+                            xresource.upload_dir(resource.fileset.parent, method=method)
+                        logger.debug("retrieving checksums for %s", xresource)
+                        remote_checksums = get_xnat_checksums(xresource)
+                        logger.debug("calculating checksums for %s", xresource)
+                        calc_checksums = calculate_checksums(resource.fileset)
+                        if remote_checksums != calc_checksums:
+                            mismatching = [
+                                k
+                                for k, v in remote_checksums.items()
+                                if v != calc_checksums[k]
+                            ]
+                            raise RuntimeError(
+                                "Checksums do not match after upload of "
+                                f"'{resource.path}' resource. "
" + f"Mismatching files were {mismatching}" + ) + logger.info(f"Uploaded '{resource.path}' in '{session.name}'") + logger.info(f"Successfully uploaded all files in '{session.name}'") + # Extract DICOM metadata + logger.info("Extracting metadata from DICOMs on XNAT..") + try: + xnat_repo.connection.put( + f"/data/experiments/{xsession.id}?pullDataFromHeaders=true" ) - ), - f"Uploading resources found in {session.name}", - ): - xresource = get_xnat_resource(resource, xsession) - if xresource is None: - logger.info( - "Skipping '%s' resource as it is already uploaded", - resource.path, + except XNATResponseError as e: + logger.warning( + f"Failed to extract metadata from DICOMs in '{session.name}': {e}" ) - continue # skipping as resource already exists - if isinstance(resource.fileset, File): - for fspath in resource.fileset.fspaths: - xresource.upload(str(fspath), fspath.name) - else: - xresource.upload_dir(resource.fileset.parent, method=method) - logger.debug("retrieving checksums for %s", xresource) - remote_checksums = get_xnat_checksums(xresource) - logger.debug("calculating checksums for %s", xresource) - calc_checksums = calculate_checksums(resource.fileset) - if remote_checksums != calc_checksums: - mismatching = [ - k - for k, v in remote_checksums.items() - if v != calc_checksums[k] - ] - raise RuntimeError( - "Checksums do not match after upload of " - f"'{resource.path}' resource. " - f"Mismatching files were {mismatching}" + try: + xnat_repo.connection.put( + f"/data/experiments/{xsession.id}?fixScanTypes=true" ) - logger.info(f"Uploaded '{resource.path}' in '{session.name}'") - logger.info(f"Successfully uploaded all files in '{session.name}'") - # Extract DICOM metadata - logger.info("Extracting metadata from DICOMs on XNAT..") - try: - xnat_repo.connection.put( - f"/data/experiments/{xsession.id}?pullDataFromHeaders=true" - ) - except XNATResponseError as e: - logger.warning( - f"Failed to extract metadata from DICOMs in '{session.name}': {e}" - ) - try: - xnat_repo.connection.put( - f"/data/experiments/{xsession.id}?fixScanTypes=true" - ) - except XNATResponseError as e: - logger.warning(f"Failed to fix scan types in '{session.name}': {e}") - try: - xnat_repo.connection.put( - f"/data/experiments/{xsession.id}?triggerPipelines=true" - ) - except XNATResponseError as e: - logger.warning( - f"Failed to trigger pipelines in '{session.name}': {e}" - ) - logger.info(f"Succesfully uploaded all files in '{session.name}'") - except Exception as e: - if not raise_errors: - logger.error( - f"Skipping '{session.name}' session due to error in staging: \"{e}\"" - f"\n{traceback.format_exc()}\n\n" - ) - continue - else: - raise + except XNATResponseError as e: + logger.warning( + f"Failed to fix scan types in '{session.name}': {e}" + ) + try: + xnat_repo.connection.put( + f"/data/experiments/{xsession.id}?triggerPipelines=true" + ) + except XNATResponseError as e: + logger.warning( + f"Failed to trigger pipelines in '{session.name}': {e}" + ) + logger.info(f"Succesfully uploaded all files in '{session.name}'") + except Exception as e: + if not raise_errors: + logger.error( + f"Skipping '{session.name}' session due to error in staging: \"{e}\"" + f"\n{traceback.format_exc()}\n\n" + ) + continue + else: + raise - if use_curl_jsession: - xnat_repo.connection.exit() + if use_curl_jsession: + xnat_repo.connection.exit() - if clean_up_older_than: - logger.info( - "Cleaning up files in %s older than %d days", - staged, - clean_up_older_than, - ) - if staged.startswith("s3://"): - 
-            remove_old_files_on_s3(remote_store=staged, threshold=clean_up_older_than)
-        elif "@" in staged:
-            remove_old_files_on_ssh(remote_store=staged, threshold=clean_up_older_than)
-        else:
-            assert False
+        if clean_up_older_than:
+            logger.info(
+                "Cleaning up files in %s older than %d days",
+                staged,
+                clean_up_older_than,
+            )
+            if staged.startswith("s3://"):
+                remove_old_files_on_s3(
+                    remote_store=staged, threshold=clean_up_older_than
+                )
+            elif "@" in staged:
+                remove_old_files_on_ssh(
+                    remote_store=staged, threshold=clean_up_older_than
+                )
+            else:
+                assert False
+
+    if loop:
+        while True:
+            start_time = datetime.datetime.now()
+            do_upload()
+            end_time = datetime.datetime.now()
+            elapsed_seconds = (end_time - start_time).total_seconds()
+            sleep_time = loop - elapsed_seconds
+            logger.info(
+                "Upload took %s seconds, waiting another %s seconds before running "
+                "again (loop every %s seconds)",
+                elapsed_seconds,
+                sleep_time,
+                loop,
+            )
+            time.sleep(max(sleep_time, 0))
+    else:
+        do_upload()
 
 
 if __name__ == "__main__":
diff --git a/xnat_ingest/tests/test_cli.py b/xnat_ingest/tests/test_cli.py
index 88e0c6f..f736049 100644
--- a/xnat_ingest/tests/test_cli.py
+++ b/xnat_ingest/tests/test_cli.py
@@ -9,6 +9,7 @@
 import xnat4tests  # type: ignore[import-untyped]
 from frametree.core.cli.store import add as store_add  # type: ignore[import-untyped]
 from xnat_ingest.cli import stage, upload
+from xnat_ingest.cli.stage import STAGED_NAME_DEFAULT
 from xnat_ingest.utils import show_cli_trace
 from fileformats.medimage import DicomSeries
 from medimages4tests.dummy.dicom.pet.wholebody.siemens.biograph_vision.vr20b import (  # type: ignore[import-untyped]
@@ -218,7 +219,7 @@ def test_stage_and_upload(
     result = cli_runner(
         upload,
         [
-            str(staging_dir),
+            str(staging_dir / STAGED_NAME_DEFAULT),
             "--log-file",
             str(log_file),
             "info",
@@ -230,6 +231,8 @@
             "--method",
             "tar_file",
             "--use-curl-jsession",
+            "--wait-period",
+            "0",
         ],
         env={
             "XNAT_INGEST_UPLOAD_HOST": xnat_server,