Skip to content

Commit

Permalink
got raw data direct stage to work
Browse files Browse the repository at this point in the history
  • Loading branch information
tclose committed Sep 26, 2024
1 parent c545f3c commit 8214f59
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 79 deletions.
58 changes: 38 additions & 20 deletions xnat_ingest/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
class ImagingResource:
name: str
fileset: FileSet
checksums: dict[str, str] = attrs.field()
scan: "xnat_ingest.scan.ImagingScan" = attrs.field(default=None)
checksums: dict[str, str] = attrs.field(eq=False, repr=False)
scan: "xnat_ingest.scan.ImagingScan" = attrs.field(
default=None, eq=False, repr=False
)

@checksums.default
def calculate_checksums(self) -> dict[str, str]:
Expand Down Expand Up @@ -59,7 +61,7 @@ def save(
copy_mode: FileSet.CopyMode = FileSet.CopyMode.copy,
calculate_checksums: bool = True,
overwrite: bool | None = None,
) -> None:
) -> Self:
"""Save the resource to a directory
Parameters
Expand All @@ -76,6 +78,11 @@ def save(
issued, if False an exception will be raised, if True then the resource is
saved regardless of the files being newer
Returns
-------
ImagingResource
The saved resource
Raises
------
FileExistsError
Expand All @@ -87,28 +94,39 @@ def save(
self.calculate_checksums() if calculate_checksums else self.checksums
)
if resource_dir.exists():
loaded = self.load(resource_dir, require_manifest=False)
if loaded.checksums == checksums:
return
elif overwrite is None and not self.newer_than_or_equal(loaded):
try:
loaded = self.load(resource_dir, require_manifest=False)
if loaded.checksums == checksums:
return loaded
elif overwrite is None and not self.newer_than_or_equal(loaded):
logger.warning(
f"Resource '{self.name}' already exists in '{dest_dir}' but "
"the files are not older than the ones to be be saved"
)
elif overwrite:
logger.warning(
f"Resource '{self.name}' already exists in '{dest_dir}', overwriting"
)
shutil.rmtree(resource_dir)
else:
if overwrite is None:
msg = "and the files are not older than the ones to be be saved"
else:
msg = ""
raise FileExistsError(
f"Resource '{self.name}' already exists in '{dest_dir}'{msg}, set "
"'overwrite' to True to overwrite regardless of file times"
)
except DifferingCheckumsException:
logger.warning(
f"Resource '{self.name}' already exists in '{dest_dir}' but "
"the files are not older than the ones to be be saved"
f"Resource '{self.name}' already exists in '{dest_dir}', but it is "
"incomplete, overwriting"
)
elif overwrite:
shutil.rmtree(resource_dir)
else:
if overwrite is None:
msg = "and the files are not older than the ones to be be saved"
else:
msg = ""
raise FileExistsError(
f"Resource '{self.name}' already exists in '{dest_dir}'{msg}, set "
"'overwrite' to True to overwrite regardless of file times"
)
self.fileset.copy(resource_dir, mode=copy_mode, trim=True)
saved_fileset = self.fileset.copy(resource_dir, mode=copy_mode, trim=True)
manifest = {"datatype": self.fileset.mime_like, "checksums": checksums}
Json.new(resource_dir / self.MANIFEST_FNAME, manifest)
return type(self)(name=self.name, fileset=saved_fileset, checksums=checksums)

@classmethod
def load(
Expand Down
37 changes: 36 additions & 1 deletion xnat_ingest/scan.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import typing as ty
import re
from pathlib import Path
from typing_extensions import Self
import logging
import attrs
from fileformats.core import FileSet
Expand Down Expand Up @@ -44,7 +46,9 @@ class ImagingScan:
factory=dict, converter=scan_resources_converter
)
associated: AssociatedFiles | None = None
session: "xnat_ingest.session.ImagingSession" = attrs.field(default=None)
session: "xnat_ingest.session.ImagingSession" = attrs.field(
default=None, eq=False, repr=False
)

def __contains__(self, resource_name: str) -> bool:
    """Report whether a resource with the given name is attached to this scan.

    Parameters
    ----------
    resource_name : str
        the name to look up in ``self.resources``

    Returns
    -------
    bool
        True if ``resource_name`` is present in ``self.resources``
    """
    attached = self.resources
    return resource_name in attached
Expand All @@ -56,6 +60,37 @@ def __attrs_post_init__(self) -> None:
for resource in self.resources.values():
resource.scan = self

def new_empty(self) -> Self:
return type(self)(self.id, self.type)

def save(
    self,
    dest_dir: Path,
    copy_mode: FileSet.CopyMode = FileSet.CopyMode.hardlink_or_copy,
) -> Self:
    """Save every resource of this scan into a ``<id>-<type>`` sub-directory.

    Parameters
    ----------
    dest_dir : Path
        the parent directory; the scan directory is created beneath it
        (including any missing parents) if it does not already exist
    copy_mode : FileSet.CopyMode
        forwarded unchanged to each resource's ``save`` method

    Returns
    -------
    Self
        a new scan (created with ``new_empty``) whose resources are the
        objects returned by each resource's ``save`` call
    """
    scan_copy = self.new_empty()
    # NOTE(review): the scan type is used verbatim in the directory name;
    # presumably it has already been made filesystem-safe upstream - confirm.
    target_dir = dest_dir / f"{self.id}-{self.type}"
    target_dir.mkdir(parents=True, exist_ok=True)
    for original in self.resources.values():
        written = original.save(target_dir, copy_mode=copy_mode)
        # Re-parent the saved resource onto the new scan before registering it
        written.scan = scan_copy
        scan_copy.resources[written.name] = written
    return scan_copy

@classmethod
def load(cls, scan_dir: Path, require_manifest: bool = True) -> Self:
    """Reconstruct a scan from a directory named ``<id>-<type>``.

    Parameters
    ----------
    scan_dir : Path
        the scan directory; its name is split on the first "-" into the scan
        id and scan type, and each sub-directory is loaded as a resource
    require_manifest : bool
        forwarded unchanged to ``ImagingResource.load``

    Returns
    -------
    Self
        the loaded scan, with every loaded resource pointing back to it

    Raises
    ------
    ValueError
        if the directory name does not contain a "-" (the split cannot be
        unpacked into an id and a type)
    """
    # NOTE(review): no 'check_checksums' option is accepted or forwarded here,
    # so ImagingResource.load uses its own default for it - confirm intended.
    scan_id, scan_type = scan_dir.name.split("-", maxsplit=1)
    loaded_scan = cls(scan_id, scan_type)
    for entry in scan_dir.iterdir():
        if not entry.is_dir():
            # Plain files sitting next to the resource directories are ignored
            continue
        loaded = ImagingResource.load(entry, require_manifest=require_manifest)
        loaded.scan = loaded_scan
        loaded_scan.resources[loaded.name] = loaded
    return loaded_scan

@property
def path(self) -> str:
    """Path-like label for the scan: ``<session path>:<id>-<type>``."""
    session_path = self.session.path
    scan_label = self.id + "-" + self.type
    return session_path + ":" + scan_label
53 changes: 22 additions & 31 deletions xnat_ingest/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,10 @@ def select_resources(
f"{mime_like!r} does not correspond to a file format ({fileformat})"
)
for scan in self.scans.values():
for resource_name, fileset in scan.resources.items():
if isinstance(fileset, fileformat):
uploaded.add((scan.id, resource_name))
yield self.scans[scan.id].resources[resource_name]
for resource in scan.resources.values():
if isinstance(resource.fileset, fileformat):
uploaded.add((scan.id, resource.name))
yield resource
if dataset is not None:
for column in dataset.columns.values():
try:
Expand All @@ -192,9 +192,15 @@ def select_resources(
always_include,
)
continue
fileset = column.datatype(scan.resources[resource_name])
resource = scan.resources[resource_name]
if not isinstance(resource.fileset, column.datatype):
resource = ImagingResource(
name=resource_name,
fileset=column.datatype(resource.fileset),
scan=scan,
)
uploaded.add((scan.id, resource_name))
yield self.scans[scan_id].resources[resource_name]
yield resource

@cached_property
def metadata(self) -> dict[str, ty.Any]:
Expand Down Expand Up @@ -586,21 +592,10 @@ def load(
visit_id=visit_id,
)
for scan_dir in session_dir.iterdir():
if not scan_dir.is_dir():
continue
scan_id, scan_type = scan_dir.name.split("-", 1)
scan_resources = {}
for resource_dir in scan_dir.iterdir():
scan_resources[resource_dir.name] = ImagingResource.load(
resource_dir,
require_manifest=require_manifest,
check_checksums=check_checksums,
)
session.scans[scan_id] = ImagingScan(
scan_id,
scan_type,
scan_resources, # type: ignore[arg-type]
)
if scan_dir.is_dir():
scan = ImagingScan.load(scan_dir, require_manifest=require_manifest)
scan.session = session
session.scans[scan.id] = scan
return session

def save(
Expand Down Expand Up @@ -652,22 +647,18 @@ def save(
Path
the path to the directory where the session is saved
"""
staged = self.new_empty()
saved = self.new_empty()
if available_projects is None or self.project_id in available_projects:
project_id = self.project_id
else:
project_id = "INVALID_UNRECOGNISED_" + self.project_id
session_dir = dest_dir / "-".join((project_id, self.subject_id, self.visit_id))
session_dir.mkdir(parents=True)
session_dir.mkdir(parents=True, exist_ok=True)
for scan in tqdm(self.scans.values(), f"Staging sessions to {session_dir}"):
for resource in scan.resources.values():
# Ensure scan type is a valid directory name
resource_dir = session_dir / f"{scan.id}-{scan.type}" / resource.name
resource_dir.mkdir(parents=True, exist_ok=True)
staged_fileset = resource.fileset.copy(resource_dir, mode=copy_mode)
staged.add_resource(scan.id, scan.type, resource.name, staged_fileset)

