Skip to content

Commit

Permalink
debugging upload refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
tclose committed Sep 26, 2024
1 parent 0a871b4 commit a91a3d3
Show file tree
Hide file tree
Showing 3 changed files with 314 additions and 219 deletions.
163 changes: 112 additions & 51 deletions xnat_ingest/cli/stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import typing as ty
import traceback
import click
import datetime
import time
import tempfile
from tqdm import tqdm
from fileformats.core import FileSet
Expand All @@ -18,6 +20,11 @@
set_logger_handling,
)

PRE_STAGE_NAME_DEFAULT = "PRE-STAGE"
STAGED_NAME_DEFAULT = "STAGED"
INVALID_NAME_DEFAULT = "INVALID"
DEIDENTIFIED_NAME_DEFAULT = "DEIDENTIFIED"


@cli.command(
help="""Stages DICOM and associated files found in the input directories into separate
Expand Down Expand Up @@ -242,6 +249,36 @@
default=FileSet.CopyMode.hardlink_or_copy,
help="The method to use for copying files",
)
@click.option(
"--loop",
type=int,
default=None,
help="Run the staging process continuously every LOOP seconds",
)
@click.option(
"--pre-stage-dir-name",
type=str,
default=PRE_STAGE_NAME_DEFAULT,
help="The name of the directory to use for pre-staging the files",
)
@click.option(
"--staged-dir-name",
type=str,
default=STAGED_NAME_DEFAULT,
help="The name of the directory to use for staging the files",
)
@click.option(
"--invalid-dir-name",
type=str,
default=INVALID_NAME_DEFAULT,
help="The name of the directory to use for invalid files",
)
@click.option(
"--deidentified-dir-name",
type=str,
default=DEIDENTIFIED_NAME_DEFAULT,
help="The name of the directory to use for deidentified files",
)
def stage(
files_path: str,
output_dir: Path,
Expand All @@ -266,6 +303,11 @@ def stage(
xnat_login: XnatLogin,
spaces_to_underscores: bool,
copy_mode: FileSet.CopyMode,
pre_stage_dir_name: str,
staged_dir_name: str,
invalid_dir_name: str,
deidentified_dir_name: str,
loop: int | None,
work_dir: Path | None = None,
) -> None:
set_logger_handling(
Expand Down Expand Up @@ -300,70 +342,89 @@ def stage(

logger.info(msg)

sessions = ImagingSession.from_paths(
files_path=files_path,
project_field=project_field,
subject_field=subject_field,
visit_field=visit_field,
session_field=session_field,
scan_id_field=scan_id_field,
scan_desc_field=scan_desc_field,
resource_field=resource_field,
project_id=project_id,
)

logger.info("Staging sessions to '%s'", str(output_dir))

# Create sub-directories of the output directory for the different phases of the
# staging process
prestage_dir = output_dir / "PRE-STAGE"
staged_dir = output_dir / "STAGED"
invalid_dir = output_dir / "INVALID"
prestage_dir = output_dir / pre_stage_dir_name
staged_dir = output_dir / staged_dir_name
invalid_dir = output_dir / invalid_dir_name
prestage_dir.mkdir(parents=True, exist_ok=True)
staged_dir.mkdir(parents=True, exist_ok=True)
invalid_dir.mkdir(parents=True, exist_ok=True)
if deidentify:
deidentified_dir = output_dir / "DEIDENTIFIED"
deidentified_dir = output_dir / deidentified_dir_name
deidentified_dir.mkdir(parents=True, exist_ok=True)

for session in tqdm(sessions, f"Staging resources found in '{files_path}'"):
try:
if associated_files:
session.associate_files(
associated_files,
spaces_to_underscores=spaces_to_underscores,
)
if deidentify:
deidentified_session = session.deidentify(
deidentified_dir,
def do_stage() -> None:
sessions = ImagingSession.from_paths(
files_path=files_path,
project_field=project_field,
subject_field=subject_field,
visit_field=visit_field,
session_field=session_field,
scan_id_field=scan_id_field,
scan_desc_field=scan_desc_field,
resource_field=resource_field,
project_id=project_id,
)

logger.info("Staging sessions to '%s'", str(output_dir))

for session in tqdm(sessions, f"Staging resources found in '{files_path}'"):
try:
if associated_files:
session.associate_files(
associated_files,
spaces_to_underscores=spaces_to_underscores,
)
if deidentify:
deidentified_session = session.deidentify(
deidentified_dir,
copy_mode=copy_mode,
)
if delete:
session.unlink()
session = deidentified_session
# We save the session into a temporary "pre-stage" directory first before
# moving them into the final "staged" directory. This is to prevent the
# files being transferred/deleted until the saved session is in a final state.
_, saved_dir = session.save(
prestage_dir,
available_projects=project_list,
copy_mode=copy_mode,
)
if "INVALID" in saved_dir.name:
saved_dir.rename(invalid_dir / saved_dir.relative_to(prestage_dir))
else:
saved_dir.rename(staged_dir / saved_dir.relative_to(prestage_dir))
if delete:
session.unlink()
session = deidentified_session
# We save the session into a temporary "pre-stage" directory first before
# moving them into the final "staged" directory. This is to prevent the
# files being transferred/deleted until the saved session is in a final state.
_, saved_dir = session.save(
prestage_dir.joinpath(*session.staging_relpath),
available_projects=project_list,
copy_mode=copy_mode,
except Exception as e:
if not raise_errors:
logger.error(
f"Skipping '{session.name}' session due to error in staging: \"{e}\""
f"\n{traceback.format_exc()}\n\n"
)
continue
else:
raise

if loop:
while True:
start_time = datetime.datetime.now()
do_stage()
end_time = datetime.datetime.now()
elapsed_seconds = (end_time - start_time).total_seconds()
sleep_time = loop - elapsed_seconds
logger.info(
"Stage took %s seconds, waiting another %s seconds before running "
"again (loop every %s seconds)",
elapsed_seconds,
sleep_time,
loop,
)
if "INVALID" in saved_dir.name:
invalid_dir.rename(staged_dir.joinpath(*session.staging_relpath))
else:
saved_dir.rename(staged_dir.joinpath(*session.staging_relpath))
if delete:
session.unlink()
except Exception as e:
if not raise_errors:
logger.error(
f"Skipping '{session.name}' session due to error in staging: \"{e}\""
f"\n{traceback.format_exc()}\n\n"
)
continue
else:
raise
time.sleep(loop)
else:
do_stage()


if __name__ == "__main__":
Expand Down
Loading

0 comments on commit a91a3d3

Please sign in to comment.