Skip to content

Commit

Permalink
Add DimensionRecordCache and helper classes.
Browse files Browse the repository at this point in the history
This includes a replacement (not used yet) for the caching that
currently happens behind the registry's DimensionRecordStorage object.
  • Loading branch information
TallJimbo committed Nov 27, 2023
1 parent 10bb2d6 commit 607a552
Show file tree
Hide file tree
Showing 2 changed files with 458 additions and 0 deletions.
343 changes: 343 additions & 0 deletions python/lsst/daf/butler/dimensions/record_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,343 @@
# This file is part of daf_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = (
"DimensionRecordCache",
"PersistentDimensionRecordCache",
"PersistentDimensionRecordCacheMapping",
"ScopedDimensionRecordCacheMapping",
)

from collections.abc import Callable, Mapping, Set
from typing import final

from ._coordinate import DataCoordinate
from ._group import DimensionGroup
from ._record_set import (
DimensionRecordFactory,
DimensionRecordSet,
DimensionRecordSetMapping,
fail_record_lookup,
make_trivial_record,
)
from ._records import DimensionRecord
from ._universe import DimensionUniverse


@final
class DimensionRecordCache:
    """A hybrid cache of dimension records for all elements in a set of
    dimensions.

    This class combines a `PersistentDimensionRecordCache` with its own
    mapping of `DimensionRecordSet` objects for other elements, with the
    expectation that the former is shared by many objects and persistent,
    while the latter has finite duration (e.g. a single `Butler` operation or
    explicitly-managed query context).

    Parameters
    ----------
    persistent : `PersistentDimensionRecordCache`
        Shared cache with a lifetime that extends beyond that of the cache
        being constructed, responsible for caching the records of all elements
        in `DimensionUniverse.persistent_cache_elements`.
    scoped : `ScopedDimensionRecordCacheMapping`, optional
        Mapping with records for elements that are not handled by the
        persistent cache, with lifetime typically linked to the cache being
        constructed.  If `None`, an empty mapping is created.
    default_record_factory : `DimensionRecordFactory`, optional
        Callback used as the default for the ``or_add`` argument to `find`;
        see that method for details.
    """

    def __init__(
        self,
        persistent: PersistentDimensionRecordCache,
        scoped: ScopedDimensionRecordCacheMapping | None = None,
        default_record_factory: DimensionRecordFactory = fail_record_lookup,
    ):
        if scoped is None:
            scoped = ScopedDimensionRecordCacheMapping(persistent.universe, {})
        self._persistent = persistent
        self._scoped = scoped
        # For each dimension group registered via `include_dimension_group`,
        # maps each element name to the list of positions in a data ID's
        # full-values tuple that form that element's required-value key.
        self._reindexers: dict[DimensionGroup, dict[str, list[int]]] = {}
        self._default_record_factory = default_record_factory

    @property
    def universe(self) -> DimensionUniverse:
        """Object that manages the definitions of all dimensions."""
        return self._persistent.universe

    def find(
        self, element: str, data_id: DataCoordinate, or_add: DimensionRecordFactory | None = None
    ) -> DimensionRecord:
        """Find a record in the cache by data ID.

        Parameters
        ----------
        element : `str`
            Name of the dimension element of the record being searched for.
        data_id : `DataCoordinate`
            Data ID used to look up dimension records.
            `DataCoordinate.hasFull` must be `True`, and the dimensions of
            this data ID must have been included in the cache via a call to
            `include_dimension_group`.
        or_add : `DimensionRecordFactory`, optional
            Callback that is used to construct a new record if an existing one
            cannot be found.  This record is added to the cache in addition to
            being returned.  The default is to raise `LookupError`.  This is
            ignored for elements whose records are managed by the persistent
            cache and elements whose records can be computed from just the
            data ID values (like skypix dimensions).

        Returns
        -------
        record : `DimensionRecord`
            Cached record.

        Raises
        ------
        RuntimeError
            Raised if ``data_id.dimensions`` was never passed to
            `include_dimension_group`.
        """
        try:
            # Extract this element's required values from the data ID's full
            # values, using the index list precomputed by
            # `include_dimension_group`.
            key = tuple([data_id.full_values[i] for i in self._reindexers[data_id.dimensions][element]])
        except KeyError:
            raise RuntimeError(
                f"Dimension group {data_id.dimensions} has not been included in this cache."
            ) from None
        if or_add is None:
            or_add = self._default_record_factory
        if (record_set := self._scoped.get(element)) is not None:
            if record_set.RecordClass._tuple_is_required_values:
                # Records for this element (e.g. skypix dimensions) are fully
                # determined by their data ID values, so they can always be
                # constructed on the fly rather than looked up.
                or_add = make_trivial_record
            return record_set.find_with_required_values(key, or_add)
        else:
            record_set = self._persistent[element]
            try:
                return record_set.find_with_required_values(key)
            except KeyError:
                pass
            # Cache misses for the persistent cache should be extremely rare;
            # for simplicity we just reset the entire persistent cache and
            # force a complete reload. Note that cache lookup is not intended
            # to be a way to test whether a record for a given data ID exists
            # at all in the repo.
            self._persistent.reset()
            record_set = self._persistent[element]
            return record_set.find_with_required_values(key)

    def include_dimension_group(self, dimensions: DimensionGroup) -> None:
        """Ensure the cache supports lookups using data IDs with the given
        dimensions.

        Parameters
        ----------
        dimensions : `DimensionGroup`
            Dimensions of data IDs that will be passed to `find`.

        Notes
        -----
        This method is necessary but not sufficient for `find` to work on a
        particular data ID, because it does not actually add any dimension
        records to the cache.  Actual records can only be added by modifying
        the values of `mapping` directly or by passing an ``or_add`` callback
        to `find`.
        """
        # `setdefault` registers the group while simultaneously telling us
        # whether it was already populated; already-seen groups and the empty
        # group (which has nothing to index) return early.
        if (reindexer_map := self._reindexers.setdefault(dimensions, {})) or not dimensions:
            return
        for element in dimensions.elements:
            if element not in self.universe.persistent_cache_elements.names:
                # Elements not covered by the persistent cache need an entry
                # in the scoped mapping so `find` can consult it.
                self._scoped.include(element)
            reindexer_map[element] = [
                dimensions._data_coordinate_indices[k] for k in self.universe[element].required.names
            ]

    @property
    def persistent(self) -> PersistentDimensionRecordCache:
        """Shared cache with a lifetime that extends beyond that of this
        cache, responsible for caching the records of all elements in
        `DimensionUniverse.persistent_cache_elements`.
        """
        return self._persistent

    @property
    def scoped(self) -> ScopedDimensionRecordCacheMapping:
        """Mapping that holds record sets for elements that are not in the
        persistent cache.

        This is also the only cache state that should typically be serialized
        when the goal is transferring the cache between processes (e.g. butler
        client and server) that should each already have its own persistent
        cache.
        """
        return self._scoped


class PersistentDimensionRecordCache:
    """A dimension record cache that attempts to hold all records for certain
    elements, expecting to be shared by all dimension record caches in a
    process.

    This object caches all records for elements where
    `DatabaseElement.is_cached` is `True`.  This includes all governor
    dimensions and some database dimension elements.

    Parameters
    ----------
    universe : `DimensionUniverse`
        Object with definitions of all dimension elements.
    load_callback : `collections.abc.Callable`
        Callable that takes no arguments and returns a
        `PersistentDimensionRecordCacheMapping` holding all records that
        should be cached.
    records : `PersistentDimensionRecordCacheMapping`, optional
        Initial contents for the cache.  If `None`, the cache starts out
        empty and ``load_callback`` is invoked on first use.
    """

    def __init__(
        self,
        universe: DimensionUniverse,
        load_callback: Callable[[], PersistentDimensionRecordCacheMapping],
        records: PersistentDimensionRecordCacheMapping | None = None,
    ):
        self._universe = universe
        self._load_callback = load_callback
        self._records = records

    @property
    def universe(self) -> DimensionUniverse:
        """Object with definitions of all dimension elements."""
        return self._universe

    @property
    def is_loaded(self) -> bool:
        """Whether the cache currently has records."""
        return self._records is not None

    def __getitem__(self, element: str) -> DimensionRecordSet:
        # Loads the full cache on demand before delegating the lookup.
        backing = self._get_records()
        return backing[element]

    def reset(self) -> None:
        """Reset the cache, causing all records to be reloaded on next use."""
        self._records = None

    def ensure_loaded(self) -> None:
        """Explicitly load all records into the cache if they are not already
        present.
        """
        if not self.is_loaded:
            self._records = self._load_callback()

    def _get_records(self) -> PersistentDimensionRecordCacheMapping:
        """Return the backing mapping of records, loading it if necessary."""
        backing = self._records
        if backing is None:
            backing = self._load_callback()
            self._records = backing
        return backing

    @property
    def records(self) -> PersistentDimensionRecordCacheMapping | None:
        """The internal mapping that backs the cache, or `None` if the cache
        has not been loaded since it was last `reset`.
        """
        return self._records