return staged, session_dir
saved_scan = scan.save(session_dir, copy_mode=copy_mode)
saved_scan.session = saved
saved.scans[saved_scan.id] = saved_scan
return saved, session_dir

MANIFEST_FILENAME = "MANIFEST.yaml"

Expand Down
2 changes: 1 addition & 1 deletion xnat_ingest/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def get(self, entry: DataEntry, datatype: type) -> DataType:
the item stored within the specified entry
"""
scan_id, resource_name = entry.uri
return datatype(self.session.scans[scan_id][resource_name]) # type: ignore[no-any-return]
return datatype(self.session.scans[scan_id][resource_name].fileset) # type: ignore[no-any-return]

######################################
# The following methods can be empty #
Expand Down
50 changes: 24 additions & 26 deletions xnat_ingest/tests/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,17 @@
(
"listmode",
"medimage/vnd.siemens.biograph128-vision.vr20b.pet-list-mode",
".*/LISTMODE",
".*/PET_LISTMODE",
),
# (
# "sinogram",
# "medimage/vnd.siemens.biograph128-vision.vr20b.pet-sinogram",
# ".*/EM_SINO",
# ".*/PET_EM_SINO",
# ),
(
"countrate",
"medimage/vnd.siemens.biograph128-vision.vr20b.pet-count-rate",
".*/COUNTRATE",
".*/PET_COUNTRATE",
),
]

Expand Down Expand Up @@ -196,15 +196,16 @@ def test_session_select_resources(
Vnd_Siemens_Biograph128Vision_Vr20b_PetRawData,
str(assoc_dir)
+ "/{PatientName.family_name}_{PatientName.given_name}*.ptd",
r".*/[^\.]+.[^\.]+.[^\.]+.(?P<id>\d+)\.[A-Z]+_(?P<resource>[^\.]+).*",
r".*/[^\.]+.[^\.]+.[^\.]+.(?P<id>\d+)\.(?P<resource>[^\.]+).*",
)
],
spaces_to_underscores=True,
)

saved_session, saved_dir = imaging_session.save(staging_dir)

resources = list(saved_session.select_resources(dataset))
resources_iter = saved_session.select_resources(dataset)
resources = list(resources_iter)

assert len(resources) == 5 # 6
ids, descs, resource_names, scans = zip(*resources)
Expand All @@ -218,7 +219,9 @@ def test_session_select_resources(
# "603",
]
)
assert set(resource_names) == set(("DICOM", "LISTMODE", "COUNTRATE")) # , "EM_SINO"
assert set(resource_names) == set(
("DICOM", "PET_LISTMODE", "PET_COUNTRATE")
) # , "PET_EM_SINO"
assert set(type(s) for s in scans) == set(
[
DicomSeries,
Expand All @@ -232,7 +235,7 @@ def test_session_select_resources(
def test_session_save_roundtrip(tmp_path: Path, imaging_session: ImagingSession):

# Save imaging sessions to a temporary directory
saved = imaging_session.save(tmp_path)
saved, _ = imaging_session.save(tmp_path)
assert saved is not imaging_session

# Calculate where the session should have been saved to
Expand All @@ -248,15 +251,15 @@ def test_session_save_roundtrip(tmp_path: Path, imaging_session: ImagingSession)
rereloaded = ImagingSession.load(session_dir)
assert rereloaded == saved

# Load from saved directory, this time only using directory structure instead of
# manifest. Should be the same with the exception of the detected fileformats
loaded_no_manifest = ImagingSession.load(session_dir, use_manifest=False)
for scan in loaded_no_manifest.scans.values():
for key, resource in list(scan.resources.items()):
if key == "DICOM":
assert isinstance(resource, FileSet)
scan.resources[key] = DicomSeries(resource)
assert loaded_no_manifest == saved
# # Load from saved directory, this time only using directory structure instead of
# # manifest. Should be the same with the exception of the detected fileformats
# loaded_no_manifest = ImagingSession.load(session_dir, require_manifest=False)
# for scan in loaded_no_manifest.scans.values():
# for key, resource in list(scan.resources.items()):
# if key == "DICOM":
# assert isinstance(resource, FileSet)
# scan.resources[key] = DicomSeries(resource)
# assert loaded_no_manifest == saved


def test_stage_raw_data_directly(raw_frameset: FrameSet, tmp_path: Path):
Expand Down Expand Up @@ -299,16 +302,11 @@ def test_stage_raw_data_directly(raw_frameset: FrameSet, tmp_path: Path):
for staged_session in staged_sessions:
resources = list(staged_session.select_resources(raw_frameset))

assert len(resources) == 5
ids, descs, resource_names, scans = zip(*resources)
assert set(ids) == set(("602",))
assert set(descs) == set(
[
"602",
]
)
assert set(resource_names) == set(("LISTMODE", "COUNTRATE"))
assert set(type(s) for s in scans) == set(
assert len(resources) == 2
assert set([r.scan.id for r in resources]) == set(["602"])
assert set([r.scan.type for r in resources]) == set(["PET_Raw_Data"])
assert set(r.name for r in resources) == set(("PET_LISTMODE", "PET_COUNTRATE"))
assert set(type(r.fileset) for r in resources) == set(
[
Vnd_Siemens_Biograph128Vision_Vr20b_PetListMode,
Vnd_Siemens_Biograph128Vision_Vr20b_PetCountRate,
Expand Down

0 comments on commit 8214f59

Please sign in to comment.