Skip to content

Commit

Permalink
updated staging and upload so that they should work with AWS
Browse files Browse the repository at this point in the history
  • Loading branch information
tclose committed Feb 9, 2024
1 parent bf6050a commit 435a0ff
Show file tree
Hide file tree
Showing 5 changed files with 211 additions and 69 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ dependencies = [
"fileformats-medimage-extras",
"pydicom >=2.3.1",
"tqdm >=4.64.1",
"boto3",
"xnat",
"arcana",
"arcana-xnat",
Expand Down
21 changes: 21 additions & 0 deletions scripts/run_upload.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
"""Manually drive the ``upload`` CLI command against the staged S3 bucket.

Ad-hoc driver script: invokes the click command in-process via CliRunner
and fails loudly (with the CLI traceback) if the command exits non-zero.
"""
from xnat_ingest.cli import upload
from xnat_ingest.utils import show_cli_trace
from click.testing import CliRunner

# Session-name pattern — defined for reference; not passed to the
# invocation below.
PATTERN = "{PatientName.given_name}_{PatientName.family_name}_{SeriesDate}.*"

# Location of the staged sessions and the XNAT server to upload them to.
STAGED_URI = "s3://ais-s3-tbp-s3bucket-1afz0bzdw5jd6/staging"
CLI_ENV = {
    "XNAT_INGEST_HOST": "https://xnat.sydney.edu.au",
}

runner = CliRunner()
result = runner.invoke(
    upload,
    [STAGED_URI],
    env=CLI_ENV,
    catch_exceptions=False,  # let exceptions propagate for easier debugging
)

# Surface the full CLI trace if the command failed.
assert result.exit_code == 0, show_cli_trace(result)
10 changes: 8 additions & 2 deletions xnat_ingest/cli/stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,15 +181,15 @@ def stage(

for session in tqdm(sessions, f"Staging DICOM sessions found in '{dicoms_path}'"):
try:
session_staging_dir = staging_dir / session.name
session_staging_dir = staging_dir.joinpath(session.staging_relpath)
if session_staging_dir.exists():
logger.info(
"Skipping %s session as staging directory %s already exists",
session.name,
str(session_staging_dir),
)
continue
session_staging_dir.mkdir(exist_ok=True)
session_staging_dir.mkdir(exist_ok=True, parents=True)
# Deidentify files and save them to the staging directory
staged_session = session.stage(
session_staging_dir, associated_files=associated_files,
Expand All @@ -206,3 +206,9 @@ def stage(
continue
else:
raise
else:
if delete:
session.delete()
logger.info("Staged and deleted %s session", session.name)
else:
logger.info("Staged %s session to %s", session.name, str(session_staging_dir))
99 changes: 77 additions & 22 deletions xnat_ingest/cli/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import shutil
import traceback
import typing as ty
from collections import defaultdict
import tempfile
import click
from tqdm import tqdm
Expand Down Expand Up @@ -110,8 +111,9 @@
)
@click.option(
"--aws-creds",
type=ty.Tuple[str, str],
type=str,
metavar="<access-key> <secret-key>",
envvar="XNAT_INGEST_AWS_CREDS",
default=None,
nargs=2,
help="AWS credentials to use for access of data stored S3 of DICOMs",
Expand All @@ -137,29 +139,80 @@ def upload(
server=server, user=user, password=password, cache_dir=Path(tempfile.mkdtemp())
)

if aws_creds:
# List sessions stored in s3 bucket
s3 = boto3.resource("s3", aws_access_key_id=aws_creds[0], aws_secret_access_key=aws_creds[1])
bucket_name, prefix = staged.split("/", 1)
bucket = s3.Bucket(bucket_name)
staged = Path(tempfile.mkdtemp())
for obj in bucket.objects.filter(Prefix=prefix):
if obj.key.endswith("/"):
continue
session_dir = staged / obj.key.split("/", 1)[0]
session_dir.mkdir(parents=True, exist_ok=True)
with open(session_dir / obj.key.split("/")[-1], "wb") as f:
bucket.download_fileobj(obj.key, f)
else:
def list_staged_sessions(staging_dir):
    """Yield each immediate subdirectory of *staging_dir*.

    Each subdirectory is assumed to hold one staged imaging session;
    non-directory entries (e.g. stray files) are ignored.
    """
    yield from (entry for entry in staging_dir.iterdir() if entry.is_dir())

with xnat_repo.connection:

def xnat_session_exists(project_id, subject_id, session_id):
    """Return True (and log a skip message) if the session is already on XNAT.

    Looks the session up via the open ``xnat_repo`` connection; any missing
    level of the project/subject/experiment hierarchy raises KeyError, which
    is taken to mean the session does not exist yet.
    """
    try:
        subject = xnat_repo.connection.projects[project_id].subjects[subject_id]
        subject.experiments[session_id]
    except KeyError:
        return False
    logger.info(
        "Skipping session '%s-%s-%s' as it already exists on XNAT",
        project_id,
        subject_id,
        session_id,
    )
    return True

if staged.startswith("s3://"):
# List sessions stored in s3 bucket
s3 = boto3.resource(
"s3", aws_access_key_id=aws_creds[0], aws_secret_access_key=aws_creds[1]
)
bucket_name, prefix = staged[5:].split("/", 1)
bucket = s3.Bucket(bucket_name)
all_objects = bucket.objects.filter(Prefix=prefix)
session_objs = defaultdict(list)
for obj in all_objects:
if obj.key.endswith("/"):
continue
path_parts = obj.key.split("/")
session_ids = tuple(path_parts[:3])
session_objs[session_ids].append((path_parts[3:-1], obj))

# Drop sessions that already exist on XNAT.
# BUG FIX: this was a *set* comprehension of (ids, objs) tuples, but
# session_objs.items() is iterated when downloading the staged sessions
# below, and a set has no .items() — it must remain a dict.
session_objs = {
    ids: objs
    for ids, objs in session_objs.items()
    if not xnat_session_exists(*ids)
}

num_sessions = len(session_objs)

tmp_download_dir = Path(tempfile.mkdtemp())

def iter_staged_sessions():
    """Download each staged session from S3 into a temp dir, yield the
    directory for upload, then delete the local copy.

    Yields
    ------
    Path
        Temporary directory holding one downloaded session
        (project/subject/session layout under ``tmp_download_dir``).
    """
    for ids, objs in session_objs.items():
        # Just in case the manifest file is not included in the list of
        # objects we recreate the project/subject/session directory structure
        session_tmp_dir = tmp_download_dir.joinpath(*ids)
        session_tmp_dir.mkdir(parents=True, exist_ok=True)
        for relpath, obj in objs:
            # BUG FIX: 'relpath' is a *list* of intermediate directory names
            # (the upstream split drops the filename), so it must be
            # unpacked into joinpath (joinpath does not accept a list) and
            # the object's basename appended to form the destination file.
            dest = session_tmp_dir.joinpath(*relpath, obj.key.split("/")[-1])
            # Ensure any intermediate directories exist before writing
            dest.parent.mkdir(parents=True, exist_ok=True)
            with open(dest, "wb") as f:
                bucket.download_fileobj(obj.key, f)
        yield session_tmp_dir
        # Delete the tmp session after the upload has consumed it
        shutil.rmtree(session_tmp_dir)

sessions = iter_staged_sessions()
else:
sessions = []
for project_dir in Path(staged).iterdir():
for subject_dir in project_dir.iterdir():
for session_dir in subject_dir.iterdir():
if not xnat_session_exists(
project_dir.name, subject_dir.name, session_dir.name
):
sessions.append(session_dir)
num_sessions = len(sessions)

for session_staging_dir in tqdm(
list(list_staged_sessions(staged)),
f"Processing staged sessions found in '{staged}'",
sessions,
total=num_sessions,
desc=f"Processing staged sessions found in '{staged}'",
):
session = ImagingSession.load(session_staging_dir)
try:
Expand Down Expand Up @@ -304,3 +357,5 @@ def list_staged_sessions(staging_dir):
continue
else:
raise

shutil.rmtree(tmp_download_dir)
Loading

0 comments on commit 435a0ff

Please sign in to comment.