class PersistentDimensionRecordCacheMapping(DimensionRecordSetMapping):
    """A Pydantic-serializable mapping of dimension record sets that backs
    the persistent cache.

    The keys of this mapping are fixed to the names of the dimension elements
    in `DimensionUniverse.persistent_cache_elements`, in that order.

    Parameters
    ----------
    universe : `DimensionUniverse`
        Object that holds definitions for all dimensions.
    record_sets : `~collections.abc.Mapping` [ `str`, `DimensionRecordSet` ], \
            optional
        Record sets to initialize the mapping with.  Elements whose records
        are not persistently cached are ignored, and missing elements whose
        records are cached will be initialized with empty sets.
        `DimensionRecordSet` objects that are used are included directly, not
        copied, but the given mapping itself is not included directly even if
        it is complete.
    """

    def __init__(
        self, universe: DimensionUniverse, record_sets: Mapping[str, DimensionRecordSet] | None = None
    ):
        if record_sets is None:
            record_sets = {}
        # Build a fresh dict keyed exactly by the persistently-cached
        # elements, taking existing sets where provided and empty sets
        # otherwise; extra keys in the input are dropped.
        complete_record_sets: dict[str, DimensionRecordSet] = {}
        for element in self._get_serialization_elements(universe):
            existing = record_sets.get(element)
            complete_record_sets[element] = (
                existing if existing is not None else DimensionRecordSet(element, universe)
            )
        super().__init__(universe, complete_record_sets)

    @classmethod
    def _get_serialization_elements(cls, universe: DimensionUniverse) -> Set[str]:
        # Only the persistently-cached elements are serialized.
        return universe.persistent_cache_elements.names


class ScopedDimensionRecordCacheMapping(DimensionRecordSetMapping):
    """A Pydantic-serializable mapping of dimension record sets that backs the
    non-persistent part of a dimension record cache.

    Parameters
    ----------
    universe : `DimensionUniverse`
        Object that holds definitions for all dimensions.
    record_sets : `dict` [ `str`, `DimensionRecordSet` ]
        Dictionary of record sets used to directly back the mapping.  Must
        not include elements whose records are in the persistent cache.

    Notes
    -----
    While this mapping can hold skypix dimensions and other dimensions whose
    records can be constructed from their data IDs, it does not include those
    records in its serialized form (or accept them when deserializing).
    """

    def __init__(self, universe: DimensionUniverse, record_sets: dict[str, DimensionRecordSet]):
        persistent_names = universe.persistent_cache_elements.names
        assert not any(
            name in persistent_names for name in record_sets
        ), "ScopedDimensionRecordCacheMapping keys should not include elements in the persistent cache."
        super().__init__(universe, record_sets)

    @classmethod
    def _get_serialization_elements(cls, universe: DimensionUniverse) -> Set[str]:
        # Serialize only the database-backed elements that the persistent
        # cache does not already cover.
        database_names = universe.database_elements.names
        persistent_names = universe.persistent_cache_elements.names
        return database_names - persistent_names

    def include(self, element: str) -> None:
        """Add an empty set for the given element's records if there is no set
        already.
        """
        if element in self._record_sets:
            return
        self._record_sets[element] = DimensionRecordSet(element, self.universe)
Loading

0 comments on commit 607a552

Please sign in to comment.