Integrate materialization and data ID uploads with driver.
Working through the materialize method signature made me realize we
need the find_first method on RelationQuery, even if it'll be rarely
used.
TallJimbo committed Dec 10, 2023
1 parent 1f3ca64 commit 5df890c
Showing 4 changed files with 153 additions and 18 deletions.
93 changes: 89 additions & 4 deletions python/lsst/daf/butler/queries/_query.py
@@ -40,10 +40,13 @@
from .relation_tree import (
DataCoordinateUpload,
DatasetSearch,
InvalidRelationError,
Materialization,
OrderExpression,
Predicate,
RootRelation,
make_dimension_relation,
make_unit_relation,
)
from .relation_tree.joins import JoinArg

@@ -328,9 +331,38 @@ def limit(self, limit: int | None = None, offset: int = 0) -> RelationQuery:
include_dimension_records=self._include_dimension_records,
)

# TODO: Materialize should probably go here instead of
# DataCoordinateQueryResults, but the signature should probably change,
# too, and that requires more thought.
def materialize(self, *, dataset_types: Iterable[str] | None = None) -> RelationQuery:
"""Execute the query, save its results to a temporary location, and
return a new query that represents fetching or joining against those
saved results.

Parameters
----------
dataset_types : `~collections.abc.Iterable` [ `str` ], optional
Names of dataset types whose ID fields (at least) should be
included in the temporary results; the default (`None`) includes
all dataset types whose ID columns are currently available to the
query.  Dataset searches over multiple collections are not
resolved, but enough information is preserved to resolve them
downstream of the materialization.

Returns
-------
query : `RelationQuery`
A new query object that represents the materialized rows.
"""
if dataset_types is None:
dataset_types = self._tree.available_dataset_types
else:
dataset_types = frozenset(dataset_types)
key = self._driver.materialize(self._tree, dataset_types)
return RelationQuery(
self._driver,
tree=make_unit_relation(self._driver.universe).join(
Materialization.model_construct(key=key, operand=self._tree, dataset_types=dataset_types)
),
include_dimension_records=self._include_dimension_records,
)

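A usage sketch of the new method (the `butler` entry point, the `join_dataset_search` argument names, and the collection and dataset type names here are illustrative assumptions, not part of this change):

# Sketch only: entry point and names are assumed for illustration.
query = butler.query().join_dataset_search("raw", collections=["DECam/raw/all"])
# Run the query once and stash its rows; the default keeps ID columns for
# all dataset types currently available to the query.
materialized = query.materialize()
# Later operations, such as find_first below, run against the temporary
# rows instead of re-executing the original joins.
resolved = materialized.find_first("raw")
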
def join_dataset_search(
self,
Expand Down Expand Up @@ -390,8 +422,9 @@ def join_data_coordinates(self, iterable: Iterable[DataCoordinate]) -> RelationQ
rows.add(data_coordinate.required_values)
if dimensions is None:
raise RuntimeError("Cannot upload an empty data coordinate set.")
key = self._driver.upload_data_coordinates(dimensions, rows)
return RelationQuery(
tree=self._tree.join(DataCoordinateUpload(dimensions=dimensions, rows=rows)),
tree=self._tree.join(DataCoordinateUpload(dimensions=dimensions, key=key)),
driver=self._driver,
include_dimension_records=self._include_dimension_records,
)
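A usage sketch (the `DataCoordinate.standardize` call pattern and the `butler.dimensions` universe access are assumptions for illustration):

from lsst.daf.butler import DataCoordinate

# Sketch: constrain a query to a few explicit data IDs.  The rows are
# handed to the driver immediately; the relation tree keeps only the
# returned key and the dimensions, not the rows themselves.
data_ids = [
    DataCoordinate.standardize(instrument="DECam", visit=v, universe=butler.dimensions)
    for v in (101, 102, 103)
]
constrained = query.join_data_coordinates(data_ids)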
@@ -503,6 +536,58 @@ def where(
include_dimension_records=self._include_dimension_records,
)

def find_first(
self, dataset_type: str | None = None, dimensions: DimensionGroup | None = None
) -> RelationQuery:
"""Return a query that resolves the datasets by searching collections
in order after grouping by data ID.
Parameters
----------
dataset_type : `str`, optional
Dataset type name of the datasets to resolve. May be omitted if
the query has exactly one dataset type joined in.
dimensions : `~collections.abc.Iterable` [ `str` ] or \
`DimensionGroup`, optional
Dimensions of the data IDs to group by. If not provided, the
dataset type's dimensions are used (not the query's dimensions).
Returns
-------
query : `Query`
A new query object that includes the dataset resolution operation.
Dataset columns other than the target dataset type's are dropped,
as are dimensions outside ``dimensions``.
Notes
-----
This operation is typically applied automatically when obtain a results
object from the query, but in rare cases it may need to be added
directly by this method, such as prior to a `materialization` or when
the query is being used as the container in a `relation_tree.InRange`
expression.
"""
if not self._tree.available_dataset_types:
raise InvalidRelationError("Query does not have any dataset searches joined in.")
if dataset_type is None:
try:
(dataset_type,) = self._tree.available_dataset_types
except ValueError:
raise InvalidRelationError(
"Find-first dataset type must be provided if the query has more than one."
) from None
elif dataset_type not in self._tree.available_dataset_types:
raise InvalidRelationError(
f"Find-first dataset type {dataset_type} is not present in query."
) from None
if dimensions is None:
dimensions = self._driver.get_dataset_dimensions(dataset_type)
return RelationQuery(
tree=self._tree.find_first(dataset_type, dimensions),
driver=self._driver,
include_dimension_records=self._include_dimension_records,
)

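A usage sketch (the dataset type name and the `visit_group` DimensionGroup are illustrative assumptions):

# With exactly one dataset search joined in, the dataset type is inferred.
best = query.find_first()
# Explicit form; groups by the dataset type's own dimensions.
best = query.find_first("calexp")
# Grouping by a coarser DimensionGroup drops dimensions outside it, e.g.
# keeping one dataset per visit rather than per (visit, detector).
per_visit = query.find_first("calexp", dimensions=visit_group)
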
def _convert_order_by_args(self, *args: str | OrderExpression | ExpressionProxy) -> list[OrderExpression]:
"""Convert ``order_by`` arguments to a list of column expressions."""
raise NotImplementedError("TODO: Parse string expression.")
49 changes: 47 additions & 2 deletions python/lsst/daf/butler/queries/driver.py
@@ -37,12 +37,12 @@

import pydantic

from ..dimensions import DimensionGroup, DimensionUniverse
from ..dimensions import DataIdValue, DimensionGroup, DimensionUniverse
from .data_coordinate_results import DataCoordinateResultPage, DataCoordinateResultSpec
from .dataset_results import DatasetRefResultPage, DatasetRefResultSpec
from .dimension_record_results import DimensionRecordResultPage, DimensionRecordResultSpec
from .general_results import GeneralResultPage, GeneralResultSpec
from .relation_tree import RootRelation
from .relation_tree import MaterializationKey, RootRelation, UploadKey

PageKey: TypeAlias = uuid.UUID

@@ -185,6 +185,51 @@ def fetch_next_page(self, result_spec: ResultSpec, key: PageKey) -> ResultPage:
# the cursor is exactly.
raise NotImplementedError()

@abstractmethod
def materialize(self, tree: RootRelation, dataset_types: frozenset[str]) -> MaterializationKey:
"""Execute a relation tree, saving results to temporary storage for use
in later queries.
Parameters
----------
tree : `RootRelation`
Relation tree to evaluate.
dataset_types : `frozenset` [ `str` ]
Names of dataset types whose ID columns (at least) should be
preserved.
Returns
-------
key
Unique identifier for the result rows that allows them to be
referenced in a `Materialization` relation instance in relation
trees executed later.
"""
raise NotImplementedError()

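A concrete driver might satisfy this contract roughly as follows (an illustrative sketch only; the `_DriverSketch` class, its `_execute` helper, and the in-memory storage are invented for this example and are not part of the real driver API):

import uuid
from typing import Any

class _DriverSketch:
    """Hypothetical partial QueryDriver implementation (illustration only)."""

    def __init__(self) -> None:
        self._materializations: dict[uuid.UUID, list[tuple[Any, ...]]] = {}

    def _execute(self, tree: Any, dataset_types: frozenset[str]) -> list[tuple[Any, ...]]:
        # Stand-in for building and running a SELECT that keeps at least
        # the ID columns of the requested dataset types.
        raise NotImplementedError()

    def materialize(self, tree: Any, dataset_types: frozenset[str]) -> uuid.UUID:
        key = uuid.uuid4()
        # Persist the rows under the key; a later tree containing a
        # Materialization node with this key reads them back instead of
        # re-executing `tree`.
        self._materializations[key] = self._execute(tree, dataset_types)
        return key
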
@abstractmethod
def upload_data_coordinates(
self, dimensions: DimensionGroup, rows: Iterable[tuple[DataIdValue, ...]]
) -> UploadKey:
"""Upload a table of data coordinates for use in later queries.
Parameters
----------
dimensions : `DimensionGroup`
Dimensions of the data coordinates.
rows : `Iterable` [ `tuple` ]
Tuples of data coordinate values, covering just the "required"
subset of ``dimensions``.
Returns
-------
key
Unique identifier for the upload that allows it to be referenced in
a `DataCoordinateUpload` relation instance in relation trees
executed later.
"""
raise NotImplementedError()

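An in-memory sketch of an implementation (illustrative only; a SQL driver would instead insert the rows into a temporary table, and the `_UploadSketch` class and `_uploads` attribute are invented for this example):

import uuid
from collections.abc import Iterable
from typing import Any

class _UploadSketch:
    """Hypothetical partial QueryDriver implementation (illustration only)."""

    def __init__(self) -> None:
        self._uploads: dict[uuid.UUID, tuple[Any, frozenset[tuple[Any, ...]]]] = {}

    def upload_data_coordinates(self, dimensions: Any, rows: Iterable[tuple[Any, ...]]) -> uuid.UUID:
        key = uuid.uuid4()
        # Each row covers dimensions.required, in that order; freeze them so
        # repeated reads under this key are stable.
        self._uploads[key] = (dimensions, frozenset(rows))
        return key
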
@abstractmethod
def count(self, tree: RootRelation, *, exact: bool, discard: bool) -> int:
"""Return the number of rows a query would return.
python/lsst/daf/butler/queries/relation_tree/_data_coordinate_upload.py
@@ -27,13 +27,16 @@

from __future__ import annotations

__all__ = ("DataCoordinateUpload",)
__all__ = ("DataCoordinateUpload", "UploadKey")

from typing import Literal, final
import uuid
from typing import Literal, TypeAlias, final

from ...dimensions import DataIdValue, DimensionGroup
from ...dimensions import DimensionGroup
from ._base import RelationBase

UploadKey: TypeAlias = uuid.UUID


@final
class DataCoordinateUpload(RelationBase):
@@ -43,17 +46,13 @@ class DataCoordinateUpload(RelationBase):

relation_type: Literal["data_coordinate_upload"] = "data_coordinate_upload"

key: UploadKey
"""Key used by a `QueryDriver` to track the uploaded rows."""

dimensions: DimensionGroup
"""The dimensions of the data IDs."""

rows: frozenset[tuple[DataIdValue, ...]]
"""The required values of the data IDs."""

@property
def available_dataset_types(self) -> frozenset[str]:
# Docstring inherited.
return frozenset()

# We probably should validate that the tuples in 'rows' have the right
# length (len(dimensions.required)) and maybe the right types, but we might
# switch to Arrow here before that actually matters.
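After this change a `DataCoordinateUpload` leaf is small enough to travel with the rest of the tree. A construction sketch (the `universe` fixture and its `conform` call are assumptions for illustration):

import uuid

upload = DataCoordinateUpload(
    key=uuid.uuid4(),
    dimensions=universe.conform(["instrument", "visit"]),
)
# As a pydantic model, the node can round-trip through JSON, so relation
# trees can be shipped to a remote driver without carrying the actual rows.
serialized = upload.model_dump_json()
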
10 changes: 8 additions & 2 deletions python/lsst/daf/butler/queries/relation_tree/_materialization.py
@@ -27,9 +27,10 @@

from __future__ import annotations

__all__ = ("Materialization",)
__all__ = ("Materialization", "MaterializationKey")

from typing import TYPE_CHECKING, Literal, final
import uuid
from typing import TYPE_CHECKING, Literal, TypeAlias, final

import pydantic

@@ -39,6 +40,8 @@
if TYPE_CHECKING:
from ._relation import RootRelation

MaterializationKey: TypeAlias = uuid.UUID


@final
class Materialization(RelationBase):
@@ -48,6 +51,9 @@ class Materialization(RelationBase):

relation_type: Literal["materialization"] = "materialization"

key: MaterializationKey
"""Key used by a `QueryDriver` to track the materialized rows."""

operand: RootRelation
"""The upstream relation to evaluate."""

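This mirrors how `RelationQuery.materialize` assembles the node in `_query.py` above (a sketch; `key` and `tree` stand in for the driver-returned key and the executed tree, and the dataset type name is an assumed example):

node = Materialization.model_construct(
    key=key,                           # returned by QueryDriver.materialize
    operand=tree,                      # the relation that was executed
    dataset_types=frozenset({"raw"}),  # assumed example dataset type
)
# model_construct skips pydantic validation, which is safe here because the
# operand tree was already validated when it was built.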
