diff --git a/xnat_ingest/cli/stage.py b/xnat_ingest/cli/stage.py index dd0c65c..7bf150c 100644 --- a/xnat_ingest/cli/stage.py +++ b/xnat_ingest/cli/stage.py @@ -60,23 +60,23 @@ ) @click.option( "--associated-files", - type=AssociatedFiles(), + type=AssociatedFiles.cli_type, nargs=2, default=None, envvar="XNAT_INGEST_ASSOCIATED", metavar=" ", help=( - "The \"glob\" arg is a glob pattern by which to detect associated files to be " + 'The "glob" arg is a glob pattern by which to detect associated files to be ' "attached to the DICOM sessions. Note that when this pattern corresponds to a " "relative path it is considered to be relative to the parent directory containing " "the DICOMs for the session NOT the current working directory Can contain string " "templates corresponding to DICOM metadata fields, which are substituted before " "the glob is called. For example, " - '"./associated/{PatientName.given_name}_{PatientName.family_name}/*)\" ' + '"./associated/{PatientName.given_name}_{PatientName.family_name}/*)" ' "will find all files under the subdirectory within '/path/to/dicoms/associated' that matches " "_. Will be interpreted as being relative to `dicoms_dir` " "if a relative path is provided.\n" - "The \"id-pattern\" arg is a regular expression that is used to extract the scan ID & " + 'The "id-pattern" arg is a regular expression that is used to extract the scan ID & ' "type/resource from the associated filename. Should be a regular-expression " "(Python syntax) with named groups called 'id' and 'type', e.g. " r"--assoc-id-pattern '[^\.]+\.[^\.]+\.(?P\d+)\.(?P\w+)\..*'" @@ -98,7 +98,7 @@ @click.option( "--log-file", default=None, - type=LogFile(), + type=LogFile.cli_type, nargs=2, metavar=" ", envvar="XNAT_INGEST_LOGFILE", @@ -110,7 +110,7 @@ @click.option( "--log-email", "log_emails", - type=LogEmail(), + type=LogEmail.cli_type, nargs=3, metavar="
", multiple=True, @@ -122,7 +122,7 @@ ) @click.option( "--mail-server", - type=MailServer(), + type=MailServer.cli_type, nargs=4, metavar=" ", default=None, @@ -142,7 +142,7 @@ "--deidentify/--dont-deidentify", default=False, type=bool, - help="whether to deidentify the file names and DICOM metadata before staging" + help="whether to deidentify the file names and DICOM metadata before staging", ) def stage( dicoms_path: str, @@ -160,7 +160,7 @@ def stage( raise_errors: bool, deidentify: bool, ): - set_logger_handling(log_level, log_file, log_emails, mail_server) + set_logger_handling(log_level, log_emails, log_file, mail_server) msg = f"Loading DICOM sessions from '{dicoms_path}'" @@ -193,7 +193,8 @@ def stage( continue # Identify theDeidentify files if necessary and save them to the staging directory session.stage( - staging_dir, associated_files=associated_files, + staging_dir, + associated_files=associated_files, remove_original=delete, deidentify=deidentify, ) diff --git a/xnat_ingest/cli/transfer.py b/xnat_ingest/cli/transfer.py index 2f22401..2222c21 100644 --- a/xnat_ingest/cli/transfer.py +++ b/xnat_ingest/cli/transfer.py @@ -82,7 +82,7 @@ def remove_old_files_on_ssh(remote_store: str, threshold: int): @click.argument("remote_store", type=str, envvar="XNAT_INGEST_REMOTE_STORE") @click.option( "--store-credentials", - type=str, + type=click.Path(path_type=Path), metavar=" ", envvar="XNAT_INGEST_STORE_CREDENTIALS", default=None, @@ -99,7 +99,7 @@ def remove_old_files_on_ssh(remote_store: str, threshold: int): @click.option( "--log-file", default=None, - type=LogFile(), + type=LogFile.cli_type, nargs=2, metavar=" ", envvar="XNAT_INGEST_LOGFILE", @@ -111,7 +111,7 @@ def remove_old_files_on_ssh(remote_store: str, threshold: int): @click.option( "--log-email", "log_emails", - type=LogEmail(), + type=LogEmail.cli_type, nargs=3, metavar="
", multiple=True, @@ -123,7 +123,7 @@ def remove_old_files_on_ssh(remote_store: str, threshold: int): ) @click.option( "--mail-server", - type=MailServer(), + type=MailServer.cli_type, metavar=" ", default=None, envvar="XNAT_INGEST_MAILSERVER", @@ -161,20 +161,19 @@ def remove_old_files_on_ssh(remote_store: str, threshold: int): help="The number of days to keep files in the remote store for", ) def transfer( - staging_dir: str, + staging_dir: Path, remote_store: str, credentials: ty.Tuple[str, str], - log_file: ty.Tuple[str, str], + log_file: LogFile, log_level: str, - log_emails: ty.List[ty.Tuple[str, str, str]], - mail_server: ty.Tuple[str, str, str, str], + log_emails: ty.List[LogEmail], + mail_server: ty.Tuple[MailServer], delete: bool, raise_errors: bool, xnat_login: ty.Optional[ty.Tuple[str, str, str]], clean_up_older_than: int, ): - staging_dir = Path(staging_dir) if not staging_dir.exists(): raise ValueError(f"Staging directory '{staging_dir}' does not exist") diff --git a/xnat_ingest/cli/upload.py b/xnat_ingest/cli/upload.py index e249e33..5818721 100644 --- a/xnat_ingest/cli/upload.py +++ b/xnat_ingest/cli/upload.py @@ -60,7 +60,7 @@ @click.option( "--log-file", default=None, - type=LogFile(), + type=LogFile.cli_type, nargs=2, metavar=" ", envvar="XNAT_INGEST_LOGFILE", @@ -72,7 +72,7 @@ @click.option( "--log-email", "log_emails", - type=LogEmail(), + type=LogEmail.cli_type, nargs=3, metavar="
", multiple=True, @@ -84,7 +84,7 @@ ) @click.option( "--mail-server", - type=MailServer(), + type=MailServer.cli_type, metavar=" ", default=None, envvar="XNAT_INGEST_MAILSERVER", diff --git a/xnat_ingest/session.py b/xnat_ingest/session.py index 7f54fea..a02e168 100644 --- a/xnat_ingest/session.py +++ b/xnat_ingest/session.py @@ -26,7 +26,7 @@ from arcana.core.data.tree import DataTree from arcana.core.exceptions import ArcanaDataMatchError from .exceptions import DicomParseError, StagingError -from .utils import add_exc_note, transform_paths +from .utils import add_exc_note, transform_paths, DicomField, AssociatedFiles from .dicom import dcmedit_path import random import string @@ -34,11 +34,16 @@ logger = logging.getLogger("xnat-ingest") +def scan_type_converter(scan_type: str) -> str: + "Ensure there aren't any special characters that aren't valid file/dir paths" + return re.sub(r"[\"\*\/\:\<\>\?\\\|\+\,\.\;\=\[\]]+", "", scan_type) + + @attrs.define class ImagingScan: id: str - type: str - resources: ty.Dict[str, FileSet] + type: str = attrs.field(converter=scan_type_converter) + resources: ty.Dict[str, FileSet] = attrs.field() def __contains__(self, resource_name): return resource_name in self.resources @@ -91,7 +96,9 @@ def make_session_id(cls, project_id, subject_id, visit_id): def modalities(self) -> ty.Set[str]: modalities = self.metadata["Modality"] if not isinstance(modalities, str): - modalities = set(tuple(m) if not isinstance(m, str) else m for m in modalities) + modalities = set( + tuple(m) if not isinstance(m, str) else m for m in modalities + ) return modalities @property @@ -200,9 +207,9 @@ def metadata(self): def from_dicoms( cls, dicoms_path: str | Path, - project_field: str = "StudyID", - subject_field: str = "PatientID", - visit_field: str = "AccessionNumber", + project_field: DicomField = DicomField("StudyID"), + subject_field: DicomField = DicomField("PatientID"), + visit_field: DicomField = DicomField("AccessionNumber"), project_id: str | None = None, ) -> ty.List["ImagingSession"]: """Loads all imaging sessions from a list of DICOM files @@ -478,7 +485,7 @@ def save(self, save_dir: Path, just_manifest: bool = False) -> "ImagingSession": def stage( self, dest_dir: Path, - associated_files: ty.Optional[ty.Tuple[str, str]] = None, + associated_files: ty.Optional[AssociatedFiles] = None, remove_original: bool = False, deidentify: bool = True, ) -> "ImagingSession": @@ -528,6 +535,7 @@ def stage( ): staged_resources = {} for resource_name, fileset in scan.resources.items(): + # Ensure scan type is a valid directory name scan_dir = session_dir / f"{scan.id}-{scan.type}" / resource_name scan_dir.mkdir(parents=True, exist_ok=True) if isinstance(fileset, DicomSeries): @@ -559,7 +567,7 @@ def stage( # substitute string templates int the glob template with values from the # DICOM metadata to construct a glob pattern to select files associated # with current session - associated_fspaths = set() + associated_fspaths: ty.Set[Path] = set() for dicom_dir in self.dicom_dirs: assoc_glob = dicom_dir / associated_files.glob.format(**self.metadata) # Select files using the constructed glob pattern @@ -579,7 +587,7 @@ def stage( if deidentify: # Transform the names of the paths to remove any identiable information transformed_fspaths = transform_paths( - associated_fspaths, + list(associated_fspaths), associated_files.glob, self.metadata, staged_metadata, @@ -603,7 +611,7 @@ def stage( shutil.copyfile(old, dest_path) staged_associated_fspaths.append(dest_path) else: - staged_associated_fspaths = associated_fspaths + staged_associated_fspaths = list(associated_fspaths) # Identify scan id, type and resource names from deidentified file paths assoc_scans = {} @@ -624,7 +632,9 @@ def stage( except IndexError: scan_type = scan_id if scan_id not in assoc_scans: - assoc_resources = defaultdict(list) + assoc_resources: ty.DefaultDict[str, ty.List[Path]] = defaultdict( + list + ) assoc_scans[scan_id] = (scan_type, assoc_resources) else: prev_scan_type, assoc_resources = assoc_scans[scan_id] diff --git a/xnat_ingest/utils.py b/xnat_ingest/utils.py index 3930d95..8179feb 100644 --- a/xnat_ingest/utils.py +++ b/xnat_ingest/utils.py @@ -11,44 +11,70 @@ from fileformats.core import FileSet from .dicom import DicomField # noqa + logger = logging.getLogger("xnat-ingest") +class classproperty(object): + def __init__(self, f): + self.f = f + + def __get__(self, obj, owner): + return self.f(owner) + + class CliType(click.types.ParamType): is_composite = True + def __init__( + self, + type_, + multiple=False, + ): + self.type = type_ + self.multiple = multiple + def convert( self, value: ty.Any, param: click.Parameter | None, ctx: click.Context | None ): - return type(self)(*value) + return self.type(*value) @property def arity(self): - return len(attrs.fields(type(self))) + return len(attrs.fields(self.type)) @property def name(self): return type(self).__name__.lower() - @classmethod - def split_envvar_value(cls, envvar): - return cls(*envvar.split(",")) + def split_envvar_value(self, envvar): + if self.multiple: + return [self.type(*entry.split(",")) for entry in envvar.split(";")] + else: + return self.type(*envvar.split(",")) + + +class CliTyped: + @classproperty + def cli_type(cls): + return CliType(cls) -class MultiCliType(CliType): - @classmethod - def split_envvar_value(cls, envvar): - return [cls(*entry.split(",")) for entry in envvar.split(";")] +class MultiCliTyped: + + @classproperty + def cli_type(cls): + return CliType(cls, multiple=True) @attrs.define -class LogEmail(CliType): +class LogEmail(CliTyped): - address: str = None - loglevel: str = None - subject: str = None + address: str + loglevel: str + subject: str def __str__(self): return self.address @@ -61,10 +87,10 @@ def path_or_none_converter(path: str | Path | None): @attrs.define -class LogFile(MultiCliType): +class LogFile(MultiCliTyped): path: Path = attrs.field(converter=path_or_none_converter, default=None) - loglevel: str = None + loglevel: ty.Optional[str] = None def __str__(self): return str(self.path) @@ -74,29 +100,32 @@ def __fspath__(self): @attrs.define -class MailServer(CliType): +class MailServer(CliTyped): - host: str = None - sender_email: str = None - user: str = None - password: str = None + host: str + sender_email: str + user: str + password: str @attrs.define -class AssociatedFiles(CliType): +class AssociatedFiles(CliTyped): - glob: str = None - identity_pattern: str = None + glob: str + identity_pattern: str def set_logger_handling( - log_level: str, log_emails: ty.List[LogEmail] | None, log_file: LogFile | None, mail_server: MailServer + log_level: str, + log_emails: ty.List[LogEmail] | None, + log_file: LogFile | None, + mail_server: MailServer, ): levels = [log_level] if log_emails: levels.extend(le.loglevel for le in log_emails) - if log_file: + if log_file and log_file.loglevel: levels.append(log_file.loglevel) min_log_level = min(getattr(logging, ll.upper()) for ll in levels) @@ -108,7 +137,7 @@ def set_logger_handling( raise ValueError( "Mail server needs to be provided, either by `--mail-server` option or " "XNAT_INGEST_MAILSERVER environment variable if logger emails " - "are provided: " + ", ".join(log_emails) + "are provided: " + ", ".join(str(le) for le in log_emails) ) for log_email in log_emails: smtp_hdle = logging.handlers.SMTPHandler( @@ -126,7 +155,8 @@ def set_logger_handling( if log_file: log_file.path.parent.mkdir(exist_ok=True) log_file_hdle = logging.FileHandler(log_file) - log_file_hdle.setLevel(getattr(logging, log_file.loglevel.upper())) + if log_file.loglevel: + log_file_hdle.setLevel(getattr(logging, log_file.loglevel.upper())) log_file_hdle.setFormatter( logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") ) @@ -280,7 +310,7 @@ def transform_paths( while templ_attr_re.findall(expr): expr = templ_attr_re.sub(r"{\1.\2}", expr) - group_count = Counter() + group_count: Counter[str] = Counter() # Create regex groups for string template args def str_templ_to_regex_group(match) -> str: