diff --git a/python/lsst/daf/butler/_butler.py b/python/lsst/daf/butler/_butler.py index e2804dd0aa..04eae2647e 100644 --- a/python/lsst/daf/butler/_butler.py +++ b/python/lsst/daf/butler/_butler.py @@ -814,7 +814,8 @@ def get_dataset_type(self, name: str) -> DatasetType: def get_dataset( self, id: DatasetId, - storage_class: str | StorageClass | None, + *, + storage_class: str | StorageClass | None = None, dimension_records: bool = False, datastore_records: bool = False, ) -> DatasetRef | None: diff --git a/python/lsst/daf/butler/datastore/_datastore.py b/python/lsst/daf/butler/datastore/_datastore.py index f0226643f3..9dfd8f8327 100644 --- a/python/lsst/daf/butler/datastore/_datastore.py +++ b/python/lsst/daf/butler/datastore/_datastore.py @@ -540,6 +540,23 @@ def get( """ raise NotImplementedError("Must be implemented by subclass") + def prepare_get_for_external_client(self, ref: DatasetRef) -> object: + """Retrieve serializable data that can be used to execute a get() + + Parameters + ---------- + ref : `DatasetRef` + Reference to the required dataset. + + Returns + ------- + payload : `object` + Serializable payload containing the information needed to perform a + get() operation. This payload may be sent over the wire to another + system to perform the get(). + """ + raise NotImplementedError() + @abstractmethod def put(self, inMemoryDataset: Any, datasetRef: DatasetRef) -> None: """Write a `InMemoryDataset` with a given `DatasetRef` to the store. diff --git a/python/lsst/daf/butler/datastore/cache_manager.py b/python/lsst/daf/butler/datastore/cache_manager.py index 8196cf2a45..034f8ac3d3 100644 --- a/python/lsst/daf/butler/datastore/cache_manager.py +++ b/python/lsst/daf/butler/datastore/cache_manager.py @@ -1015,7 +1015,11 @@ class DatastoreDisabledCacheManager(AbstractDatastoreCacheManager): in lookup keys. """ - def __init__(self, config: str | DatastoreCacheManagerConfig, universe: DimensionUniverse): + def __init__( + self, + config: str | DatastoreCacheManagerConfig | None = None, + universe: DimensionUniverse | None = None, + ): return def should_be_cached(self, entity: DatasetRef | DatasetType | StorageClass) -> bool: diff --git a/python/lsst/daf/butler/datastore/generic_base.py b/python/lsst/daf/butler/datastore/generic_base.py index 9fe2c71f96..ca72d2e32e 100644 --- a/python/lsst/daf/butler/datastore/generic_base.py +++ b/python/lsst/daf/butler/datastore/generic_base.py @@ -29,7 +29,7 @@ from __future__ import annotations -__all__ = ("GenericBaseDatastore",) +__all__ = ("GenericBaseDatastore", "post_process_get") import logging from collections.abc import Mapping @@ -54,54 +54,6 @@ class GenericBaseDatastore(Datastore, Generic[_InfoType]): Should always be sub-classed since key abstract methods are missing. """ - def _post_process_get( - self, - inMemoryDataset: object, - readStorageClass: StorageClass, - assemblerParams: Mapping[str, Any] | None = None, - isComponent: bool = False, - ) -> object: - """Given the Python object read from the datastore, manipulate - it based on the supplied parameters and ensure the Python - type is correct. - - Parameters - ---------- - inMemoryDataset : `object` - Dataset to check. - readStorageClass: `StorageClass` - The `StorageClass` used to obtain the assembler and to - check the python type. - assemblerParams : `dict`, optional - Parameters to pass to the assembler. Can be `None`. - isComponent : `bool`, optional - If this is a component, allow the inMemoryDataset to be `None`. - - Returns - ------- - dataset : `object` - In-memory dataset, potentially converted to expected type. - """ - # Process any left over parameters - if assemblerParams: - inMemoryDataset = readStorageClass.delegate().handleParameters(inMemoryDataset, assemblerParams) - - # Validate the returned data type matches the expected data type - pytype = readStorageClass.pytype - - allowedTypes = [] - if pytype: - allowedTypes.append(pytype) - - # Special case components to allow them to be None - if isComponent: - allowedTypes.append(type(None)) - - if allowedTypes and not isinstance(inMemoryDataset, tuple(allowedTypes)): - inMemoryDataset = readStorageClass.coerce_type(inMemoryDataset) - - return inMemoryDataset - def _validate_put_parameters(self, inMemoryDataset: object, ref: DatasetRef) -> None: """Validate the supplied arguments for put. @@ -173,3 +125,51 @@ def transfer(self, inputDatastore: Datastore, ref: DatasetRef) -> None: assert inputDatastore is not self # unless we want it for renames? inMemoryDataset = inputDatastore.get(ref) return self.put(inMemoryDataset, ref) + + +def post_process_get( + inMemoryDataset: object, + readStorageClass: StorageClass, + assemblerParams: Mapping[str, Any] | None = None, + isComponent: bool = False, +) -> object: + """Given the Python object read from the datastore, manipulate + it based on the supplied parameters and ensure the Python + type is correct. + + Parameters + ---------- + inMemoryDataset : `object` + Dataset to check. + readStorageClass: `StorageClass` + The `StorageClass` used to obtain the assembler and to + check the python type. + assemblerParams : `dict`, optional + Parameters to pass to the assembler. Can be `None`. + isComponent : `bool`, optional + If this is a component, allow the inMemoryDataset to be `None`. + + Returns + ------- + dataset : `object` + In-memory dataset, potentially converted to expected type. + """ + # Process any left over parameters + if assemblerParams: + inMemoryDataset = readStorageClass.delegate().handleParameters(inMemoryDataset, assemblerParams) + + # Validate the returned data type matches the expected data type + pytype = readStorageClass.pytype + + allowedTypes = [] + if pytype: + allowedTypes.append(pytype) + + # Special case components to allow them to be None + if isComponent: + allowedTypes.append(type(None)) + + if allowedTypes and not isinstance(inMemoryDataset, tuple(allowedTypes)): + inMemoryDataset = readStorageClass.coerce_type(inMemoryDataset) + + return inMemoryDataset diff --git a/python/lsst/daf/butler/datastore/stored_file_info.py b/python/lsst/daf/butler/datastore/stored_file_info.py index ab1572d901..73eb3c75a4 100644 --- a/python/lsst/daf/butler/datastore/stored_file_info.py +++ b/python/lsst/daf/butler/datastore/stored_file_info.py @@ -27,13 +27,14 @@ from __future__ import annotations -__all__ = ("StoredDatastoreItemInfo", "StoredFileInfo") +__all__ = ("StoredDatastoreItemInfo", "StoredFileInfo", "SerializedStoredFileInfo") import inspect from collections.abc import Iterable, Mapping from dataclasses import dataclass from typing import TYPE_CHECKING, Any +from lsst.daf.butler._compat import _BaseModelCompat from lsst.resources import ResourcePath from lsst.utils import doImportType from lsst.utils.introspection import get_full_type_name @@ -214,7 +215,7 @@ def __init__( """StorageClass associated with Dataset.""" component: str | None - """Component associated with this file. Can be None if the file does + """Component associated with this file. Can be `None` if the file does not refer to a component of a composite.""" checksum: str | None @@ -260,6 +261,13 @@ def to_record(self, **kwargs: Any) -> dict[str, Any]: **kwargs, ) + def to_simple(self) -> SerializedStoredFileInfo: + record = self.to_record() + # We allow None on the model but the record contains a "null string" + # instead + record["component"] = self.component + return SerializedStoredFileInfo.model_validate(record) + def file_location(self, factory: LocationFactory) -> Location: """Return the location of artifact. @@ -307,6 +315,10 @@ def from_record(cls: type[StoredFileInfo], record: Mapping[str, Any]) -> StoredF ) return info + @classmethod + def from_simple(cls: type[StoredFileInfo], model: SerializedStoredFileInfo) -> StoredFileInfo: + return cls.from_record(dict(model)) + def update(self, **kwargs: Any) -> StoredFileInfo: new_args = {} for k in self.__slots__: @@ -320,3 +332,26 @@ def update(self, **kwargs: Any) -> StoredFileInfo: def __reduce__(self) -> str | tuple[Any, ...]: return (self.from_record, (self.to_record(),)) + + +class SerializedStoredFileInfo(_BaseModelCompat): + """Serialized representation of `StoredFileInfo` properties""" + + formatter: str + """Fully-qualified name of Formatter.""" + + path: str + """Path to dataset within Datastore.""" + + storage_class: str + """Name of the StorageClass associated with Dataset.""" + + component: str | None + """Component associated with this file. Can be `None` if the file does + not refer to a component of a composite.""" + + checksum: str | None + """Checksum of the serialized dataset.""" + + file_size: int + """Size of the serialized dataset in bytes.""" diff --git a/python/lsst/daf/butler/datastores/fileDatastore.py b/python/lsst/daf/butler/datastores/fileDatastore.py index c07862ea2b..cf2d879447 100644 --- a/python/lsst/daf/butler/datastores/fileDatastore.py +++ b/python/lsst/daf/butler/datastores/fileDatastore.py @@ -36,7 +36,6 @@ import logging from collections import defaultdict from collections.abc import Callable, Iterable, Mapping, Sequence -from dataclasses import dataclass from typing import TYPE_CHECKING, Any, ClassVar, cast from lsst.daf.butler import ( @@ -72,6 +71,16 @@ from lsst.daf.butler.datastore.generic_base import GenericBaseDatastore from lsst.daf.butler.datastore.record_data import DatastoreRecordData from lsst.daf.butler.datastore.stored_file_info import StoredDatastoreItemInfo, StoredFileInfo +from lsst.daf.butler.datastores.file_datastore.get import ( + DatasetLocationInformation, + DatastoreFileGetInformation, + generate_datastore_get_information, + get_dataset_as_python_object_from_get_info, +) +from lsst.daf.butler.datastores.fileDatastoreClient import ( + FileDatastoreGetPayload, + FileDatastoreGetPayloadFileInfo, +) from lsst.daf.butler.registry.interfaces import ( DatabaseInsertMode, DatastoreRegistryBridge, @@ -81,12 +90,11 @@ from lsst.daf.butler.repo_relocation import replaceRoot from lsst.daf.butler.utils import transactional from lsst.resources import ResourcePath, ResourcePathExpression -from lsst.utils.introspection import get_class_of, get_instance_of +from lsst.utils.introspection import get_class_of from lsst.utils.iteration import chunk_iterable # For VERBOSE logging usage. from lsst.utils.logging import VERBOSE, getLogger -from lsst.utils.timer import time_this from sqlalchemy import BigInteger, String if TYPE_CHECKING: @@ -110,34 +118,6 @@ def __init__(self, datasets: Iterable[FileDataset]): self.datasets = datasets -@dataclass(frozen=True) -class DatastoreFileGetInformation: - """Collection of useful parameters needed to retrieve a file from - a Datastore. - """ - - location: Location - """The location from which to read the dataset.""" - - formatter: Formatter - """The `Formatter` to use to deserialize the dataset.""" - - info: StoredFileInfo - """Stored information about this file and its formatter.""" - - assemblerParams: Mapping[str, Any] - """Parameters to use for post-processing the retrieved dataset.""" - - formatterParams: Mapping[str, Any] - """Parameters that were understood by the associated formatter.""" - - component: str | None - """The component to be retrieved (can be `None`).""" - - readStorageClass: StorageClass - """The `StorageClass` of the dataset being read.""" - - class FileDatastore(GenericBaseDatastore[StoredFileInfo]): """Generic Datastore for file-based implementations. @@ -577,7 +557,7 @@ def removeStoredItemInfo(self, ref: DatasetIdRef) -> None: def _get_dataset_locations_info( self, ref: DatasetIdRef, ignore_datastore_records: bool = False - ) -> list[tuple[Location, StoredFileInfo]]: + ) -> list[DatasetLocationInformation]: r"""Find all the `Location`\ s of the requested dataset in the `Datastore` and the associated stored file information. @@ -702,7 +682,7 @@ def _get_expected_dataset_locations_info(self, ref: DatasetRef) -> list[tuple[Lo for location, formatter, storageClass, component in all_info ] - def _prepare_for_get( + def _prepare_for_direct_get( self, ref: DatasetRef, parameters: Mapping[str, Any] | None = None ) -> list[DatastoreFileGetInformation]: """Check parameters for ``get`` and obtain formatter and @@ -738,8 +718,6 @@ def _prepare_for_get( fileLocations = self._get_expected_dataset_locations_info(ref) if len(fileLocations) > 1: - disassembled = True - # If trust is involved it is possible that there will be # components listed here that do not exist in the datastore. # Explicitly check for file artifact existence and filter out any @@ -753,58 +731,12 @@ def _prepare_for_get( if not fileLocations: raise FileNotFoundError(f"None of the component files for dataset {ref} exist.") - else: - disassembled = False - - # Is this a component request? - refComponent = ref.datasetType.component() - - fileGetInfo = [] - for location, storedFileInfo in fileLocations: - # The storage class used to write the file - writeStorageClass = storedFileInfo.storageClass - - # If this has been disassembled we need read to match the write - if disassembled: - readStorageClass = writeStorageClass - else: - readStorageClass = refStorageClass - - formatter = get_instance_of( - storedFileInfo.formatter, - FileDescriptor( - location, - readStorageClass=readStorageClass, - storageClass=writeStorageClass, - parameters=parameters, - ), - ref.dataId, - ) - - formatterParams, notFormatterParams = formatter.segregateParameters() - - # Of the remaining parameters, extract the ones supported by - # this StorageClass (for components not all will be handled) - assemblerParams = readStorageClass.filterParameters(notFormatterParams) - - # The ref itself could be a component if the dataset was - # disassembled by butler, or we disassembled in datastore and - # components came from the datastore records - component = storedFileInfo.component if storedFileInfo.component else refComponent - - fileGetInfo.append( - DatastoreFileGetInformation( - location, - formatter, - storedFileInfo, - assemblerParams, - formatterParams, - component, - readStorageClass, - ) - ) - - return fileGetInfo + return generate_datastore_get_information( + fileLocations, + readStorageClass=refStorageClass, + ref=ref, + parameters=parameters, + ) def _prepare_for_put(self, inMemoryDataset: Any, ref: DatasetRef) -> tuple[Location, Formatter]: """Check the arguments for ``put`` and obtain formatter and @@ -1342,177 +1274,6 @@ def _removeFileExists(uri: ResourcePath) -> None: # URI is needed to resolve what ingest case are we dealing with return self._extractIngestInfo(uri, ref, formatter=formatter) - def _read_artifact_into_memory( - self, - getInfo: DatastoreFileGetInformation, - ref: DatasetRef, - isComponent: bool = False, - cache_ref: DatasetRef | None = None, - ) -> Any: - """Read the artifact from datastore into in memory object. - - Parameters - ---------- - getInfo : `DatastoreFileGetInformation` - Information about the artifact within the datastore. - ref : `DatasetRef` - The registry information associated with this artifact. - isComponent : `bool` - Flag to indicate if a component is being read from this artifact. - cache_ref : `DatasetRef`, optional - The DatasetRef to use when looking up the file in the cache. - This ref must have the same ID as the supplied ref but can - be a parent ref or component ref to indicate to the cache whether - a composite file is being requested from the cache or a component - file. Without this the cache will default to the supplied ref but - it can get confused with read-only derived components for - disassembled composites. - - Returns - ------- - inMemoryDataset : `object` - The artifact as a python object. - """ - location = getInfo.location - uri = location.uri - log.debug("Accessing data from %s", uri) - - if cache_ref is None: - cache_ref = ref - if cache_ref.id != ref.id: - raise ValueError( - "The supplied cache dataset ref refers to a different dataset than expected:" - f" {ref.id} != {cache_ref.id}" - ) - - # Cannot recalculate checksum but can compare size as a quick check - # Do not do this if the size is negative since that indicates - # we do not know. - recorded_size = getInfo.info.file_size - resource_size = uri.size() - if recorded_size >= 0 and resource_size != recorded_size: - raise RuntimeError( - "Integrity failure in Datastore. " - f"Size of file {uri} ({resource_size}) " - f"does not match size recorded in registry of {recorded_size}" - ) - - # For the general case we have choices for how to proceed. - # 1. Always use a local file (downloading the remote resource to a - # temporary file if needed). - # 2. Use a threshold size and read into memory and use bytes. - # Use both for now with an arbitrary hand off size. - # This allows small datasets to be downloaded from remote object - # stores without requiring a temporary file. - - formatter = getInfo.formatter - nbytes_max = 10_000_000 # Arbitrary number that we can tune - if resource_size <= nbytes_max and formatter.can_read_bytes(): - with self.cacheManager.find_in_cache(cache_ref, uri.getExtension()) as cached_file: - if cached_file is not None: - desired_uri = cached_file - msg = f" (cached version of {uri})" - else: - desired_uri = uri - msg = "" - with time_this(log, msg="Reading bytes from %s%s", args=(desired_uri, msg)): - serializedDataset = desired_uri.read() - log.debug( - "Deserializing %s from %d bytes from location %s with formatter %s", - f"component {getInfo.component}" if isComponent else "", - len(serializedDataset), - uri, - formatter.name(), - ) - try: - result = formatter.fromBytes( - serializedDataset, component=getInfo.component if isComponent else None - ) - except Exception as e: - raise ValueError( - f"Failure from formatter '{formatter.name()}' for dataset {ref.id}" - f" ({ref.datasetType.name} from {uri}): {e}" - ) from e - else: - # Read from file. - - # Have to update the Location associated with the formatter - # because formatter.read does not allow an override. - # This could be improved. - location_updated = False - msg = "" - - # First check in cache for local version. - # The cache will only be relevant for remote resources but - # no harm in always asking. Context manager ensures that cache - # file is not deleted during cache expiration. - with self.cacheManager.find_in_cache(cache_ref, uri.getExtension()) as cached_file: - if cached_file is not None: - msg = f"(via cache read of remote file {uri})" - uri = cached_file - location_updated = True - - with uri.as_local() as local_uri: - can_be_cached = False - if uri != local_uri: - # URI was remote and file was downloaded - cache_msg = "" - location_updated = True - - if self.cacheManager.should_be_cached(cache_ref): - # In this scenario we want to ask if the downloaded - # file should be cached but we should not cache - # it until after we've used it (to ensure it can't - # be expired whilst we are using it). - can_be_cached = True - - # Say that it is "likely" to be cached because - # if the formatter read fails we will not be - # caching this file. - cache_msg = " and likely cached" - - msg = f"(via download to local file{cache_msg})" - - # Calculate the (possibly) new location for the formatter - # to use. - newLocation = Location(*local_uri.split()) if location_updated else None - - log.debug( - "Reading%s from location %s %s with formatter %s", - f" component {getInfo.component}" if isComponent else "", - uri, - msg, - formatter.name(), - ) - try: - with ( - formatter._updateLocation(newLocation), - time_this( - log, - msg="Reading%s from location %s %s with formatter %s", - args=( - f" component {getInfo.component}" if isComponent else "", - uri, - msg, - formatter.name(), - ), - ), - ): - result = formatter.read(component=getInfo.component if isComponent else None) - except Exception as e: - raise ValueError( - f"Failure from formatter '{formatter.name()}' for dataset {ref.id}" - f" ({ref.datasetType.name} from {uri}): {e}" - ) from e - - # File was read successfully so can move to cache - if can_be_cached: - self.cacheManager.move_to_cache(local_uri, cache_ref) - - return self._post_process_get( - result, ref.datasetType.storageClass, getInfo.assemblerParams, isComponent=isComponent - ) - def knows(self, ref: DatasetRef) -> bool: """Check if the dataset is known to the datastore. @@ -2228,189 +1989,26 @@ def get( # type conversion. if storageClass is not None: ref = ref.overrideStorageClass(storageClass) - refStorageClass = ref.datasetType.storageClass - - allGetInfo = self._prepare_for_get(ref, parameters) - refComponent = ref.datasetType.component() - - # Create mapping from component name to related info - allComponents = {i.component: i for i in allGetInfo} - - # By definition the dataset is disassembled if we have more - # than one record for it. - isDisassembled = len(allGetInfo) > 1 - - # Look for the special case where we are disassembled but the - # component is a derived component that was not written during - # disassembly. For this scenario we need to check that the - # component requested is listed as a derived component for the - # composite storage class - isDisassembledReadOnlyComponent = False - if isDisassembled and refComponent: - # The composite storage class should be accessible through - # the component dataset type - compositeStorageClass = ref.datasetType.parentStorageClass - - # In the unlikely scenario where the composite storage - # class is not known, we can only assume that this is a - # normal component. If that assumption is wrong then the - # branch below that reads a persisted component will fail - # so there is no need to complain here. - if compositeStorageClass is not None: - isDisassembledReadOnlyComponent = refComponent in compositeStorageClass.derivedComponents - - if isDisassembled and not refComponent: - # This was a disassembled dataset spread over multiple files - # and we need to put them all back together again. - # Read into memory and then assemble - - # Check that the supplied parameters are suitable for the type read - refStorageClass.validateParameters(parameters) - - # We want to keep track of all the parameters that were not used - # by formatters. We assume that if any of the component formatters - # use a parameter that we do not need to apply it again in the - # assembler. - usedParams = set() - - components: dict[str, Any] = {} - for getInfo in allGetInfo: - # assemblerParams are parameters not understood by the - # associated formatter. - usedParams.update(set(getInfo.formatterParams)) - - component = getInfo.component - - if component is None: - raise RuntimeError(f"Internal error in datastore assembly of {ref}") - - # We do not want the formatter to think it's reading - # a component though because it is really reading a - # standalone dataset -- always tell reader it is not a - # component. - components[component] = self._read_artifact_into_memory( - getInfo, ref.makeComponentRef(component), isComponent=False - ) - - inMemoryDataset = ref.datasetType.storageClass.delegate().assemble(components) - # Any unused parameters will have to be passed to the assembler - if parameters: - unusedParams = {k: v for k, v in parameters.items() if k not in usedParams} - else: - unusedParams = {} - - # Process parameters - return ref.datasetType.storageClass.delegate().handleParameters( - inMemoryDataset, parameters=unusedParams - ) - - elif isDisassembledReadOnlyComponent: - compositeStorageClass = ref.datasetType.parentStorageClass - if compositeStorageClass is None: - raise RuntimeError( - f"Unable to retrieve derived component '{refComponent}' since" - "no composite storage class is available." - ) - - if refComponent is None: - # Mainly for mypy - raise RuntimeError(f"Internal error in datastore {self.name}: component can not be None here") - - # Assume that every derived component can be calculated by - # forwarding the request to a single read/write component. - # Rather than guessing which rw component is the right one by - # scanning each for a derived component of the same name, - # we ask the storage class delegate directly which one is best to - # use. - compositeDelegate = compositeStorageClass.delegate() - forwardedComponent = compositeDelegate.selectResponsibleComponent( - refComponent, set(allComponents) - ) + allGetInfo = self._prepare_for_direct_get(ref, parameters) + return get_dataset_as_python_object_from_get_info( + allGetInfo, ref=ref, parameters=parameters, cache_manager=self.cacheManager + ) - # Select the relevant component - rwInfo = allComponents[forwardedComponent] - - # For now assume that read parameters are validated against - # the real component and not the requested component - forwardedStorageClass = rwInfo.formatter.fileDescriptor.readStorageClass - forwardedStorageClass.validateParameters(parameters) - - # The reference to use for the caching must refer to the forwarded - # component and not the derived component. - cache_ref = ref.makeCompositeRef().makeComponentRef(forwardedComponent) - - # Unfortunately the FileDescriptor inside the formatter will have - # the wrong write storage class so we need to create a new one - # given the immutability constraint. - writeStorageClass = rwInfo.info.storageClass - - # We may need to put some thought into parameters for read - # components but for now forward them on as is - readFormatter = type(rwInfo.formatter)( - FileDescriptor( - rwInfo.location, - readStorageClass=refStorageClass, - storageClass=writeStorageClass, - parameters=parameters, - ), - ref.dataId, - ) + def prepare_get_for_external_client(self, ref: DatasetRef) -> FileDatastoreGetPayload: + # Docstring inherited - # The assembler can not receive any parameter requests for a - # derived component at this time since the assembler will - # see the storage class of the derived component and those - # parameters will have to be handled by the formatter on the - # forwarded storage class. - assemblerParams: dict[str, Any] = {} - - # Need to created a new info that specifies the derived - # component and associated storage class - readInfo = DatastoreFileGetInformation( - rwInfo.location, - readFormatter, - rwInfo.info, - assemblerParams, - {}, - refComponent, - refStorageClass, + def to_file_info_payload(info: DatasetLocationInformation) -> FileDatastoreGetPayloadFileInfo: + location, file_info = info + return FileDatastoreGetPayloadFileInfo( + url=location.uri.geturl(), datastoreRecords=file_info.to_simple() ) - return self._read_artifact_into_memory(readInfo, ref, isComponent=True, cache_ref=cache_ref) - - else: - # Single file request or component from that composite file - for lookup in (refComponent, None): - if lookup in allComponents: - getInfo = allComponents[lookup] - break - else: - raise FileNotFoundError( - f"Component {refComponent} not found for ref {ref} in datastore {self.name}" - ) - - # Do not need the component itself if already disassembled - if isDisassembled: - isComponent = False - else: - isComponent = getInfo.component is not None - - # For a component read of a composite we want the cache to - # be looking at the composite ref itself. - cache_ref = ref.makeCompositeRef() if isComponent else ref - - # For a disassembled component we can validate parameters against - # the component storage class directly - if isDisassembled: - refStorageClass.validateParameters(parameters) - else: - # For an assembled composite this could be a derived - # component derived from a real component. The validity - # of the parameters is not clear. For now validate against - # the composite storage class - getInfo.formatter.fileDescriptor.storageClass.validateParameters(parameters) - - return self._read_artifact_into_memory(getInfo, ref, isComponent=isComponent, cache_ref=cache_ref) + return FileDatastoreGetPayload( + datastore_type="file", + dataset_ref=ref.to_simple(), + file_info=[to_file_info_payload(info) for info in self._get_dataset_locations_info(ref)], + ) @transactional def put(self, inMemoryDataset: Any, ref: DatasetRef) -> None: diff --git a/python/lsst/daf/butler/datastores/fileDatastoreClient.py b/python/lsst/daf/butler/datastores/fileDatastoreClient.py new file mode 100644 index 0000000000..9eb09b52be --- /dev/null +++ b/python/lsst/daf/butler/datastores/fileDatastoreClient.py @@ -0,0 +1,94 @@ +__all__ = ("get_dataset_as_python_object", "FileDatastoreGetPayload") + +from typing import Any, Literal + +from lsst.daf.butler import DatasetRef, DimensionUniverse, Location, SerializedDatasetRef, StorageClass +from lsst.daf.butler._compat import _BaseModelCompat +from lsst.daf.butler.datastore.cache_manager import DatastoreDisabledCacheManager +from lsst.daf.butler.datastore.stored_file_info import SerializedStoredFileInfo, StoredFileInfo +from lsst.daf.butler.datastores.file_datastore.get import ( + DatasetLocationInformation, + Mapping, + generate_datastore_get_information, + get_dataset_as_python_object_from_get_info, +) + + +class FileDatastoreGetPayloadFileInfo(_BaseModelCompat): + """Information required to read a single file stored in `FileDatastore`""" + + # TODO DM-41879: Allowing arbitrary URLs here is a severe security issue, + # since it allows the server to trick the client into fetching data from + # any file on its local filesystem or from remote storage using credentials + # laying around in the environment. This should be restricted to only + # HTTP, but we don't yet have a means of mocking out HTTP gets in tests. + url: str + """An absolute URL that can be used to read the file""" + + datastoreRecords: SerializedStoredFileInfo + """`FileDatastore` metadata records for this file""" + + +class FileDatastoreGetPayload(_BaseModelCompat): + """A serializable representation of the data needed for retrieving an + artifact and converting it to a python object. + """ + + datastore_type: Literal["file"] + + file_info: list[FileDatastoreGetPayloadFileInfo] + """List of retrieval information for each file associated with this + artifact + """ + + dataset_ref: SerializedDatasetRef + """Registry information associated with this artifact""" + + +def get_dataset_as_python_object( + payload: FileDatastoreGetPayload, + *, + universe: DimensionUniverse, + parameters: Mapping[str, Any] | None, + storageClass: StorageClass | str | None, +) -> Any: + """Retrieve an artifact from storage and return it as a Python object + + Parameters + ---------- + payload : `FileDatastoreGetPayload` + Pre-processed information about each file associated with this + artifact. + universe: `DimensionUniverse` + The universe of dimensions associated with the `DatasetRef` contained + in ``payload``. + parameters : `Mapping`[`str`, `typing.Any`] + `StorageClass` and `Formatter` parameters to be used when converting + the artifact to a Python object + storageClass: `StorageClass` | `str` | `None` + Overrides the `StorageClass` to be used when converting the artifact to + a Python object. If `None`, uses the `StorageClass` specified by + `payload`. + + Returns + ------- + python_object: `Any` + The retrieved artifact, converted to a Python object + """ + fileLocations: list[DatasetLocationInformation] = [ + (Location(None, file_info.url), StoredFileInfo.from_simple(file_info.datastoreRecords)) + for file_info in payload.file_info + ] + + ref = DatasetRef.from_simple(payload.dataset_ref, universe=universe) + if storageClass is not None: + ref = ref.overrideStorageClass(storageClass) + + datastore_file_info = generate_datastore_get_information( + fileLocations, + ref=ref, + parameters=parameters, + ) + return get_dataset_as_python_object_from_get_info( + datastore_file_info, ref=ref, parameters=parameters, cache_manager=DatastoreDisabledCacheManager() + ) diff --git a/python/lsst/daf/butler/datastores/file_datastore/__init__.py b/python/lsst/daf/butler/datastores/file_datastore/__init__.py new file mode 100644 index 0000000000..36d1294e36 --- /dev/null +++ b/python/lsst/daf/butler/datastores/file_datastore/__init__.py @@ -0,0 +1,26 @@ +# This file is part of daf_butler. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (http://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This software is dual licensed under the GNU General Public License and also +# under a 3-clause BSD license. Recipients may choose which of these licenses +# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, +# respectively. If you choose the GPL option then the following text applies +# (but note that there is still no warranty even if you opt for BSD instead): +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . diff --git a/python/lsst/daf/butler/datastores/file_datastore/get.py b/python/lsst/daf/butler/datastores/file_datastore/get.py new file mode 100644 index 0000000000..c10f1914f4 --- /dev/null +++ b/python/lsst/daf/butler/datastores/file_datastore/get.py @@ -0,0 +1,538 @@ +# This file is part of daf_butler. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (http://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This software is dual licensed under the GNU General Public License and also +# under a 3-clause BSD license. Recipients may choose which of these licenses +# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, +# respectively. If you choose the GPL option then the following text applies +# (but note that there is still no warranty even if you opt for BSD instead): +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +__all__ = ( + "DatastoreFileGetInformation", + "DatasetLocationInformation", + "generate_datastore_get_information", + "get_dataset_as_python_object_from_get_info", +) + +from collections.abc import Mapping +from dataclasses import dataclass +from typing import Any, TypeAlias + +from lsst.daf.butler import DatasetRef, FileDescriptor, Formatter, Location, StorageClass +from lsst.daf.butler.datastore.cache_manager import AbstractDatastoreCacheManager +from lsst.daf.butler.datastore.generic_base import post_process_get +from lsst.daf.butler.datastore.stored_file_info import StoredFileInfo +from lsst.utils.introspection import get_instance_of +from lsst.utils.logging import getLogger +from lsst.utils.timer import time_this + +log = getLogger(__name__) + +DatasetLocationInformation: TypeAlias = tuple[Location, StoredFileInfo] + + +@dataclass(frozen=True) +class DatastoreFileGetInformation: + """Collection of useful parameters needed to retrieve a file from + a Datastore. + """ + + location: Location + """The location from which to read the dataset.""" + + formatter: Formatter + """The `Formatter` to use to deserialize the dataset.""" + + info: StoredFileInfo + """Stored information about this file and its formatter.""" + + assemblerParams: Mapping[str, Any] + """Parameters to use for post-processing the retrieved dataset.""" + + formatterParams: Mapping[str, Any] + """Parameters that were understood by the associated formatter.""" + + component: str | None + """The component to be retrieved (can be `None`).""" + + readStorageClass: StorageClass + """The `StorageClass` of the dataset being read.""" + + +def generate_datastore_get_information( + fileLocations: list[DatasetLocationInformation], + *, + ref: DatasetRef, + parameters: Mapping[str, Any] | None, + readStorageClass: StorageClass | None = None, +) -> list[DatastoreFileGetInformation]: + """Process parameters and instantiate formatters for in preparation for + retrieving an artifact and converting it to a Python object + + Parameters + ---------- + fileLocations : `list`[`DatasetLocationInformation`] + List of file locations for this artifact and their associated datastore + records + ref : `DatasetRef` + The registry information associated with this artifact. + parameters : `Mapping`[`str`, `Any`] + `StorageClass` and `Formatter` parameters + readStorageClass : `StorageClass` | `None`, optional + The StorageClass to use when ultimately returning the resulting object + from the get. Defaults to the `StorageClass` specified by `ref` + + Returns + ------- + getInfo : `list` [`DatastoreFileGetInformation`] + The parameters needed to retrieve each file + """ + if readStorageClass is None: + readStorageClass = ref.datasetType.storageClass + + # Is this a component request? + refComponent = ref.datasetType.component() + + disassembled = len(fileLocations) > 1 + fileGetInfo = [] + for location, storedFileInfo in fileLocations: + # The storage class used to write the file + writeStorageClass = storedFileInfo.storageClass + + # If this has been disassembled we need read to match the write + if disassembled: + readStorageClass = writeStorageClass + + formatter = get_instance_of( + storedFileInfo.formatter, + FileDescriptor( + location, + readStorageClass=readStorageClass, + storageClass=writeStorageClass, + parameters=parameters, + ), + ref.dataId, + ) + + formatterParams, notFormatterParams = formatter.segregateParameters() + + # Of the remaining parameters, extract the ones supported by + # this StorageClass (for components not all will be handled) + assemblerParams = readStorageClass.filterParameters(notFormatterParams) + + # The ref itself could be a component if the dataset was + # disassembled by butler, or we disassembled in datastore and + # components came from the datastore records + component = storedFileInfo.component if storedFileInfo.component else refComponent + + fileGetInfo.append( + DatastoreFileGetInformation( + location, + formatter, + storedFileInfo, + assemblerParams, + formatterParams, + component, + readStorageClass, + ) + ) + + return fileGetInfo + + +def _read_artifact_into_memory( + getInfo: DatastoreFileGetInformation, + ref: DatasetRef, + cache_manager: AbstractDatastoreCacheManager, + isComponent: bool = False, + cache_ref: DatasetRef | None = None, +) -> Any: + """Read the artifact from datastore into in memory object. + + Parameters + ---------- + getInfo : `DatastoreFileGetInformation` + Information about the artifact within the datastore. + ref : `DatasetRef` + The registry information associated with this artifact. + isComponent : `bool` + Flag to indicate if a component is being read from this artifact. + cache_manager: `AbstractDatastoreCacheManager` + The cache manager to use for caching retrieved files + cache_ref : `DatasetRef`, optional + The DatasetRef to use when looking up the file in the cache. + This ref must have the same ID as the supplied ref but can + be a parent ref or component ref to indicate to the cache whether + a composite file is being requested from the cache or a component + file. Without this the cache will default to the supplied ref but + it can get confused with read-only derived components for + disassembled composites. + + Returns + ------- + inMemoryDataset : `object` + The artifact as a python object. + """ + location = getInfo.location + uri = location.uri + log.debug("Accessing data from %s", uri) + + if cache_ref is None: + cache_ref = ref + if cache_ref.id != ref.id: + raise ValueError( + "The supplied cache dataset ref refers to a different dataset than expected:" + f" {ref.id} != {cache_ref.id}" + ) + + # Cannot recalculate checksum but can compare size as a quick check + # Do not do this if the size is negative since that indicates + # we do not know. + recorded_size = getInfo.info.file_size + resource_size = uri.size() + if recorded_size >= 0 and resource_size != recorded_size: + raise RuntimeError( + "Integrity failure in Datastore. " + f"Size of file {uri} ({resource_size}) " + f"does not match size recorded in registry of {recorded_size}" + ) + + # For the general case we have choices for how to proceed. + # 1. Always use a local file (downloading the remote resource to a + # temporary file if needed). + # 2. Use a threshold size and read into memory and use bytes. + # Use both for now with an arbitrary hand off size. + # This allows small datasets to be downloaded from remote object + # stores without requiring a temporary file. + + formatter = getInfo.formatter + nbytes_max = 10_000_000 # Arbitrary number that we can tune + if resource_size <= nbytes_max and formatter.can_read_bytes(): + with cache_manager.find_in_cache(cache_ref, uri.getExtension()) as cached_file: + if cached_file is not None: + desired_uri = cached_file + msg = f" (cached version of {uri})" + else: + desired_uri = uri + msg = "" + with time_this(log, msg="Reading bytes from %s%s", args=(desired_uri, msg)): + serializedDataset = desired_uri.read() + log.debug( + "Deserializing %s from %d bytes from location %s with formatter %s", + f"component {getInfo.component}" if isComponent else "", + len(serializedDataset), + uri, + formatter.name(), + ) + try: + result = formatter.fromBytes( + serializedDataset, component=getInfo.component if isComponent else None + ) + except Exception as e: + raise ValueError( + f"Failure from formatter '{formatter.name()}' for dataset {ref.id}" + f" ({ref.datasetType.name} from {uri}): {e}" + ) from e + else: + # Read from file. + + # Have to update the Location associated with the formatter + # because formatter.read does not allow an override. + # This could be improved. + location_updated = False + msg = "" + + # First check in cache for local version. + # The cache will only be relevant for remote resources but + # no harm in always asking. Context manager ensures that cache + # file is not deleted during cache expiration. + with cache_manager.find_in_cache(cache_ref, uri.getExtension()) as cached_file: + if cached_file is not None: + msg = f"(via cache read of remote file {uri})" + uri = cached_file + location_updated = True + + with uri.as_local() as local_uri: + can_be_cached = False + if uri != local_uri: + # URI was remote and file was downloaded + cache_msg = "" + location_updated = True + + if cache_manager.should_be_cached(cache_ref): + # In this scenario we want to ask if the downloaded + # file should be cached but we should not cache + # it until after we've used it (to ensure it can't + # be expired whilst we are using it). + can_be_cached = True + + # Say that it is "likely" to be cached because + # if the formatter read fails we will not be + # caching this file. + cache_msg = " and likely cached" + + msg = f"(via download to local file{cache_msg})" + + # Calculate the (possibly) new location for the formatter + # to use. + newLocation = Location(*local_uri.split()) if location_updated else None + + log.debug( + "Reading%s from location %s %s with formatter %s", + f" component {getInfo.component}" if isComponent else "", + uri, + msg, + formatter.name(), + ) + try: + with ( + formatter._updateLocation(newLocation), + time_this( + log, + msg="Reading%s from location %s %s with formatter %s", + args=( + f" component {getInfo.component}" if isComponent else "", + uri, + msg, + formatter.name(), + ), + ), + ): + result = formatter.read(component=getInfo.component if isComponent else None) + except Exception as e: + raise ValueError( + f"Failure from formatter '{formatter.name()}' for dataset {ref.id}" + f" ({ref.datasetType.name} from {uri}): {e}" + ) from e + + # File was read successfully so can move to cache + if can_be_cached: + cache_manager.move_to_cache(local_uri, cache_ref) + + return post_process_get( + result, ref.datasetType.storageClass, getInfo.assemblerParams, isComponent=isComponent + ) + + +def get_dataset_as_python_object_from_get_info( + allGetInfo: list[DatastoreFileGetInformation], + *, + ref: DatasetRef, + parameters: Mapping[str, Any] | None, + cache_manager: AbstractDatastoreCacheManager, +) -> Any: + """Retrieve an artifact from storage and return it as a Python object + + Parameters + ---------- + allGetInfo : `list`[`DatastoreFileGetInformation`] + Pre-processed information about each file associated with this artifact + ref : `DatasetRef` + The registry information associated with this artifact. + parameters : `Mapping`[`str`, `Any`] + `StorageClass` and `Formatter` parameters + cache_manager : `AbstractDatastoreCacheManager` + The cache manager to use for caching retrieved files + + Returns + ------- + python_object : `typing.Any` + The retrieved artifact, converted to a Python object according to the + `StorageClass` specified in ``ref``. + """ + refStorageClass = ref.datasetType.storageClass + refComponent = ref.datasetType.component() + # Create mapping from component name to related info + allComponents = {i.component: i for i in allGetInfo} + + # By definition the dataset is disassembled if we have more + # than one record for it. + isDisassembled = len(allGetInfo) > 1 + + # Look for the special case where we are disassembled but the + # component is a derived component that was not written during + # disassembly. For this scenario we need to check that the + # component requested is listed as a derived component for the + # composite storage class + isDisassembledReadOnlyComponent = False + if isDisassembled and refComponent: + # The composite storage class should be accessible through + # the component dataset type + compositeStorageClass = ref.datasetType.parentStorageClass + + # In the unlikely scenario where the composite storage + # class is not known, we can only assume that this is a + # normal component. If that assumption is wrong then the + # branch below that reads a persisted component will fail + # so there is no need to complain here. + if compositeStorageClass is not None: + isDisassembledReadOnlyComponent = refComponent in compositeStorageClass.derivedComponents + + if isDisassembled and not refComponent: + # This was a disassembled dataset spread over multiple files + # and we need to put them all back together again. + # Read into memory and then assemble + + # Check that the supplied parameters are suitable for the type read + refStorageClass.validateParameters(parameters) + + # We want to keep track of all the parameters that were not used + # by formatters. We assume that if any of the component formatters + # use a parameter that we do not need to apply it again in the + # assembler. + usedParams = set() + + components: dict[str, Any] = {} + for getInfo in allGetInfo: + # assemblerParams are parameters not understood by the + # associated formatter. + usedParams.update(set(getInfo.formatterParams)) + + component = getInfo.component + + if component is None: + raise RuntimeError(f"Internal error in datastore assembly of {ref}") + + # We do not want the formatter to think it's reading + # a component though because it is really reading a + # standalone dataset -- always tell reader it is not a + # component. + components[component] = _read_artifact_into_memory( + getInfo, ref.makeComponentRef(component), cache_manager, isComponent=False + ) + + inMemoryDataset = ref.datasetType.storageClass.delegate().assemble(components) + + # Any unused parameters will have to be passed to the assembler + if parameters: + unusedParams = {k: v for k, v in parameters.items() if k not in usedParams} + else: + unusedParams = {} + + # Process parameters + return ref.datasetType.storageClass.delegate().handleParameters( + inMemoryDataset, parameters=unusedParams + ) + + elif isDisassembledReadOnlyComponent: + compositeStorageClass = ref.datasetType.parentStorageClass + if compositeStorageClass is None: + raise RuntimeError( + f"Unable to retrieve derived component '{refComponent}' since" + "no composite storage class is available." + ) + + if refComponent is None: + # Mainly for mypy + raise RuntimeError("Internal error in datastore: component can not be None here") + + # Assume that every derived component can be calculated by + # forwarding the request to a single read/write component. + # Rather than guessing which rw component is the right one by + # scanning each for a derived component of the same name, + # we ask the storage class delegate directly which one is best to + # use. + compositeDelegate = compositeStorageClass.delegate() + forwardedComponent = compositeDelegate.selectResponsibleComponent(refComponent, set(allComponents)) + + # Select the relevant component + rwInfo = allComponents[forwardedComponent] + + # For now assume that read parameters are validated against + # the real component and not the requested component + forwardedStorageClass = rwInfo.formatter.fileDescriptor.readStorageClass + forwardedStorageClass.validateParameters(parameters) + + # The reference to use for the caching must refer to the forwarded + # component and not the derived component. + cache_ref = ref.makeCompositeRef().makeComponentRef(forwardedComponent) + + # Unfortunately the FileDescriptor inside the formatter will have + # the wrong write storage class so we need to create a new one + # given the immutability constraint. + writeStorageClass = rwInfo.info.storageClass + + # We may need to put some thought into parameters for read + # components but for now forward them on as is + readFormatter = type(rwInfo.formatter)( + FileDescriptor( + rwInfo.location, + readStorageClass=refStorageClass, + storageClass=writeStorageClass, + parameters=parameters, + ), + ref.dataId, + ) + + # The assembler can not receive any parameter requests for a + # derived component at this time since the assembler will + # see the storage class of the derived component and those + # parameters will have to be handled by the formatter on the + # forwarded storage class. + assemblerParams: dict[str, Any] = {} + + # Need to created a new info that specifies the derived + # component and associated storage class + readInfo = DatastoreFileGetInformation( + rwInfo.location, + readFormatter, + rwInfo.info, + assemblerParams, + {}, + refComponent, + refStorageClass, + ) + + return _read_artifact_into_memory(readInfo, ref, cache_manager, isComponent=True, cache_ref=cache_ref) + + else: + # Single file request or component from that composite file + for lookup in (refComponent, None): + if lookup in allComponents: + getInfo = allComponents[lookup] + break + else: + raise FileNotFoundError(f"Component {refComponent} not found for ref {ref} in datastore") + + # Do not need the component itself if already disassembled + if isDisassembled: + isComponent = False + else: + isComponent = getInfo.component is not None + + # For a component read of a composite we want the cache to + # be looking at the composite ref itself. + cache_ref = ref.makeCompositeRef() if isComponent else ref + + # For a disassembled component we can validate parameters against + # the component storage class directly + if isDisassembled: + refStorageClass.validateParameters(parameters) + else: + # For an assembled composite this could be a derived + # component derived from a real component. The validity + # of the parameters is not clear. For now validate against + # the composite storage class + getInfo.formatter.fileDescriptor.storageClass.validateParameters(parameters) + + return _read_artifact_into_memory( + getInfo, ref, cache_manager, isComponent=isComponent, cache_ref=cache_ref + ) diff --git a/python/lsst/daf/butler/datastores/inMemoryDatastore.py b/python/lsst/daf/butler/datastores/inMemoryDatastore.py index 33b65d71a0..aac3a681df 100644 --- a/python/lsst/daf/butler/datastores/inMemoryDatastore.py +++ b/python/lsst/daf/butler/datastores/inMemoryDatastore.py @@ -40,7 +40,7 @@ from lsst.daf.butler import DatasetId, DatasetRef, StorageClass from lsst.daf.butler.datastore import DatasetRefURIs -from lsst.daf.butler.datastore.generic_base import GenericBaseDatastore +from lsst.daf.butler.datastore.generic_base import GenericBaseDatastore, post_process_get from lsst.daf.butler.datastore.record_data import DatastoreRecordData from lsst.daf.butler.datastore.stored_file_info import StoredDatastoreItemInfo from lsst.daf.butler.utils import transactional @@ -349,7 +349,7 @@ def get( # Since there is no formatter to process parameters, they all must be # passed to the assembler. - inMemoryDataset = self._post_process_get( + inMemoryDataset = post_process_get( inMemoryDataset, refStorageClass, parameters, isComponent=component is not None ) diff --git a/python/lsst/daf/butler/direct_butler.py b/python/lsst/daf/butler/direct_butler.py index ab857d9fbd..9b20102c81 100644 --- a/python/lsst/daf/butler/direct_butler.py +++ b/python/lsst/daf/butler/direct_butler.py @@ -1337,6 +1337,7 @@ def get_dataset_type(self, name: str) -> DatasetType: def get_dataset( self, id: DatasetId, + *, storage_class: str | StorageClass | None = None, dimension_records: bool = False, datastore_records: bool = False, diff --git a/python/lsst/daf/butler/remote_butler/_remote_butler.py b/python/lsst/daf/butler/remote_butler/_remote_butler.py index 53a28bc631..2d0c451e9b 100644 --- a/python/lsst/daf/butler/remote_butler/_remote_butler.py +++ b/python/lsst/daf/butler/remote_butler/_remote_butler.py @@ -31,17 +31,20 @@ from collections.abc import Collection, Iterable, Mapping, Sequence from contextlib import AbstractContextManager -from typing import TYPE_CHECKING, Any, TextIO +from typing import TYPE_CHECKING, Any, TextIO, Type, TypeVar import httpx from lsst.daf.butler import __version__ +from lsst.daf.butler.datastores.fileDatastoreClient import get_dataset_as_python_object from lsst.daf.butler.repo_relocation import replaceRoot from lsst.resources import ResourcePath, ResourcePathExpression from lsst.utils.introspection import get_full_type_name from .._butler import Butler from .._butler_config import ButlerConfig -from .._dataset_ref import DatasetRef, SerializedDatasetRef +from .._compat import _BaseModelCompat +from .._config import Config +from .._dataset_ref import DatasetId, DatasetIdGenEnum, DatasetRef, SerializedDatasetRef from .._dataset_type import DatasetType, SerializedDatasetType from .._storage_class import StorageClass from ..dimensions import DataCoordinate, DimensionConfig, DimensionUniverse, SerializedDataCoordinate @@ -49,12 +52,10 @@ from ..registry.wildcards import CollectionWildcard from ._authentication import get_authentication_headers, get_authentication_token_from_environment from ._config import RemoteButlerConfigModel -from .server_models import FindDatasetModel +from .server_models import FindDatasetModel, GetFileRequestModel, GetFileResponseModel if TYPE_CHECKING: - from .._config import Config from .._dataset_existence import DatasetExistence - from .._dataset_ref import DatasetId, DatasetIdGenEnum from .._deferredDatasetHandle import DeferredDatasetHandle from .._file_dataset import FileDataset from .._limited_butler import LimitedButler @@ -66,6 +67,9 @@ from ..transfers import RepoExportContext +_AnyPydanticModel = TypeVar("_AnyPydanticModel", bound=_BaseModelCompat) + + class RemoteButler(Butler): def __init__( self, @@ -156,14 +160,9 @@ def _simplify_dataId( if isinstance(dataId, DataCoordinate): return dataId.to_simple() - if dataId is None: - data_id = kwargs - elif kwargs: - # Change variable because DataId is immutable and mypy complains. - data_id = dict(dataId) - data_id.update(kwargs) - # Assume we can treat it as a dict. + data_id = dict(dataId) if dataId is not None else {} + data_id.update(kwargs) return SerializedDataCoordinate(dataId=data_id) def _caching_context(self) -> AbstractContextManager[None]: @@ -217,7 +216,20 @@ def get( **kwargs: Any, ) -> Any: # Docstring inherited. - raise NotImplementedError() + if not isinstance(datasetRefOrType, DatasetRef): + raise NotImplementedError("RemoteButler currently only supports get() of a DatasetRef") + + dataset_id = datasetRefOrType.id + request = GetFileRequestModel(dataset_id=dataset_id) + response = self._post("get_file", request) + if response.status_code == 404: + raise LookupError(f"Dataset not found with ID ${dataset_id}") + response.raise_for_status() + model = self._parse_model(response, GetFileResponseModel) + + return get_dataset_as_python_object( + model, parameters=parameters, storageClass=storageClass, universe=self.dimensions + ) def getURIs( self, @@ -262,6 +274,7 @@ def get_dataset_type(self, name: str) -> DatasetType: def get_dataset( self, id: DatasetId, + *, storage_class: str | StorageClass | None = None, dimension_records: bool = False, datastore_records: bool = False, @@ -328,12 +341,12 @@ def find_dataset( ) path = f"find_dataset/{dataset_type}" - response = self._client.post( - self._get_url(path), json=query.model_dump(mode="json", exclude_unset=True, exclude_defaults=True) - ) + response = self._post(path, query) response.raise_for_status() - return DatasetRef.from_simple(SerializedDatasetRef(**response.json()), universe=self.dimensions) + return DatasetRef.from_simple( + self._parse_model(response, SerializedDatasetRef), universe=self.dimensions + ) def retrieveArtifacts( self, @@ -530,3 +543,11 @@ def _get_url(self, path: str, version: str = "v1") -> str: The full path to the endpoint. """ return f"{version}/{path}" + + def _post(self, path: str, model: _BaseModelCompat) -> httpx.Response: + json = model.model_dump_json(exclude_unset=True).encode("utf-8") + url = self._get_url(path) + return self._client.post(url, content=json, headers={"content-type": "application/json"}) + + def _parse_model(self, response: httpx.Response, model: Type[_AnyPydanticModel]) -> _AnyPydanticModel: + return model.model_validate_json(response.content) diff --git a/python/lsst/daf/butler/remote_butler/server/_exceptions.py b/python/lsst/daf/butler/remote_butler/server/_exceptions.py new file mode 100644 index 0000000000..2514cd9046 --- /dev/null +++ b/python/lsst/daf/butler/remote_butler/server/_exceptions.py @@ -0,0 +1,33 @@ +# This file is part of daf_butler. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (http://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This software is dual licensed under the GNU General Public License and also +# under a 3-clause BSD license. Recipients may choose which of these licenses +# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, +# respectively. If you choose the GPL option then the following text applies +# (but note that there is still no warranty even if you opt for BSD instead): +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from fastapi import HTTPException + + +class NotFoundException(HTTPException): + def __init__(self, message: str = "Not found"): + super().__init__(status_code=404, detail=message) diff --git a/python/lsst/daf/butler/remote_butler/server/handlers/_external.py b/python/lsst/daf/butler/remote_butler/server/handlers/_external.py index 7b36133008..a765e55c1e 100644 --- a/python/lsst/daf/butler/remote_butler/server/handlers/_external.py +++ b/python/lsst/daf/butler/remote_butler/server/handlers/_external.py @@ -32,9 +32,14 @@ from fastapi import APIRouter, Depends from lsst.daf.butler import SerializedDatasetRef, SerializedDatasetType -from lsst.daf.butler.remote_butler.server_models import FindDatasetModel +from lsst.daf.butler.remote_butler.server_models import ( + FindDatasetModel, + GetFileRequestModel, + GetFileResponseModel, +) from .._dependencies import factory_dependency +from .._exceptions import NotFoundException from .._factory import Factory external_router = APIRouter() @@ -152,3 +157,16 @@ def find_dataset( **data_id, ) return ref.to_simple() if ref else None + + +@external_router.post("/v1/get_file") +def get_file( + request: GetFileRequestModel, + factory: Factory = Depends(factory_dependency), +) -> GetFileResponseModel: + butler = factory.create_butler() + ref = butler.get_dataset(request.dataset_id, datastore_records=True) + if ref is None: + raise NotFoundException(f"Dataset ID {request.dataset_id} not found") + payload = butler._datastore.prepare_get_for_external_client(ref) + return GetFileResponseModel.model_validate(payload) diff --git a/python/lsst/daf/butler/remote_butler/server_models.py b/python/lsst/daf/butler/remote_butler/server_models.py index 1b4d59b5c1..5b9fa957c6 100644 --- a/python/lsst/daf/butler/remote_butler/server_models.py +++ b/python/lsst/daf/butler/remote_butler/server_models.py @@ -27,9 +27,10 @@ """Models used for client/server communication.""" -__all__ = ["FindDatasetModel"] +__all__ = ["FindDatasetModel", "GetFileResponseModel"] -from lsst.daf.butler import SerializedDataCoordinate +from lsst.daf.butler import DatasetId, SerializedDataCoordinate +from lsst.daf.butler.datastores.fileDatastoreClient import FileDatastoreGetPayload from .._compat import _BaseModelCompat @@ -42,3 +43,10 @@ class FindDatasetModel(_BaseModelCompat): storage_class: str | None dimension_records: bool = False datastore_records: bool = False + + +class GetFileRequestModel(_BaseModelCompat): + dataset_id: DatasetId + + +GetFileResponseModel = FileDatastoreGetPayload diff --git a/python/lsst/daf/butler/tests/utils.py b/python/lsst/daf/butler/tests/utils.py index 84e5d1a650..df4238c254 100644 --- a/python/lsst/daf/butler/tests/utils.py +++ b/python/lsst/daf/butler/tests/utils.py @@ -223,11 +223,16 @@ class MetricTestRepo: The path to the config file, to pass to ``Butler.makeRepo``. """ + METRICS_EXAMPLE_SUMMARY = {"AM1": 5.2, "AM2": 30.6} + """The summary data included in ``MetricsExample`` objects stored in the + test repo + """ + @staticmethod def _makeExampleMetrics() -> MetricsExample: """Make an object to put into the repository.""" return MetricsExample( - {"AM1": 5.2, "AM2": 30.6}, + MetricTestRepo.METRICS_EXAMPLE_SUMMARY, {"a": [1, 2, 3], "b": {"blue": 5, "red": "green"}}, [563, 234, 456.7, 752, 8, 9, 27], ) diff --git a/tests/test_server.py b/tests/test_server.py index c050b97320..8430e91202 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -43,7 +43,7 @@ from lsst.daf.butler import Butler, DataCoordinate, DatasetRef, MissingDatasetTypeError, StorageClassFactory from lsst.daf.butler.tests import DatastoreMock -from lsst.daf.butler.tests.utils import MetricTestRepo, makeTestTempDir, removeTestTempDir +from lsst.daf.butler.tests.utils import MetricsExample, MetricTestRepo, makeTestTempDir, removeTestTempDir from lsst.resources.http import HttpResourcePath TESTDIR = os.path.abspath(os.path.dirname(__file__)) @@ -76,9 +76,6 @@ def setUpClass(cls): # Override the server's Butler initialization to point at our test repo server_butler = Butler.from_config(cls.root, writeable=True) - # Not yet testing butler.get() - DatastoreMock.apply(server_butler) - def create_factory_dependency(): return Factory(butler=server_butler) @@ -89,7 +86,9 @@ def create_factory_dependency(): cls.client.base_url = "http://test.example/api/butler/" cls.butler = _make_remote_butler(cls.client) - # Populate the test server. + # Populate the test server. The DatastoreMock is required because the + # datasets referenced in these imports do not point at real files + DatastoreMock.apply(server_butler) server_butler.import_(filename=os.path.join(TESTDIR, "data", "registry", "base.yaml")) server_butler.import_(filename=os.path.join(TESTDIR, "data", "registry", "datasets.yaml")) @@ -197,6 +196,29 @@ def override_read(http_resource_path): assert isinstance(butler, RemoteButler) assert str(butler._config.remote_butler.url) == "https://test.example/api/butler/" + def test_get(self): + # Test get() of a DatasetRef + ref = self.butler.find_dataset( + "test_metric_comp", + {"instrument": "DummyCamComp", "visit": 423}, + collections="ingest/run", + ) + metric = self.butler.get(ref) + self.assertIsInstance(metric, MetricsExample) + self.assertEqual(metric.summary, MetricTestRepo.METRICS_EXAMPLE_SUMMARY) + + # Test looking up a non-existent ref + invalid_ref = ref.replace(id=uuid.uuid4()) + with self.assertRaises(LookupError): + self.butler.get(invalid_ref) + + # Test storage class override + new_sc = self.storageClassFactory.getStorageClass("MetricsConversion") + converted = self.butler.get(ref, storageClass=new_sc) + self.assertNotEqual(type(metric), type(converted)) + self.assertIs(type(converted), new_sc.pytype) + self.assertEqual(metric, converted) + if __name__ == "__main__": unittest.main()