Move data ID expansion implementation to DataCoordinate.
This adds a new method to Registry and SqlRegistry without a
RemoteRegistry implementation, and that's why it isn't decorated with
abstractmethod yet.  I'll fix that after I make the DimensionRecord
containers serializable.
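
As a rough sketch of how the new pieces are meant to fit together (the method and
parameter names come from this change; the wrapper function and the registry object
are hypothetical):

```python
# Hypothetical sketch: pass a registry-backed record cache to
# DataCoordinate.standardize so the result carries its DimensionRecords.
from lsst.daf.butler import DataCoordinate

def expand_with_cache(registry, data_id):
    # New Registry method added by this commit: a container that fetches
    # DimensionRecord objects from the database on demand and caches them.
    cache = registry.getDimensionRecordCache()
    # With records= supplied, the returned DataCoordinate is guaranteed to
    # have hasRecords() return True.
    return DataCoordinate.standardize(
        data_id,
        universe=registry.dimensions,
        defaults=registry.defaults.dataId,
        records=cache,
    )
```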

This also adds a check to DataCoordinate.standardize that any passed
keys are actually dimension names; this addresses a long-standing
problem where users would get a keyword argument to some higher-level
API wrong, and it would be forwarded as **kwargs down to
DataCoordinate.standardize and then silently ignored.  And it turns
out we were doing that even in our own test utility code!
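
For illustration, a hypothetical call that used to be silently accepted and now fails
(assume `butler` is an existing Butler and `exposre` is a typo for `exposure`):

```python
# Hypothetical example of the new check in DataCoordinate.standardize:
# unrecognized keys are rejected instead of being silently dropped.
from lsst.daf.butler import DataCoordinate

DataCoordinate.standardize(
    instrument="HSC",
    exposre=12345,  # typo for "exposure"
    universe=butler.registry.dimensions,
)
# Raises ValueError: Unrecognized key(s) for data ID: {'exposre'}.
```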
TallJimbo authored and timj committed Jan 12, 2023
1 parent decbdc9 commit 20ae036
Showing 6 changed files with 139 additions and 86 deletions.
@@ -21,7 +21,10 @@

from __future__ import annotations

__all__ = ("HeterogeneousDimensionRecordAbstractSet", "HomogeneousDimensionRecordAbstractSet")
__all__ = (
"HeterogeneousDimensionRecordAbstractSet",
"HomogeneousDimensionRecordAbstractSet",
)

from abc import abstractmethod
from typing import Any, Collection, Iterator, Mapping
98 changes: 84 additions & 14 deletions python/lsst/daf/butler/core/dimensions/_coordinate.py
@@ -26,7 +26,14 @@

from __future__ import annotations

__all__ = ("DataCoordinate", "DataId", "DataIdKey", "DataIdValue", "SerializedDataCoordinate")
__all__ = (
"DataCoordinate",
"DataId",
"DataIdKey",
"DataIdValue",
"InconsistentDataIdError",
"SerializedDataCoordinate",
)

from abc import abstractmethod
import numbers
@@ -53,6 +60,7 @@

if TYPE_CHECKING: # Imports needed only for type annotations; may be circular.
from ._universe import DimensionUniverse
from .._containers import HeterogeneousDimensionRecordAbstractSet
from ...registry import Registry

DataIdKey = Union[str, Dimension]
@@ -67,6 +75,12 @@
"""


class InconsistentDataIdError(ValueError):
"""Exception raised when a data ID contains contradictory key-value pairs,
according to dimension relationships.
"""


class SerializedDataCoordinate(BaseModel):
"""Simplified model for serializing a `DataCoordinate`."""

@@ -149,6 +163,7 @@ def standardize(
graph: Optional[DimensionGraph] = None,
universe: Optional[DimensionUniverse] = None,
defaults: Optional[DataCoordinate] = None,
records: Optional[HeterogeneousDimensionRecordAbstractSet] = None,
**kwargs: Any
) -> DataCoordinate:
"""Standardize the supplied dataId.
@@ -173,6 +188,10 @@
Default dimension key-value pairs to use when needed. These are
never used to infer ``graph``, and are ignored if a different value
is provided for the same key in ``mapping`` or ``**kwargs``.
records : `HeterogeneousDimensionRecordAbstractSet`, optional
Container of `DimensionRecord` instances that may be used to
fill in missing keys and/or attach records. If provided, the
returned object is guaranteed to have `hasRecords` return `True`.
**kwargs
Additional keyword arguments are treated like additional key-value
pairs in ``mapping``.
@@ -190,33 +209,41 @@ def standardize(
Raised if a key-value pair for a required dimension is missing.
"""
d: Dict[str, DataIdValue] = {}
r: Dict[str, Optional[DimensionRecord]] = {}
if isinstance(mapping, DataCoordinate):
if graph is None:
if not kwargs:
if not kwargs and records is None:
# Already standardized to exactly what we want.
return mapping
elif kwargs.keys().isdisjoint(graph.dimensions.names):
# User provided kwargs, but told us not to use them by
# passing in dimensions that are disjoint from those kwargs.
# This is not necessarily user error - it's a useful pattern
# to pass in all of the key-value pairs you have and let the
# code here pull out only what it needs.
return mapping.subset(graph)
assert universe is None or universe == mapping.universe
universe = mapping.universe
d.update((name, mapping[name]) for name in mapping.graph.required.names)
if mapping.hasFull():
d.update((name, mapping[name]) for name in mapping.graph.implied.names)
if mapping.hasRecords():
r.update((name, mapping.records[name]) for name in mapping.graph.elements.names)
elif isinstance(mapping, NamedKeyMapping):
d.update(mapping.byName())
elif mapping is not None:
d.update(mapping)
d.update(kwargs)
if graph is None:
if defaults is not None:
if universe is None:
if graph is not None:
universe = graph.universe
elif defaults is not None:
universe = defaults.universe
elif universe is None:
raise TypeError("universe must be provided if graph is not.")
else:
raise TypeError("universe must be provided if graph and defaults are not.")
if not (d.keys() <= universe.getStaticDimensions().names):
# We silently ignore keys that aren't relevant for this particular
# data ID, but keys that aren't relevant for any possible data ID
# are a bug that we want to report to the user. This is especially
# important because other code frequently forwards unrecognized
# kwargs here.
raise ValueError(
f"Unrecognized key(s) for data ID: {d.keys() - universe.getStaticDimensions().names}."
)
if graph is None:
graph = DimensionGraph(universe, names=d.keys())
if not graph.dimensions:
return DataCoordinate.makeEmpty(graph.universe)
@@ -227,6 +254,46 @@ def standardize(
else:
for k, v in defaults.items():
d.setdefault(k.name, v)
if records is not None:
for element in graph.primaryKeyTraversalOrder:
record = r.get(element.name, ...) # Use ... to mean not found; None might mean NULL
if record is ...:
if isinstance(element, Dimension) and d.get(element.name) is None:
if element in graph.required:
raise LookupError(
f"No value or null value for required dimension {element.name}."
)
d[element.name] = None
record = None
else:
subset_data_id = DataCoordinate.standardize(d, graph=element.graph)
try:
record = records.by_definition[element].by_data_id[subset_data_id]
except KeyError:
record = None
r[element.name] = record
if record is not None:
for dimension in element.implied:
value = getattr(record, dimension.name)
if d.setdefault(dimension.name, value) != value:
raise InconsistentDataIdError(
f"Data ID {d} has {dimension.name}={d[dimension.name]!r}, "
f"but {element.name} implies {dimension.name}={value!r}."
)
else:
if element in graph.required:
raise LookupError(
f"Could not find record for required dimension {element.name} via {d}."
)
if element.alwaysJoin:
raise InconsistentDataIdError(
f"Could not fetch record for element {element.name} via {d}, ",
"but it is marked alwaysJoin=True; this means one or more dimensions are not "
"related.",
)
for dimension in element.implied:
d.setdefault(dimension.name, None)
r.setdefault(dimension.name, None)
if d.keys() >= graph.dimensions.names:
values = tuple(d[name] for name in graph._dataCoordinateIndices.keys())
else:
@@ -238,7 +305,10 @@ def standardize(
# numbers.Integral; convert that to int.
values = tuple(int(val) if isinstance(val, numbers.Integral) # type: ignore
else val for val in values)
return _BasicTupleDataCoordinate(graph, values)
result: DataCoordinate = _BasicTupleDataCoordinate(graph, values)
if r.keys() >= graph.elements.names:
result = result.expanded(r)
return result

@staticmethod
def makeEmpty(universe: DimensionUniverse) -> DataCoordinate:
76 changes: 16 additions & 60 deletions python/lsst/daf/butler/registries/sql.py
@@ -60,11 +60,10 @@
DimensionGraph,
DimensionRecord,
DimensionUniverse,
HeterogeneousDimensionRecordCache,
HomogeneousDimensionRecordIterable,
NamedKeyMapping,
NameLookupMapping,
Progress,
ScalarDataCoordinateSet,
StorageClassFactory,
Timespan,
)
@@ -76,7 +75,6 @@
CollectionType,
RegistryDefaults,
ConflictingDefinitionError,
InconsistentDataIdError,
OrphanedRecordError,
CollectionSearch,
)
@@ -619,6 +617,14 @@ def getDatasetLocations(self, ref: DatasetRef) -> Iterable[str]:
# Docstring inherited from lsst.daf.butler.registry.Registry
return self._managers.datastores.findDatastores(ref)

def getDimensionRecordCache(self) -> HeterogeneousDimensionRecordCache:
# Docstring inherited.
callbacks = {
element.name: self._managers.dimensions[element].fetch
for element in self.dimensions.getStaticElements()
}
return HeterogeneousDimensionRecordCache(self.dimensions, callbacks)

def expandDataId(self, dataId: Optional[DataId] = None, *, graph: Optional[DimensionGraph] = None,
records: Optional[NameLookupMapping[DimensionElement, Optional[DimensionRecord]]] = None,
withDefaults: bool = True,
@@ -628,63 +634,13 @@ def expandDataId(self, dataId: Optional[DataId] = None, *, graph: Optional[Dimen
defaults = None
else:
defaults = self.defaults.dataId
standardized = DataCoordinate.standardize(dataId, graph=graph, universe=self.dimensions,
defaults=defaults, **kwargs)
if standardized.hasRecords():
return standardized
if records is None:
records = {}
elif isinstance(records, NamedKeyMapping):
records = records.byName()
else:
records = dict(records)
if isinstance(dataId, DataCoordinate) and dataId.hasRecords():
records.update(dataId.records.byName())
keys = standardized.byName()
for element in standardized.graph.primaryKeyTraversalOrder:
record = records.get(element.name, ...) # Use ... to mean not found; None might mean NULL
if record is ...:
if isinstance(element, Dimension) and keys.get(element.name) is None:
if element in standardized.graph.required:
raise LookupError(
f"No value or null value for required dimension {element.name}."
)
keys[element.name] = None
record = None
else:
storage = self._managers.dimensions[element]
dataIdSet = ScalarDataCoordinateSet(
DataCoordinate.standardize(keys, graph=element.graph)
)
fetched = tuple(storage.fetch(dataIdSet))
try:
(record,) = fetched
except ValueError:
record = None
records[element.name] = record
if record is not None:
for d in element.implied:
value = getattr(record, d.name)
if keys.setdefault(d.name, value) != value:
raise InconsistentDataIdError(
f"Data ID {standardized} has {d.name}={keys[d.name]!r}, "
f"but {element.name} implies {d.name}={value!r}."
)
else:
if element in standardized.graph.required:
raise LookupError(
f"Could not fetch record for required dimension {element.name} via keys {keys}."
)
if element.alwaysJoin:
raise InconsistentDataIdError(
f"Could not fetch record for element {element.name} via keys {keys}, ",
"but it is marked alwaysJoin=True; this means one or more dimensions are not "
"related."
)
for d in element.implied:
keys.setdefault(d.name, None)
records.setdefault(d.name, None)
return DataCoordinate.standardize(keys, graph=standardized.graph).expanded(records=records)
cache = self.getDimensionRecordCache()
if records is not None:
for record in records.values():
if record is not None:
cache.add(record)
return DataCoordinate.standardize(dataId, graph=graph, defaults=defaults, records=cache,
universe=self.dimensions, **kwargs)

def insertDimensionData(self, element: Union[DimensionElement, str],
*data: Union[Mapping[str, Any], DimensionRecord],
8 changes: 3 additions & 5 deletions python/lsst/daf/butler/registry/_exceptions.py
@@ -28,11 +28,9 @@
"UnsupportedIdGeneratorError",
)


class InconsistentDataIdError(ValueError):
"""Exception raised when a data ID contains contradictory key-value pairs,
according to dimension relationships.
"""
# This exception moved for intra-daf_butler dependency reasons; import here and
# re-export for backwards compatibility.
from ..core import InconsistentDataIdError


class ConflictingDefinitionError(Exception):
31 changes: 31 additions & 0 deletions python/lsst/daf/butler/registry/_registry.py
@@ -60,6 +60,7 @@
DimensionGraph,
DimensionRecord,
DimensionUniverse,
HeterogeneousDimensionRecordCache,
HomogeneousDimensionRecordIterable,
NameLookupMapping,
StorageClassFactory,
@@ -1007,6 +1008,36 @@ def getDatasetLocations(self, ref: DatasetRef) -> Iterable[str]:
"""
raise NotImplementedError()

def getDimensionRecordCache(self) -> HeterogeneousDimensionRecordCache:
"""Return a container that fetches and caches `DimensionRecord` objects
from the database.
Returns
-------
cache : `HeterogeneousDimensionRecordCache`
A container that directly holds already-fetched `DimensionRecord`
objects and automatically fetches new ones as requested (see class
documentation for more information).
Notes
-----
This provides a simpler, faster interface for fetching dimension data
when the data IDs desired are already known exactly; use
`queryDimensionRecords` to return records via a more flexible
expression. One can also use the result of a call to
`queryDimensionRecords` to directly populate a cache::
cache = butler.registry.getDimensionRecordCache()
cache.update(butler.registry.queryDimensionRecords(...))
To obtain a container that does not automatically fetch missing
records, construct a `HeterogeneousDimensionRecordSet` from the cache::
cache = butler.registry.getDimensionRecordCache()
set = HeterogeneousDimensionRecordSet(cache.universe, cache)
"""
raise NotImplementedError()

@abstractmethod
def expandDataId(self, dataId: Optional[DataId] = None, *, graph: Optional[DimensionGraph] = None,
records: Optional[NameLookupMapping[DimensionElement, Optional[DimensionRecord]]] = None,
7 changes: 1 addition & 6 deletions python/lsst/daf/butler/tests/utils.py
@@ -249,10 +249,6 @@ def addDataset(self, dataId, run=None, datasetType=None):
"""Create a new example metric and add it to the named run with the
given dataId.

Overwrites tags, so this does not try to associate the new dataset with
existing tags. (If/when tags are needed this can be added to the
arguments of this function.)

Parameters
----------
dataId : `dict`
Expand All @@ -270,5 +266,4 @@ def addDataset(self, dataId, run=None, datasetType=None):
self.butler.put(metric,
self.datasetType if datasetType is None else datasetType,
dataId,
run=run,
tags=())
run=run)
