Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-47948: Preload dataset type cache for Butler server #1125

Merged
merged 10 commits into from
Dec 11, 2024
9 changes: 4 additions & 5 deletions python/lsst/daf/butler/_labeled_butler_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def __init__(self, repositories: Mapping[str, str] | None = None) -> None:
self._initialization_locks = NamedLocks()

# This may be overridden by unit tests.
self._preload_direct_butler_cache = True
self._preload_unsafe_direct_butler_caches = True

def bind(self, access_token: str | None) -> LabeledButlerFactoryProtocol:
"""Create a callable factory function for generating Butler instances
Expand Down Expand Up @@ -161,7 +161,7 @@ def _create_butler_factory_function(self, label: str) -> _FactoryFunction:

match butler_type:
case ButlerType.DIRECT:
return _create_direct_butler_factory(config, self._preload_direct_butler_cache)
return _create_direct_butler_factory(config, self._preload_unsafe_direct_butler_caches)
case ButlerType.REMOTE:
return _create_remote_butler_factory(config)
case _:
Expand All @@ -177,7 +177,7 @@ def _get_config_uri(self, label: str) -> ResourcePathExpression:
return config_uri


def _create_direct_butler_factory(config: ButlerConfig, preload_cache: bool) -> _FactoryFunction:
def _create_direct_butler_factory(config: ButlerConfig, preload_unsafe_caches: bool) -> _FactoryFunction:
import lsst.daf.butler.direct_butler

# Create a 'template' Butler that will be cloned when callers request an
Expand All @@ -187,8 +187,7 @@ def _create_direct_butler_factory(config: ButlerConfig, preload_cache: bool) ->

# Load caches so that data is available in cloned instances without
# needing to refetch it from the database for every instance.
if preload_cache:
butler._preload_cache()
butler._preload_cache(load_dimension_record_cache=preload_unsafe_caches)

def create_butler(access_token: str | None) -> Butler:
# Access token is ignored because DirectButler does not use Gafaelfawr
Expand Down
4 changes: 2 additions & 2 deletions python/lsst/daf/butler/direct_butler/_direct_butler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2339,9 +2339,9 @@ def _query_all_datasets_by_page(
pages = query_all_datasets(self, query, args)
yield iter(page.data for page in pages)

def _preload_cache(self) -> None:
def _preload_cache(self, *, load_dimension_record_cache: bool = True) -> None:
"""Immediately load caches that are used for common operations."""
self._registry.preload_cache()
self._registry.preload_cache(load_dimension_record_cache=load_dimension_record_cache)

_config: ButlerConfig
"""Configuration for this Butler instance."""
Expand Down
28 changes: 2 additions & 26 deletions python/lsst/daf/butler/registry/_caching_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,13 @@

from __future__ import annotations

__all__ = ["CachingContext", "GenericCachingContext"]

from typing import Generic, TypeAlias, TypeVar
__all__ = ["CachingContext"]

from ._collection_record_cache import CollectionRecordCache
from ._collection_summary_cache import CollectionSummaryCache
from ._dataset_type_cache import DatasetTypeCache

_T = TypeVar("_T")
_U = TypeVar("_U")


class GenericCachingContext(Generic[_T, _U]):
class CachingContext:
"""Collection of caches for various types of records retrieved from
database.

Expand All @@ -51,19 +45,9 @@ class GenericCachingContext(Generic[_T, _U]):
instances which will be `None` when caching is disabled. Instance of this
class is passed to the relevant managers that can use it to query or
populate caches when caching is enabled.

Dataset type cache is always enabled for now, this avoids the need for
explicitly enabling caching in pipetask executors.

`GenericCachingContext` is generic over two kinds of opaque dataset type
data, with the expectation that most code will use the ``CachingContext``
type alias (which resolves to `GenericCachingContext[object, object]`);
the `DatasetRecordStorageManager` can then cast this to a
`GenericCachingContext` with the actual opaque data types it uses.
"""

def __init__(self) -> None:
self._dataset_types: DatasetTypeCache[_T, _U] = DatasetTypeCache()
self._collection_records: CollectionRecordCache | None = None
self._collection_summaries: CollectionSummaryCache | None = None
self._depth = 0
Expand Down Expand Up @@ -107,11 +91,3 @@ def collection_records(self) -> CollectionRecordCache | None:
def collection_summaries(self) -> CollectionSummaryCache | None:
"""Cache for collection summary records (`CollectionSummaryCache`)."""
return self._collection_summaries

@property
def dataset_types(self) -> DatasetTypeCache[_T, _U]:
"""Cache for dataset types, never disabled (`DatasetTypeCache`)."""
return self._dataset_types


CachingContext: TypeAlias = GenericCachingContext[object, object]
Original file line number Diff line number Diff line change
Expand Up @@ -30,29 +30,21 @@
__all__ = ("DatasetTypeCache",)

from collections.abc import Iterable, Iterator
from typing import Generic, TypeVar

from .._dataset_type import DatasetType
from ..dimensions import DimensionGroup
from ...._dataset_type import DatasetType
from ....dimensions import DimensionGroup
from .tables import DynamicTables, DynamicTablesCache

_T = TypeVar("_T")
_U = TypeVar("_U")


class DatasetTypeCache(Generic[_T, _U]):
class DatasetTypeCache:
"""Cache for dataset types.

Notes
-----
This cache is a pair of mappings with different kinds of keys:

