diff --git a/python/lsst/daf/butler/queries/_query.py b/python/lsst/daf/butler/queries/_query.py
index 934f813dee..f2c8a1be53 100644
--- a/python/lsst/daf/butler/queries/_query.py
+++ b/python/lsst/daf/butler/queries/_query.py
@@ -35,6 +35,11 @@
 from .._query import Query
 from ..dimensions import DataCoordinate, DataId, DataIdValue, DimensionGroup
 from .data_coordinate_results import DataCoordinateResultSpec, RelationDataCoordinateQueryResults
+from .dataset_results import (
+    ChainedDatasetQueryResults,
+    DatasetRefResultSpec,
+    RelationSingleTypeDatasetQueryResults,
+)
 from .dimension_record_results import DimensionRecordResultSpec, RelationDimensionRecordQueryResults
 from .driver import QueryDriver
 from .expression_factory import ExpressionFactory, ExpressionProxy
@@ -171,6 +176,8 @@ def data_ids(
         )
         return RelationDataCoordinateQueryResults(self._driver, tree, result_spec)
 
+    # TODO add typing.overload variants for single-dataset-type and patterns.
+
     def datasets(
         self,
         dataset_type: Any,
@@ -183,7 +190,46 @@
         **kwargs: Any,
     ) -> DatasetQueryResults:
         # Docstring inherited.
-        raise NotImplementedError("TODO")
+        resolved_dataset_types = self._driver.resolve_dataset_type_wildcard(dataset_type)
+        data_id = DataCoordinate.standardize(data_id, universe=self._driver.universe, **kwargs)
+        where_terms = convert_where_args(self._tree, where, data_id, bind=bind, **kwargs)
+        single_type_results: list[RelationSingleTypeDatasetQueryResults] = []
+        for name, resolved_dataset_type in resolved_dataset_types.items():
+            tree = self._tree
+            if name not in tree.available_dataset_types:
+                resolved_collections, collections_ordered = self._driver.resolve_collection_wildcard(
+                    collections
+                )
+                if find_first and not collections_ordered:
+                    raise InvalidRelationError(
+                        f"Unordered collections argument {collections} requires find_first=False."
+                    )
+                tree = tree.join(
+                    DatasetSearch.model_construct(
+                        dataset_type=name,
+                        dimensions=resolved_dataset_type.dimensions.as_group(),
+                        collections=tuple(resolved_collections),
+                    )
+                )
+            elif collections is not None:
+                raise InvalidRelationError(
+                    f"Dataset type {name!r} was already joined into this query but new collections "
+                    f"{collections!r} were still provided."
+                )
+            if where_terms:
+                tree = tree.where(*where_terms)
+            if find_first:
+                tree = tree.find_first(name, resolved_dataset_type.dimensions.as_group())
+            spec = DatasetRefResultSpec.model_construct(
+                dataset_type=resolved_dataset_type, include_dimension_records=self._include_dimension_records
+            )
+            single_type_results.append(
+                RelationSingleTypeDatasetQueryResults(self._driver, tree=tree, spec=spec)
+            )
+        if len(single_type_results) == 1:
+            return single_type_results[0]
+        else:
+            return ChainedDatasetQueryResults(tuple(single_type_results))
 
     def dimension_records(
         self,
diff --git a/python/lsst/daf/butler/queries/data_coordinate_results.py b/python/lsst/daf/butler/queries/data_coordinate_results.py
index 5f85fa55de..75a61fdb28 100644
--- a/python/lsst/daf/butler/queries/data_coordinate_results.py
+++ b/python/lsst/daf/butler/queries/data_coordinate_results.py
@@ -77,7 +77,7 @@ class DataCoordinateResultPage(pydantic.BaseModel):
 
 
 class RelationDataCoordinateQueryResults(DataCoordinateQueryResults):
-    """Implementation of DataCoordinateQueryResults for the relation-based
+    """Implementation of `DataCoordinateQueryResults` for the relation-based
     query system.
 
     Parameters
@@ -145,7 +145,9 @@ def expanded(self) -> DataCoordinateQueryResults:
         return RelationDataCoordinateQueryResults(
             self._driver,
             tree=self._tree,
-            spec=DataCoordinateResultSpec(dimensions=self._spec.dimensions, include_dimension_records=True),
+            spec=DataCoordinateResultSpec.model_construct(
+                dimensions=self._spec.dimensions, include_dimension_records=True
+            ),
         )
 
     def subset(
         self,
@@ -170,7 +172,7 @@ def subset(
         return RelationDataCoordinateQueryResults(
             self._driver,
             tree=self._tree,
-            spec=DataCoordinateResultSpec(
+            spec=DataCoordinateResultSpec.model_construct(
                 dimensions=dimensions, include_dimension_records=self._spec.include_dimension_records
             ),
         )
diff --git a/python/lsst/daf/butler/queries/dataset_results.py b/python/lsst/daf/butler/queries/dataset_results.py
index e655800dbd..be6bd1fa2d 100644
--- a/python/lsst/daf/butler/queries/dataset_results.py
+++ b/python/lsst/daf/butler/queries/dataset_results.py
@@ -30,24 +30,34 @@
 __all__ = (
     "DatasetRefResultSpec",
     "DatasetRefResultPage",
+    "RelationSingleTypeDatasetQueryResults",
 )
 
+import itertools
+from collections.abc import Iterable, Iterator
+from contextlib import ExitStack, contextmanager
 from typing import Literal
 
 import pydantic
 
 from .._dataset_ref import DatasetRef
-from ..dimensions import DimensionGroup
-from .driver import PageKey
+from .._dataset_type import DatasetType
+from .._query_results import SingleTypeDatasetQueryResults
+from .data_coordinate_results import (
+    DataCoordinateResultSpec,
+    DatasetQueryResults,
+    RelationDataCoordinateQueryResults,
+)
+from .driver import PageKey, QueryDriver
+from .relation_tree import Materialization, RootRelation, make_unit_relation
 
 
 class DatasetRefResultSpec(pydantic.BaseModel):
     """Specification for a query that yields `DatasetRef` objects."""
 
     result_type: Literal["dataset_ref"] = "dataset_ref"
-    dataset_type_name: str | None
-    dimensions: DimensionGroup
-    with_dimension_records: bool
+    dataset_type: DatasetType
+    include_dimension_records: bool
 
 
 class DatasetRefResultPage(pydantic.BaseModel):
@@ -60,3 +70,166 @@
     # attached DimensionRecords and is Pydantic-friendly. Right now this model
     # isn't actually serializable.
     rows: list[DatasetRef]
+
+
+class RelationSingleTypeDatasetQueryResults(SingleTypeDatasetQueryResults):
+    """Implementation of `SingleTypeDatasetQueryResults` for the relation-based
+    query system.
+
+    Parameters
+    ----------
+    driver : `QueryDriver`
+        Implementation object that knows how to actually execute queries.
+    tree : `RootRelation`
+        Description of the query as a tree of relation operations. This
+        should already include the dataset search for this result object's
+        dataset type.
+    spec : `DatasetRefResultSpec`
+        Specification for the details of the dataset references to return.
+
+    Notes
+    -----
+    Ideally this will eventually just be "SingleTypeDatasetQueryResults",
+    because we won't need an ABC if this is the only implementation.
+    """
+
+    def __init__(self, driver: QueryDriver, tree: RootRelation, spec: DatasetRefResultSpec):
+        self._driver = driver
+        self._tree = tree
+        self._spec = spec
+
+    def __iter__(self) -> Iterator[DatasetRef]:
+        page = self._driver.execute(self._tree, self._spec)
+        yield from page.rows
+        while page.next_key is not None:
+            page = self._driver.fetch_next_page(self._spec, page.next_key)
+            yield from page.rows
+
+    @contextmanager
+    def materialize(self) -> Iterator[RelationSingleTypeDatasetQueryResults]:
+        # Docstring inherited from DatasetQueryResults.
+        key = self._driver.materialize(self._tree, frozenset())
+        yield RelationSingleTypeDatasetQueryResults(
+            self._driver,
+            tree=make_unit_relation(self._driver.universe).join(
+                Materialization.model_construct(
+                    key=key, operand=self._tree, dataset_types=frozenset({self.dataset_type.name})
+                )
+            ),
+            spec=self._spec,
+        )
+        # TODO: Right now we just rely on the QueryDriver context instead of
+        # using this one. If we want this to remain a context manager, we
+        # should make it do something, e.g. by adding a QueryDriver method to
+        # drop a materialization.
+
+    @property
+    def dataset_type(self) -> DatasetType:
+        # Docstring inherited.
+        return self._spec.dataset_type
+
+    @property
+    def data_ids(self) -> RelationDataCoordinateQueryResults:
+        # Docstring inherited.
+        return RelationDataCoordinateQueryResults(
+            self._driver,
+            tree=self._tree,
+            spec=DataCoordinateResultSpec.model_construct(
+                dimensions=self.dataset_type.dimensions.as_group(),
+                include_dimension_records=self._spec.include_dimension_records,
+            ),
+        )
+
+    def expanded(self) -> RelationSingleTypeDatasetQueryResults:
+        # Docstring inherited.
+        if self._spec.include_dimension_records:
+            return self
+        return RelationSingleTypeDatasetQueryResults(
+            self._driver,
+            tree=self._tree,
+            spec=DatasetRefResultSpec.model_construct(
+                dataset_type=self.dataset_type, include_dimension_records=True
+            ),
+        )
+
+    def by_dataset_type(self) -> Iterator[SingleTypeDatasetQueryResults]:
+        # Docstring inherited.
+        return iter((self,))
+
+    def count(self, *, exact: bool = True, discard: bool = False) -> int:
+        # Docstring inherited.
+        return self._driver.count(self._tree, exact=exact, discard=discard)
+
+    def any(self, *, execute: bool = True, exact: bool = True) -> bool:
+        # Docstring inherited.
+        return self._driver.any(self._tree, execute=execute, exact=exact)
+
+    def explain_no_results(self, execute: bool = True) -> Iterable[str]:
+        # Docstring inherited.
+        return self._driver.explain_no_results(self._tree, execute=execute)
+
+
+class ChainedDatasetQueryResults(DatasetQueryResults):
+    """Implementation of `DatasetQueryResults` that delegates to a sequence
+    of `SingleTypeDatasetQueryResults`.
+
+    Parameters
+    ----------
+    by_dataset_type : `tuple` [ `SingleTypeDatasetQueryResults`, ... ]
+        Tuple of single-dataset-type query result objects to combine.
+
+    Notes
+    -----
+    Ideally this will eventually just be "DatasetQueryResults", because we
+    won't need an ABC if this is the only implementation.
+    """
+
+    def __init__(self, by_dataset_type: tuple[SingleTypeDatasetQueryResults, ...]):
+        self._by_dataset_type = by_dataset_type
+
+    def __iter__(self) -> Iterator[DatasetRef]:
+        return itertools.chain.from_iterable(self._by_dataset_type)
+
+    def by_dataset_type(self) -> Iterator[SingleTypeDatasetQueryResults]:
+        # Docstring inherited.
+        return iter(self._by_dataset_type)
+
+    @contextmanager
+    def materialize(self) -> Iterator[DatasetQueryResults]:
+        # Docstring inherited.
+        with ExitStack() as exit_stack:
+            yield ChainedDatasetQueryResults(
+                tuple(
+                    [
+                        exit_stack.enter_context(single_type_results.materialize())
+                        for single_type_results in self._by_dataset_type
+                    ]
+                )
+            )
+
+    def expanded(self) -> ChainedDatasetQueryResults:
+        # Docstring inherited.
+        return ChainedDatasetQueryResults(
+            tuple([single_type_results.expanded() for single_type_results in self._by_dataset_type])
+        )
+
+    def count(self, *, exact: bool = True, discard: bool = False) -> int:
+        # Docstring inherited.
+        return sum(
+            single_type_results.count(exact=exact, discard=discard)
+            for single_type_results in self._by_dataset_type
+        )
+
+    def any(self, *, execute: bool = True, exact: bool = True) -> bool:
+        # Docstring inherited.
+        return any(
+            single_type_results.any(execute=execute, exact=exact)
+            for single_type_results in self._by_dataset_type
+        )
+
+    def explain_no_results(self, execute: bool = True) -> Iterable[str]:
+        # Docstring inherited.
+        messages: list[str] = []
+        for single_type_results in self._by_dataset_type:
+            messages.extend(single_type_results.explain_no_results(execute=execute))
+        return messages
diff --git a/python/lsst/daf/butler/queries/driver.py b/python/lsst/daf/butler/queries/driver.py
index 0b875b891e..0cd8e4ecd3 100644
--- a/python/lsst/daf/butler/queries/driver.py
+++ b/python/lsst/daf/butler/queries/driver.py
@@ -33,10 +33,11 @@
 from abc import abstractmethod
 from collections.abc import Iterable
 from contextlib import AbstractContextManager
-from typing import Annotated, TypeAlias, Union, overload
+from typing import TYPE_CHECKING, Annotated, Any, TypeAlias, Union, overload
 
 import pydantic
 
+from .._dataset_type import DatasetType
 from ..dimensions import DataIdValue, DimensionGroup, DimensionUniverse
 from .data_coordinate_results import DataCoordinateResultPage, DataCoordinateResultSpec
 from .dataset_results import DatasetRefResultPage, DatasetRefResultSpec
@@ -44,6 +45,9 @@
 from .general_results import GeneralResultPage, GeneralResultSpec
 from .relation_tree import MaterializationKey, RootRelation, UploadKey
 
+if TYPE_CHECKING:
+    from ..registry import CollectionArgType
+
 PageKey: TypeAlias = uuid.UUID
 
 
@@ -63,6 +67,8 @@
 class QueryDriver(AbstractContextManager[None]):
     """Base class for the implementation object inside `RelationQuery` objects
     that is specialized for DirectButler vs. RemoteButler.
 
+    Notes
+    -----
     Implementations should be context managers. This allows them to manage
     the lifetime of server-side state, such as:
@@ -300,6 +306,49 @@ def explain_no_results(self, tree: RootRelation, execute: bool) -> Iterable[str]
         """
         raise NotImplementedError()
 
+    @abstractmethod
+    def resolve_collection_wildcard(
+        self, collections: CollectionArgType | None = None
+    ) -> tuple[list[str], bool]:
+        """Resolve a collection argument into a sequence of collection names.
+
+        Parameters
+        ----------
+        collections : `CollectionArgType` or `None`, optional
+            Collection search path argument. If `None`, the default
+            collections for the client should be used, if there are any.
+
+        Returns
+        -------
+        matched : `list` [ `str` ]
+            Matching collection names. `~CollectionType.CHAINED` collections
+            are included directly rather than flattened.
+        ordered : `bool`
+            If `True`, the expression specified an order that can be used in
+            a find-first search.
+        """
+        raise NotImplementedError()
+
+    @abstractmethod
+    def resolve_dataset_type_wildcard(self, dataset_type: Any) -> dict[str, DatasetType]:
+        """Resolve a dataset type argument into a mapping of `DatasetType`
+        objects.
+
+        Parameters
+        ----------
+        dataset_type : `~typing.Any`
+            Dataset type name, object, or wildcard to resolve.
+
+        Returns
+        -------
+        matched : `dict` [ `str`, `DatasetType` ]
+            Mapping from dataset type name to dataset type. Storage classes
+            passed in should be preserved, but component dataset types should
+            result in an exception.
+        """
+        raise NotImplementedError()
+
     @abstractmethod
     def get_dataset_dimensions(self, name: str) -> DimensionGroup:
+        """Return the dimensions for a dataset type."""
         raise NotImplementedError()
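
Usage sketch for the new `Query.datasets` code path added above. This is illustrative only: the collection and dataset type names are made up, `query` is assumed to be a `Query` instance obtained from the `Butler._query` entry point mentioned in the docstrings, and the glob-style wildcard string is an assumption about what `resolve_dataset_type_wildcard` accepts.

    # A single dataset type resolves to one entry in the driver's wildcard
    # mapping, so datasets() returns the single-type results object directly.
    refs = query.datasets("calexp", collections=["HSC/runs/RC2"])
    print(refs.dataset_type.name, refs.count(exact=False))
    for ref in refs.expanded():
        # expanded() re-issues the spec with include_dimension_records=True.
        print(ref.dataId, ref.run)

    # A wildcard may resolve to several dataset types; each gets its own
    # relation tree (with its own DatasetSearch join) and the per-type results
    # are combined in a ChainedDatasetQueryResults.
    results = query.datasets("calexp*", collections=["HSC/runs/RC2"], find_first=False)
    for single_type_results in results.by_dataset_type():
        print(single_type_results.dataset_type.name, single_type_results.count(exact=False))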
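
A sketch of the behavior the new `QueryDriver.resolve_collection_wildcard` hook is expected to provide, as implied by its docstring and by the `find_first` check in `Query.datasets`; the concrete `driver` object and the collection names here are hypothetical.

    import re

    # An explicit sequence of collection names specifies a search order, so the
    # returned flag should be True and find-first searches are allowed.
    matched, ordered = driver.resolve_collection_wildcard(["HSC/RC2/defaults", "HSC/raw/all"])
    assert ordered

    # A pattern matches an unordered set of collections; Query.datasets raises
    # InvalidRelationError when such an argument is combined with find_first=True.
    matched, ordered = driver.resolve_collection_wildcard(re.compile("HSC/runs/.*"))
    assert not ordered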
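
The repeated switch from the pydantic constructors to `model_construct` in the result specs above skips validation for values the calling code already knows are well-typed. A minimal, standalone illustration of the difference (the `DemoSpec` model is invented for this example and is not part of the patch):

    import pydantic

    class DemoSpec(pydantic.BaseModel):
        dimensions: tuple[str, ...]
        include_dimension_records: bool

    # The normal constructor validates and coerces its inputs.
    validated = DemoSpec(dimensions=["visit"], include_dimension_records=1)
    assert validated.include_dimension_records is True

    # model_construct trusts the caller and skips validation entirely, which is
    # cheaper when the values were just produced by already-validated objects.
    trusted = DemoSpec.model_construct(dimensions=("visit",), include_dimension_records=True)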