Skip to content

Commit

Permalink
added logic to prevent the attempted upload of incomplete sessions
Browse files Browse the repository at this point in the history
  • Loading branch information
tclose committed Sep 26, 2024
1 parent d078555 commit a0154a7
Show file tree
Hide file tree
Showing 8 changed files with 230 additions and 106 deletions.
69 changes: 38 additions & 31 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import logging
import typing as ty
import tempfile
from logging.handlers import SMTPHandler

# from logging.handlers import SMTPHandler
import pytest
import click.testing
from click.testing import CliRunner
import xnat4tests # type: ignore[import-untyped]
from datetime import datetime
Expand All @@ -31,11 +33,12 @@
if os.getenv("_PYTEST_RAISE", "0") != "0":

@pytest.hookimpl(tryfirst=True)
def pytest_exception_interact(call):
raise call.excinfo.value
def pytest_exception_interact(call: pytest.CallInfo[ty.Any]) -> None:
if call.excinfo is not None:
raise call.excinfo.value

@pytest.hookimpl(tryfirst=True)
def pytest_internalerror(excinfo):
def pytest_internalerror(excinfo: pytest.ExceptionInfo[BaseException]) -> None:
raise excinfo.value

CATCH_CLI_EXCEPTIONS = False
Expand All @@ -44,84 +47,88 @@ def pytest_internalerror(excinfo):


@pytest.fixture
def catch_cli_exceptions():
def catch_cli_exceptions() -> bool:
return CATCH_CLI_EXCEPTIONS


@pytest.fixture(scope="session")
def run_prefix():
def run_prefix() -> str:
"A datetime string used to avoid stale data left over from previous tests"
return datetime.strftime(datetime.now(), "%Y%m%d%H%M%S")


@pytest.fixture(scope="session")
def xnat_repository():
def xnat_repository() -> None:
xnat4tests.start_xnat()


@pytest.fixture(scope="session")
def xnat_archive_dir(xnat_repository):
return xnat4tests.Config().xnat_root_dir / "archive"
def xnat_archive_dir(xnat_repository: None) -> Path:
return xnat4tests.Config().xnat_root_dir / "archive" # type: ignore[no-any-return]


@pytest.fixture(scope="session")
def tmp_gen_dir():
def tmp_gen_dir() -> Path:
# tmp_gen_dir = Path("~").expanduser() / ".xnat-ingest-work3"
# tmp_gen_dir.mkdir(exist_ok=True)
# return tmp_gen_dir
return Path(tempfile.mkdtemp())


@pytest.fixture(scope="session")
def xnat_login(xnat_repository):
def xnat_login(xnat_repository: str) -> ty.Any:
return xnat4tests.connect()


@pytest.fixture(scope="session")
def xnat_project(xnat_login, run_prefix):
def xnat_project(xnat_login: ty.Any, run_prefix: str) -> ty.Any:
project_id = f"INGESTUPLOAD{run_prefix}"
with xnat4tests.connect() as xnat_login:
xnat_login.put(f"/data/archive/projects/{project_id}")
return project_id


@pytest.fixture(scope="session")
def xnat_server(xnat_config):
return xnat_config.xnat_uri
def xnat_server(xnat_config: xnat4tests.Config) -> str:
return xnat_config.xnat_uri # type: ignore[no-any-return]


@pytest.fixture(scope="session")
def xnat_config(xnat_repository):
def xnat_config(xnat_repository: str) -> xnat4tests.Config:
return xnat4tests.Config()


@pytest.fixture
def cli_runner(catch_cli_exceptions):
def invoke(*args, catch_exceptions=catch_cli_exceptions, **kwargs):
def cli_runner(catch_cli_exceptions: bool) -> ty.Callable[..., ty.Any]:
def invoke(
*args: ty.Any, catch_exceptions: bool = catch_cli_exceptions, **kwargs: ty.Any
) -> click.testing.Result:
runner = CliRunner()
result = runner.invoke(*args, catch_exceptions=catch_exceptions, **kwargs)
result = runner.invoke(*args, catch_exceptions=catch_exceptions, **kwargs) # type: ignore[misc]
return result

return invoke