- the `DatasetType` itself is cached by name, as is some opaque data used
only by a `DatasetRecordStorageManager` implementation;
- additional opaque data (also used only by `DatasetRecordStorageManager`
implementations can be cached by the dimensions dataset types (i.e. a
`DimensionGroup`).

`DatasetTypeCache` is generic over these two opaque data types.
- Dataset type name -> (`DatasetType`, database integer primary key)
- `DimensionGroup` -> database table information

In some contexts (e.g. ``resolve_wildcard``) a full list of dataset types
is needed. To signify that cache content can be used in such contexts,
Expand All @@ -62,11 +54,41 @@ class DatasetTypeCache(Generic[_T, _U]):
"""

def __init__(self) -> None:
self._by_name_cache: dict[str, tuple[DatasetType, _T]] = {}
self._by_dimensions_cache: dict[DimensionGroup, _U] = {}
self.tables = DynamicTablesCache()
self._by_name_cache: dict[str, tuple[DatasetType, int]] = {}
self._by_dimensions_cache: dict[DimensionGroup, DynamicTables] = {}
self._full = False
self._dimensions_full = False

def clone(self) -> DatasetTypeCache:
"""Make a copy of the caches that are safe to use in another thread.

Notes
-----
After cloning, the ``tables`` cache will be shared between the new
instance and the current instance. It is safe to read and update
``tables`` from multiple threads simultaneously -- the cached values
are immutable table schemas, and they are looked up one at a time by
name.

The other caches are copied, because their access patterns are more
complex.

``full`` and ``dimensions_full`` will initially return `False` in the
new instance. This preserves the invariant that a Butler is able to
see any changes to the database made before the Butler is instantiated.
The downside is that the cloned cache will have to be re-fetched before
it can be used for glob searches.
"""
clone = DatasetTypeCache()
# Share DynamicTablesCache between instances.
clone.tables = self.tables
# The inner key/value objects are immutable in both of these caches, so
# we can shallow-copy the dicts.
clone._by_name_cache = self._by_name_cache.copy()
clone._by_dimensions_cache = self._by_dimensions_cache.copy()
return clone

@property
def full(self) -> bool:
"""`True` if cache holds all known dataset types (`bool`)."""
Expand All @@ -77,25 +99,26 @@ def dimensions_full(self) -> bool:
"""`True` if cache holds all known dataset type dimensions (`bool`)."""
return self._dimensions_full

def add(self, dataset_type: DatasetType, extra: _T) -> None:
def add(self, dataset_type: DatasetType, id: int) -> None:
"""Add one record to the cache.

Parameters
----------
dataset_type : `DatasetType`
Dataset type, replaces any existing dataset type with the same
name.
extra : `Any`
id : `int`
The dataset type primary key
dhirving marked this conversation as resolved.
Show resolved Hide resolved
Additional opaque object stored with this dataset type.
"""
self._by_name_cache[dataset_type.name] = (dataset_type, extra)
self._by_name_cache[dataset_type.name] = (dataset_type, id)

def set(
self,
data: Iterable[tuple[DatasetType, _T]],
data: Iterable[tuple[DatasetType, int]],
*,
full: bool = False,
dimensions_data: Iterable[tuple[DimensionGroup, _U]] | None = None,
dimensions_data: Iterable[tuple[DimensionGroup, DynamicTables]] | None = None,
dimensions_full: bool = False,
) -> None:
"""Replace cache contents with the new set of dataset types.
Expand Down Expand Up @@ -136,7 +159,7 @@ def discard(self, name: str) -> None:
"""
self._by_name_cache.pop(name, None)

def get(self, name: str) -> tuple[DatasetType | None, _T | None]:
def get(self, name: str) -> tuple[DatasetType | None, int | None]:
"""Return cached info given dataset type name.

Parameters
Expand Down Expand Up @@ -177,7 +200,7 @@ def get_dataset_type(self, name: str) -> DatasetType | None:
return None
return item[0]

def items(self) -> Iterator[tuple[DatasetType, _T]]:
def items(self) -> Iterator[tuple[DatasetType, int]]:
"""Return iterator for the set of items in the cache, can only be
used if `full` is true.

Expand All @@ -195,19 +218,19 @@ def items(self) -> Iterator[tuple[DatasetType, _T]]:
raise RuntimeError("cannot call items() if cache is not full")
return iter(self._by_name_cache.values())

def add_by_dimensions(self, dimensions: DimensionGroup, extra: _U) -> None:
def add_by_dimensions(self, dimensions: DimensionGroup, tables: DynamicTables) -> None:
"""Add information about a set of dataset type dimensions to the cache.

Parameters
----------
dimensions : `DimensionGroup`
Dimensions of one or more dataset types.
extra : `Any`
tables : `DynamicTables`
Additional opaque object stored with these dimensions.
"""
self._by_dimensions_cache[dimensions] = extra
self._by_dimensions_cache[dimensions] = tables

def get_by_dimensions(self, dimensions: DimensionGroup) -> _U | None:
def get_by_dimensions(self, dimensions: DimensionGroup) -> DynamicTables | None:
"""Get information about a set of dataset type dimensions.

Parameters
Expand All @@ -217,13 +240,13 @@ def get_by_dimensions(self, dimensions: DimensionGroup) -> _U | None:

Returns
-------
extra : `Any` or `None`
tables : `DynamicTables` or `None`
Additional opaque object stored with these dimensions, or `None` if
these dimensions are not present in the cache.
"""
return self._by_dimensions_cache.get(dimensions)

def by_dimensions_items(self) -> Iterator[tuple[DimensionGroup, _U]]:
def by_dimensions_items(self) -> Iterator[tuple[DimensionGroup, DynamicTables]]:
"""Return iterator for all dimensions-keyed data in the cache.

This can only be called if `dimensions_full` is `True`.
Expand Down
Loading
Loading