# Create a custom handler that captures email messages for testing
class TestSMTPHandler(SMTPHandler):
def __init__(
self, mailhost, fromaddr, toaddrs, subject, credentials=None, secure=None
):
super().__init__(mailhost, fromaddr, toaddrs, subject, credentials, secure)
self.emails = [] # A list to store captured email messages
# # Create a custom handler that captures email messages for testing
# class TestSMTPHandler(SMTPHandler):
# def __init__(
# self, mailhost, fromaddr, toaddrs, subject, credentials=None, secure=None
# ):
# super().__init__(mailhost, fromaddr, toaddrs, subject, credentials, secure)
# self.emails = [] # A list to store captured email messages

def emit(self, record):
# Capture the email message and append it to the list
msg = self.format(record)
self.emails.append(msg)
# def emit(self, record):
# # Capture the email message and append it to the list
# msg = self.format(record)
# self.emails.append(msg)


def get_raw_data_files(out_dir: ty.Optional[Path] = None, **kwargs) -> ty.List[Path]:
def get_raw_data_files(
out_dir: ty.Optional[Path] = None, **kwargs: ty.Any
) -> ty.List[Path]:
if out_dir is None:
out_dir = Path(tempfile.mkdtemp())
return get_listmode_data(out_dir, skip_unknown=True, **kwargs) + get_countrate_data(
return get_listmode_data(out_dir, skip_unknown=True, **kwargs) + get_countrate_data( # type: ignore[no-any-return]
out_dir, skip_unknown=True, **kwargs
)
57 changes: 38 additions & 19 deletions xnat_ingest/cli/stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import traceback
import click
import tempfile
import shutil
from tqdm import tqdm
from fileformats.core import FileSet
from xnat_ingest.cli.base import cli
Expand Down Expand Up @@ -33,7 +32,7 @@
)
@click.argument("files_path", type=str, envvar="XNAT_INGEST_STAGE_DICOMS_PATH")
@click.argument(
"staging_dir", type=click.Path(path_type=Path), envvar="XNAT_INGEST_STAGE_DIR"
"output_dir", type=click.Path(path_type=Path), envvar="XNAT_INGEST_STAGE_DIR"
)
@click.option(
"--datatype",
Expand Down Expand Up @@ -237,9 +236,15 @@
"physical disk as the staging directory for optimal performance"
),
)
@click.option(
"--copy-mode",
type=FileSet.CopyMode,
default=FileSet.CopyMode.hardlink_or_copy,
help="The method to use for copying files",
)
def stage(
files_path: str,
staging_dir: Path,
output_dir: Path,
datatype: str,
associated_files: ty.List[AssociatedFiles],
project_field: str,
Expand All @@ -260,6 +265,7 @@ def stage(
deidentify: bool,
xnat_login: XnatLogin,
spaces_to_underscores: bool,
copy_mode: FileSet.CopyMode,
work_dir: Path | None = None,
) -> None:
set_logger_handling(
Expand All @@ -269,13 +275,6 @@ def stage(
mail_server=mail_server,
add_logger=add_logger,
)
if work_dir is None:
work_dir = staging_dir.parent / (staging_dir.name + ".work")
if not work_dir.exists():
work_dir.mkdir(parents=True)
cleanup_work_dir = True
else:
cleanup_work_dir = False

if xnat_login:
xnat_repo = Xnat(
Expand Down Expand Up @@ -313,7 +312,19 @@ def stage(
project_id=project_id,
)

logger.info("Staging sessions to '%s'", str(staging_dir))
logger.info("Staging sessions to '%s'", str(output_dir))

# Create sub-directories of the output directory for the different phases of the
# staging process
prestage_dir = output_dir / "PRE-STAGE"
staged_dir = output_dir / "STAGED"
invalid_dir = output_dir / "INVALID"
prestage_dir.mkdir(parents=True, exist_ok=True)
staged_dir.mkdir(parents=True, exist_ok=True)
invalid_dir.mkdir(parents=True, exist_ok=True)
if deidentify:
deidentified_dir = output_dir / "DEIDENTIFIED"
deidentified_dir.mkdir(parents=True, exist_ok=True)

for session in tqdm(sessions, f"Staging resources found in '{files_path}'"):
try:
Expand All @@ -323,14 +334,25 @@ def stage(
spaces_to_underscores=spaces_to_underscores,
)
if deidentify:
session = session.deidentify(
work_dir / "deidentified",
copy_mode=FileSet.CopyMode.hardlink_or_copy,
deidentified_session = session.deidentify(
deidentified_dir,
copy_mode=copy_mode,
)
session.save(
staging_dir.joinpath(*session.staging_relpath),
if delete:
session.unlink()
session = deidentified_session
# We save the session into a temporary "pre-stage" directory first, before
# moving it into the final "staged" directory. This prevents the files from
# being transferred/deleted until the saved session is in its final state.
_, saved_dir = session.save(
prestage_dir.joinpath(*session.staging_relpath),
available_projects=project_list,
copy_mode=copy_mode,
)
if "INVALID" in saved_dir.name:
invalid_dir.rename(staged_dir.joinpath(*session.staging_relpath))
else:
saved_dir.rename(staged_dir.joinpath(*session.staging_relpath))
if delete:
session.unlink()
except Exception as e:
Expand All @@ -342,9 +364,6 @@ def stage(
continue
else:
raise
finally:
if cleanup_work_dir:
shutil.rmtree(work_dir)


if __name__ == "__main__":
Expand Down
46 changes: 34 additions & 12 deletions xnat_ingest/cli/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@
set_logger_handling,
StoreCredentials,
)
from xnat_ingest.helpers_upload import (
from xnat_ingest.upload_helpers import (
get_xnat_session,
get_xnat_resource,
get_xnat_checksums,
calculate_checksums,
iterate_s3_sessions,
remove_old_files_on_s3,
remove_old_files_on_ssh,
dir_older_than,
)


Expand Down Expand Up @@ -188,6 +189,15 @@
"'tgz_file' is used"
),
)
@click.option(
"--wait-period",
type=int,
default=1800,
help=(
"The number of seconds to wait between checking for new sessions in the S3 bucket. "
"This is to avoid hitting the S3 API rate limit"
),
)
def upload(
staged: str,
server: str,
Expand All @@ -207,6 +217,7 @@ def upload(
verify_ssl: bool,
use_curl_jsession: bool,
method: str,
wait_period: int,
) -> None:

set_logger_handling(
Expand Down Expand Up @@ -250,18 +261,28 @@ def upload(
num_sessions: int
sessions: ty.Iterable[Path]
if staged.startswith("s3://"):
sessions = iterate_s3_sessions(store_credentials, staged, temp_dir)
sessions = iterate_s3_sessions(
staged, store_credentials, temp_dir, wait_period=wait_period
)
# bit of a hack: number of sessions is the first item in the iterator
num_sessions = next(sessions) # type: ignore[assignment]
else:
sessions = []
for project_dir in Path(staged).iterdir():
for subject_dir in project_dir.iterdir():
for session_dir in subject_dir.iterdir():
sessions.append(session_dir)
if dir_older_than(session_dir, wait_period):
sessions.append(session_dir)
else:
logger.info(
"Skipping '%s' session as it has been modified recently",
session_dir,
)
num_sessions = len(sessions)
logger.info(
"Found %d sessions in staging directory '%s'", num_sessions, staged
"Found %d sessions in staging directory to stage'%s'",
num_sessions,
staged,
)

framesets: dict[str, FrameSet] = {}
Expand All @@ -271,11 +292,12 @@ def upload(
total=num_sessions,
desc=f"Processing staged sessions found in '{staged}'",
):

session = ImagingSession.load(
session_staging_dir, require_manifest=require_manifest
session_staging_dir,
require_manifest=require_manifest,
)
try:

# Create corresponding session on XNAT
xproject = xnat_repo.connection.projects[session.project_id]

Expand All @@ -301,17 +323,13 @@ def upload(
frameset = None
framesets[session.project_id] = frameset

session_path = (
f"{session.project_id}:{session.subject_id}:{session.visit_id}"
)

xsession = get_xnat_session(session, xproject)

# Anonymise DICOMs and save to directory prior to upload
if always_include:
logger.info(
f"Including {always_include} scans/files in upload from '{session.name}' to "
f"{session_path} regardless of whether they are explicitly specified"
f"{session.path} regardless of whether they are explicitly specified"
)

for resource in tqdm(
Expand All @@ -320,10 +338,14 @@ def upload(
frameset, always_include=always_include
)
),
f"Uploading scans found in {session.name}",
f"Uploading resources found in {session.name}",
):
xresource = get_xnat_resource(resource, xsession)
if xresource is None:
logger.info(
"Skipping '%s' resource as it is already uploaded",
resource.path,
)
continue # skipping as resource already exists
if isinstance(resource.fileset, File):
for fspath in resource.fileset.fspaths:
Expand Down
Loading

0 comments on commit a0154a7

Please sign in to comment.