From 59b45c5f331a62a3cb8dd7bc0af3c684a7ee75b6 Mon Sep 17 00:00:00 2001 From: "David H. Irving" Date: Wed, 4 Dec 2024 11:24:45 -0700 Subject: [PATCH 01/10] Make DatasetTypeCache non-generic DatasetTypeCache is only used in a single file with a specific set of types, so it no longer needs to be generic. Upcoming changes to make it thread-safe and cloneable will be easier to reason about with concrete types. --- .../daf/butler/registry/_caching_context.py | 22 ++------- .../butler/registry/_dataset_type_cache.py | 48 +++++++++---------- .../datasets/byDimensions/_manager.py | 6 +-- 3 files changed, 29 insertions(+), 47 deletions(-) diff --git a/python/lsst/daf/butler/registry/_caching_context.py b/python/lsst/daf/butler/registry/_caching_context.py index 81362df826..631f3ab278 100644 --- a/python/lsst/daf/butler/registry/_caching_context.py +++ b/python/lsst/daf/butler/registry/_caching_context.py @@ -27,19 +27,14 @@ from __future__ import annotations -__all__ = ["CachingContext", "GenericCachingContext"] - -from typing import Generic, TypeAlias, TypeVar +__all__ = ["CachingContext"] from ._collection_record_cache import CollectionRecordCache from ._collection_summary_cache import CollectionSummaryCache from ._dataset_type_cache import DatasetTypeCache -_T = TypeVar("_T") -_U = TypeVar("_U") - -class GenericCachingContext(Generic[_T, _U]): +class CachingContext: """Collection of caches for various types of records retrieved from database. @@ -54,16 +49,10 @@ class is passed to the relevant managers that can use it to query or Dataset type cache is always enabled for now, this avoids the need for explicitly enabling caching in pipetask executors. - - `GenericCachingContext` is generic over two kinds of opaque dataset type - data, with the expectation that most code will use the ``CachingContext`` - type alias (which resolves to `GenericCachingContext[object, object]`); - the `DatasetRecordStorageManager` can then cast this to a - `GenericCachingContext` with the actual opaque data types it uses. """ def __init__(self) -> None: - self._dataset_types: DatasetTypeCache[_T, _U] = DatasetTypeCache() + self._dataset_types: DatasetTypeCache = DatasetTypeCache() self._collection_records: CollectionRecordCache | None = None self._collection_summaries: CollectionSummaryCache | None = None self._depth = 0 @@ -109,9 +98,6 @@ def collection_summaries(self) -> CollectionSummaryCache | None: return self._collection_summaries @property - def dataset_types(self) -> DatasetTypeCache[_T, _U]: + def dataset_types(self) -> DatasetTypeCache: """Cache for dataset types, never disabled (`DatasetTypeCache`).""" return self._dataset_types - - -CachingContext: TypeAlias = GenericCachingContext[object, object] diff --git a/python/lsst/daf/butler/registry/_dataset_type_cache.py b/python/lsst/daf/butler/registry/_dataset_type_cache.py index ab43c80ca5..17569b4058 100644 --- a/python/lsst/daf/butler/registry/_dataset_type_cache.py +++ b/python/lsst/daf/butler/registry/_dataset_type_cache.py @@ -30,29 +30,24 @@ __all__ = ("DatasetTypeCache",) from collections.abc import Iterable, Iterator -from typing import Generic, TypeVar +from typing import TYPE_CHECKING from .._dataset_type import DatasetType from ..dimensions import DimensionGroup -_T = TypeVar("_T") -_U = TypeVar("_U") +if TYPE_CHECKING: + from .datasets.byDimensions.tables import DynamicTables -class DatasetTypeCache(Generic[_T, _U]): +class DatasetTypeCache: """Cache for dataset types. 
Notes ----- This cache is a pair of mappings with different kinds of keys: - - the `DatasetType` itself is cached by name, as is some opaque data used - only by a `DatasetRecordStorageManager` implementation; - - additional opaque data (also used only by `DatasetRecordStorageManager` - implementations can be cached by the dimensions dataset types (i.e. a - `DimensionGroup`). - - `DatasetTypeCache` is generic over these two opaque data types. + - Dataset type name -> (`DatasetType`, database integer primary key) + - `DimensionGroup` -> database table information In some contexts (e.g. ``resolve_wildcard``) a full list of dataset types is needed. To signify that cache content can be used in such contexts, @@ -62,8 +57,8 @@ class DatasetTypeCache(Generic[_T, _U]): """ def __init__(self) -> None: - self._by_name_cache: dict[str, tuple[DatasetType, _T]] = {} - self._by_dimensions_cache: dict[DimensionGroup, _U] = {} + self._by_name_cache: dict[str, tuple[DatasetType, int]] = {} + self._by_dimensions_cache: dict[DimensionGroup, DynamicTables] = {} self._full = False self._dimensions_full = False @@ -77,7 +72,7 @@ def dimensions_full(self) -> bool: """`True` if cache holds all known dataset type dimensions (`bool`).""" return self._dimensions_full - def add(self, dataset_type: DatasetType, extra: _T) -> None: + def add(self, dataset_type: DatasetType, id: int) -> None: """Add one record to the cache. Parameters @@ -85,17 +80,18 @@ def add(self, dataset_type: DatasetType, extra: _T) -> None: dataset_type : `DatasetType` Dataset type, replaces any existing dataset type with the same name. - extra : `Any` + id : `int` + The dataset type primary key Additional opaque object stored with this dataset type. """ - self._by_name_cache[dataset_type.name] = (dataset_type, extra) + self._by_name_cache[dataset_type.name] = (dataset_type, id) def set( self, - data: Iterable[tuple[DatasetType, _T]], + data: Iterable[tuple[DatasetType, int]], *, full: bool = False, - dimensions_data: Iterable[tuple[DimensionGroup, _U]] | None = None, + dimensions_data: Iterable[tuple[DimensionGroup, DynamicTables]] | None = None, dimensions_full: bool = False, ) -> None: """Replace cache contents with the new set of dataset types. @@ -136,7 +132,7 @@ def discard(self, name: str) -> None: """ self._by_name_cache.pop(name, None) - def get(self, name: str) -> tuple[DatasetType | None, _T | None]: + def get(self, name: str) -> tuple[DatasetType | None, int | None]: """Return cached info given dataset type name. Parameters @@ -177,7 +173,7 @@ def get_dataset_type(self, name: str) -> DatasetType | None: return None return item[0] - def items(self) -> Iterator[tuple[DatasetType, _T]]: + def items(self) -> Iterator[tuple[DatasetType, int]]: """Return iterator for the set of items in the cache, can only be used if `full` is true. @@ -195,19 +191,19 @@ def items(self) -> Iterator[tuple[DatasetType, _T]]: raise RuntimeError("cannot call items() if cache is not full") return iter(self._by_name_cache.values()) - def add_by_dimensions(self, dimensions: DimensionGroup, extra: _U) -> None: + def add_by_dimensions(self, dimensions: DimensionGroup, tables: DynamicTables) -> None: """Add information about a set of dataset type dimensions to the cache. Parameters ---------- dimensions : `DimensionGroup` Dimensions of one or more dataset types. - extra : `Any` + tables : `DynamicTables` Additional opaque object stored with these dimensions. 
""" - self._by_dimensions_cache[dimensions] = extra + self._by_dimensions_cache[dimensions] = tables - def get_by_dimensions(self, dimensions: DimensionGroup) -> _U | None: + def get_by_dimensions(self, dimensions: DimensionGroup) -> DynamicTables | None: """Get information about a set of dataset type dimensions. Parameters @@ -217,13 +213,13 @@ def get_by_dimensions(self, dimensions: DimensionGroup) -> _U | None: Returns ------- - extra : `Any` or `None` + tables : `DynamicTables` or `None` Additional opaque object stored with these dimensions, or `None` if these dimensions are not present in the cache. """ return self._by_dimensions_cache.get(dimensions) - def by_dimensions_items(self) -> Iterator[tuple[DimensionGroup, _U]]: + def by_dimensions_items(self) -> Iterator[tuple[DimensionGroup, DynamicTables]]: """Return iterator for all dimensions-keyed data in the cache. This can only be called if `dimensions_full` is `True`. diff --git a/python/lsst/daf/butler/registry/datasets/byDimensions/_manager.py b/python/lsst/daf/butler/registry/datasets/byDimensions/_manager.py index 590e4b7702..e8b9ff7222 100644 --- a/python/lsst/daf/butler/registry/datasets/byDimensions/_manager.py +++ b/python/lsst/daf/butler/registry/datasets/byDimensions/_manager.py @@ -6,7 +6,7 @@ import datetime import logging from collections.abc import Iterable, Mapping, Sequence, Set -from typing import TYPE_CHECKING, Any, ClassVar, cast +from typing import TYPE_CHECKING, Any, ClassVar import astropy.time import sqlalchemy @@ -24,7 +24,7 @@ from ....dimensions import DataCoordinate, DimensionGroup, DimensionUniverse from ....direct_query_driver import SqlJoinsBuilder, SqlSelectBuilder # new query system, server+direct only from ....queries import tree as qt # new query system, both clients + server -from ..._caching_context import CachingContext, GenericCachingContext +from ..._caching_context import CachingContext from ..._collection_summary import CollectionSummary from ..._exceptions import ConflictingDefinitionError, DatasetTypeExpressionError, OrphanedRecordError from ...interfaces import DatasetRecordStorageManager, RunRecord, VersionTuple @@ -155,7 +155,7 @@ def __init__( self._dimensions = dimensions self._static = static self._summaries = summaries - self._caching_context = cast(GenericCachingContext[int, DynamicTables], caching_context) + self._caching_context = caching_context self._use_astropy_ingest_date = self.ingest_date_dtype() is ddl.AstropyTimeNsecTai self._run_key_column = collections.getRunForeignKeyName() From c5ae328fcbeddec4b72e90d62aa11f7ed8e2fee3 Mon Sep 17 00:00:00 2001 From: "David H. Irving" Date: Wed, 4 Dec 2024 14:55:39 -0700 Subject: [PATCH 02/10] Centralize tags table cache In preparation for sharing DatasetTypeCache between threads, make its inner DynamicTables values immutable. The mutable portion moved to a separate cache inside DatasetTypeCache. As a side effect, this reduces the number of times we go to the DB to check for the existence of tag and calib tables. 
--- .../butler/registry/_dataset_type_cache.py | 3 + .../datasets/byDimensions/_manager.py | 51 +++++---- .../registry/datasets/byDimensions/tables.py | 106 ++++++++++++------ 3 files changed, 105 insertions(+), 55 deletions(-) diff --git a/python/lsst/daf/butler/registry/_dataset_type_cache.py b/python/lsst/daf/butler/registry/_dataset_type_cache.py index 17569b4058..05d3b27b35 100644 --- a/python/lsst/daf/butler/registry/_dataset_type_cache.py +++ b/python/lsst/daf/butler/registry/_dataset_type_cache.py @@ -57,6 +57,9 @@ class DatasetTypeCache: """ def __init__(self) -> None: + from .datasets.byDimensions.tables import DynamicTablesCache + + self.tables = DynamicTablesCache() self._by_name_cache: dict[str, tuple[DatasetType, int]] = {} self._by_dimensions_cache: dict[DimensionGroup, DynamicTables] = {} self._full = False diff --git a/python/lsst/daf/butler/registry/datasets/byDimensions/_manager.py b/python/lsst/daf/butler/registry/datasets/byDimensions/_manager.py index e8b9ff7222..6f554f0373 100644 --- a/python/lsst/daf/butler/registry/datasets/byDimensions/_manager.py +++ b/python/lsst/daf/butler/registry/datasets/byDimensions/_manager.py @@ -79,7 +79,7 @@ def update_dynamic_tables(self, current: DynamicTables) -> DynamicTables: else: # Some previously-cached dataset type had the same dimensions # but was not a calibration. - current.calibs_name = self.calib_table_name + current = current.copy(calibs_name=self.calib_table_name) # If some previously-cached dataset type was a calibration but this # one isn't, we don't want to forget the calibs table. return current @@ -326,9 +326,13 @@ def register_dataset_type(self, dataset_type: DatasetType) -> bool: dynamic_tables = DynamicTables.from_dimensions_key( dataset_type.dimensions, dimensions_key, dataset_type.isCalibration() ) - dynamic_tables.create(self._db, type(self._collections)) + dynamic_tables.create( + self._db, type(self._collections), self._caching_context.dataset_types.tables + ) elif dataset_type.isCalibration() and dynamic_tables.calibs_name is None: - dynamic_tables.add_calibs(self._db, type(self._collections)) + dynamic_tables = dynamic_tables.add_calibs( + self._db, type(self._collections), self._caching_context.dataset_types.tables + ) row, inserted = self._db.sync( self._static.dataset_type, keys={"name": dataset_type.name}, @@ -454,7 +458,7 @@ def getDatasetRef(self, id: DatasetId) -> DatasetRef | None: # This query could return multiple rows (one for each tagged # collection the dataset is in, plus one for its run collection), # and we don't care which of those we get. 
- tags_table = dynamic_tables.tags(self._db, type(self._collections)) + tags_table = self._get_tags_table(dynamic_tables) data_id_sql = ( tags_table.select() .where( @@ -553,9 +557,10 @@ def _fetch_dataset_types(self) -> list[DatasetType]: for record in records: cache_data.append((record.dataset_type, record.dataset_type_id)) if (dynamic_tables := cache_dimensions_data.get(record.dataset_type.dimensions)) is None: - cache_dimensions_data[record.dataset_type.dimensions] = record.make_dynamic_tables() + tables = record.make_dynamic_tables() else: - record.update_dynamic_tables(dynamic_tables) + tables = record.update_dynamic_tables(dynamic_tables) + cache_dimensions_data[record.dataset_type.dimensions] = tables self._caching_context.dataset_types.set( cache_data, full=True, dimensions_data=cache_dimensions_data.items(), dimensions_full=True ) @@ -684,7 +689,7 @@ def insert( for dataId, row in zip(data_id_list, rows, strict=True) ] # Insert those rows into the tags table. - self._db.insert(storage.dynamic_tables.tags(self._db, type(self._collections)), *tagsRows) + self._db.insert(self._get_tags_table(storage.dynamic_tables), *tagsRows) return [ DatasetRef( @@ -767,9 +772,7 @@ def import_( summary.add_datasets(refs) self._summaries.update(run, [storage.dataset_type_id], summary) # Copy from temp table into tags table. - self._db.insert( - storage.dynamic_tables.tags(self._db, type(self._collections)), select=tmp_tags.select() - ) + self._db.insert(self._get_tags_table(storage.dynamic_tables), select=tmp_tags.select()) return refs def _validate_import( @@ -793,7 +796,7 @@ def _validate_import( Raise if new datasets conflict with existing ones. """ dataset = self._static.dataset - tags = storage.dynamic_tables.tags(self._db, type(self._collections)) + tags = self._get_tags_table(storage.dynamic_tables) collection_fkey_name = self._collections.getCollectionForeignKeyName() # Check that existing datasets have the same dataset type and @@ -943,7 +946,7 @@ def associate( # inserted there. self._summaries.update(collection, [storage.dataset_type_id], summary) # Update the tag table itself. - self._db.replace(storage.dynamic_tables.tags(self._db, type(self._collections)), *rows) + self._db.replace(self._get_tags_table(storage.dynamic_tables), *rows) def disassociate( self, dataset_type: DatasetType, collection: CollectionRecord, datasets: Iterable[DatasetRef] @@ -964,7 +967,7 @@ def disassociate( for dataset in datasets ] self._db.delete( - storage.dynamic_tables.tags(self._db, type(self._collections)), + self._get_tags_table(storage.dynamic_tables), ["dataset_id", self._collections.getCollectionForeignKeyName()], *rows, ) @@ -1015,7 +1018,7 @@ def certify( # inserted there. self._summaries.update(collection, [storage.dataset_type_id], summary) # Update the association table itself. - calibs_table = storage.dynamic_tables.calibs(self._db, type(self._collections)) + calibs_table = self._get_calibs_table(storage.dynamic_tables) if TimespanReprClass.hasExclusionConstraint(): # Rely on database constraint to enforce invariants; we just # reraise the exception for consistency across DB engines. @@ -1099,7 +1102,7 @@ def decertify( rows_to_insert = [] # Acquire a table lock to ensure there are no concurrent writes # between the SELECT and the DELETE and INSERT queries based on it. 
- calibs_table = storage.dynamic_tables.calibs(self._db, type(self._collections)) + calibs_table = self._get_calibs_table(storage.dynamic_tables) with self._db.transaction(lock=[calibs_table], savepoint=True): # Enter SqlQueryContext in case we need to use a temporary table to # include the give data IDs in the query (see similar block in @@ -1186,7 +1189,7 @@ def make_relation( tag_relation: Relation | None = None calib_relation: Relation | None = None if collection_types != {CollectionType.CALIBRATION}: - tags_table = storage.dynamic_tables.tags(self._db, type(self._collections)) + tags_table = self._get_tags_table(storage.dynamic_tables) # We'll need a subquery for the tags table if any of the given # collections are not a CALIBRATION collection. This intentionally # also fires when the list of collections is empty as a way to @@ -1214,7 +1217,7 @@ def make_relation( # If at least one collection is a CALIBRATION collection, we'll # need a subquery for the calibs table, and could include the # timespan as a result or constraint. - calibs_table = storage.dynamic_tables.calibs(self._db, type(self._collections)) + calibs_table = self._get_calibs_table(storage.dynamic_tables) calibs_parts = sql.Payload[LogicalColumn](calibs_table.alias(f"{dataset_type.name}_calibs")) if "timespan" in columns: calibs_parts.columns_available[DatasetColumnTag(dataset_type.name, "timespan")] = ( @@ -1422,7 +1425,7 @@ def make_joins_builder( # create a dummy subquery that we know will fail. # We give the table an alias because it might appear multiple times # in the same query, for different dataset types. - tags_table = storage.dynamic_tables.tags(self._db, type(self._collections)).alias( + tags_table = self._get_tags_table(storage.dynamic_tables).alias( f"{dataset_type.name}_tags{'_union' if is_union else ''}" ) tags_builder = self._finish_query_builder( @@ -1441,7 +1444,7 @@ def make_joins_builder( # If at least one collection is a CALIBRATION collection, we'll # need a subquery for the calibs table, and could include the # timespan as a result or constraint. - calibs_table = storage.dynamic_tables.calibs(self._db, type(self._collections)).alias( + calibs_table = self._get_calibs_table(storage.dynamic_tables).alias( f"{dataset_type.name}_calibs{'_union' if is_union else ''}" ) calibs_builder = self._finish_query_builder( @@ -1616,14 +1619,14 @@ def refresh_collection_summaries(self, dataset_type: DatasetType) -> None: # Query datasets tables for associated collections. 
column_name = self._collections.getCollectionForeignKeyName() - tags_table = storage.dynamic_tables.tags(self._db, type(self._collections)) + tags_table = self._get_tags_table(storage.dynamic_tables) query: sqlalchemy.sql.expression.SelectBase = ( sqlalchemy.select(tags_table.columns[column_name]) .where(tags_table.columns.dataset_type_id == storage.dataset_type_id) .distinct() ) if dataset_type.isCalibration(): - calibs_table = storage.dynamic_tables.calibs(self._db, type(self._collections)) + calibs_table = self._get_calibs_table(storage.dynamic_tables) query2 = ( sqlalchemy.select(calibs_table.columns[column_name]) .where(calibs_table.columns.dataset_type_id == storage.dataset_type_id) @@ -1637,6 +1640,12 @@ def refresh_collection_summaries(self, dataset_type: DatasetType) -> None: collections_to_delete = summary_collection_ids - collection_ids self._summaries.delete_collections(storage.dataset_type_id, collections_to_delete) + def _get_tags_table(self, table: DynamicTables) -> sqlalchemy.Table: + return table.tags(self._db, type(self._collections), self._caching_context.dataset_types.tables) + + def _get_calibs_table(self, table: DynamicTables) -> sqlalchemy.Table: + return table.calibs(self._db, type(self._collections), self._caching_context.dataset_types.tables) + def _create_case_expression_for_collections( collections: Iterable[CollectionRecord], id_column: sqlalchemy.ColumnElement diff --git a/python/lsst/daf/butler/registry/datasets/byDimensions/tables.py b/python/lsst/daf/butler/registry/datasets/byDimensions/tables.py index 029997ba64..7254c8d104 100644 --- a/python/lsst/daf/butler/registry/datasets/byDimensions/tables.py +++ b/python/lsst/daf/butler/registry/datasets/byDimensions/tables.py @@ -38,11 +38,13 @@ ) from collections import namedtuple -from typing import Any +from typing import Any, TypeAlias import sqlalchemy +from lsst.utils.classes import immutable from .... import ddl +from ...._utilities.thread_safe_cache import ThreadSafeCache from ....dimensions import DimensionGroup, DimensionUniverse, GovernorDimension, addDimensionForeignKey from ....timespan_database_representation import TimespanDatabaseRepresentation from ...interfaces import CollectionManager, Database, VersionTuple @@ -449,10 +451,17 @@ def makeCalibTableSpec( return tableSpec +DynamicTablesCache: TypeAlias = ThreadSafeCache[str, sqlalchemy.Table] + + +@immutable class DynamicTables: """A struct that holds the "dynamic" tables common to dataset types that share the same dimensions. + Objects of this class may be shared between multiple threads, so it must be + immutable to prevent concurrency issues. + Parameters ---------- dimensions : `DimensionGroup` @@ -477,8 +486,9 @@ def __init__( self.dimensions_key = dimensions_key self.tags_name = tags_name self.calibs_name = calibs_name - self._tags_table: sqlalchemy.Table | None = None - self._calibs_table: sqlalchemy.Table | None = None + + def copy(self, calibs_name: str) -> DynamicTables: + return DynamicTables(self._dimensions, self.dimensions_key, self.tags_name, calibs_name) @classmethod def from_dimensions_key( @@ -509,7 +519,7 @@ def from_dimensions_key( calibs_name=makeCalibTableName(dimensions_key) if is_calibration else None, ) - def create(self, db: Database, collections: type[CollectionManager]) -> None: + def create(self, db: Database, collections: type[CollectionManager], cache: DynamicTablesCache) -> None: """Create the tables if they don't already exist. 
Parameters @@ -519,19 +529,30 @@ def create(self, db: Database, collections: type[CollectionManager]) -> None: collections : `type` [ `CollectionManager` ] Manager class for collections; used to create foreign key columns for collections. + cache : `DynamicTablesCache` + Cache used to store sqlalchemy Table objects. """ - if self._tags_table is None: - self._tags_table = db.ensureTableExists( + if cache.get(self.tags_name) is None: + cache.set_or_get( self.tags_name, - makeTagTableSpec(self._dimensions, collections), + db.ensureTableExists( + self.tags_name, + makeTagTableSpec(self._dimensions, collections), + ), ) - if self.calibs_name is not None and self._calibs_table is None: - self._calibs_table = db.ensureTableExists( + + if self.calibs_name is not None and cache.get(self.calibs_name) is None: + cache.set_or_get( self.calibs_name, - makeCalibTableSpec(self._dimensions, collections, db.getTimespanRepresentation()), + db.ensureTableExists( + self.calibs_name, + makeCalibTableSpec(self._dimensions, collections, db.getTimespanRepresentation()), + ), ) - def add_calibs(self, db: Database, collections: type[CollectionManager]) -> None: + def add_calibs( + self, db: Database, collections: type[CollectionManager], cache: DynamicTablesCache + ) -> DynamicTables: """Create a calibs table for a dataset type whose dimensions already have a tags table. @@ -542,14 +563,23 @@ def add_calibs(self, db: Database, collections: type[CollectionManager]) -> None collections : `type` [ `CollectionManager` ] Manager class for collections; used to create foreign key columns for collections. + cache : `DynamicTablesCache` + Cache used to store sqlalchemy Table objects. """ - self.calibs_name = makeCalibTableName(self.dimensions_key) - self._calibs_table = db.ensureTableExists( - self.calibs_name, - makeCalibTableSpec(self._dimensions, collections, db.getTimespanRepresentation()), + calibs_name = makeCalibTableName(self.dimensions_key) + cache.set_or_get( + calibs_name, + db.ensureTableExists( + calibs_name, + makeCalibTableSpec(self._dimensions, collections, db.getTimespanRepresentation()), + ), ) - def tags(self, db: Database, collections: type[CollectionManager]) -> sqlalchemy.Table: + return self.copy(calibs_name=calibs_name) + + def tags( + self, db: Database, collections: type[CollectionManager], cache: DynamicTablesCache + ) -> sqlalchemy.Table: """Return the "tags" table that associates datasets with data IDs in TAGGED and RUN collections. @@ -563,21 +593,27 @@ def tags(self, db: Database, collections: type[CollectionManager]) -> sqlalchemy collections : `type` [ `CollectionManager` ] Manager class for collections; used to create foreign key columns for collections. + cache : `DynamicTablesCache` + Cache used to store sqlalchemy Table objects. Returns ------- table : `sqlalchemy.Table` SQLAlchemy table object. 
""" - if self._tags_table is None: - spec = makeTagTableSpec(self._dimensions, collections) - table = db.getExistingTable(self.tags_name, spec) - if table is None: - raise MissingDatabaseTableError(f"Table {self.tags_name!r} is missing from database schema.") - self._tags_table = table - return self._tags_table - - def calibs(self, db: Database, collections: type[CollectionManager]) -> sqlalchemy.Table: + table = cache.get(self.tags_name) + if table is not None: + return table + + spec = makeTagTableSpec(self._dimensions, collections) + table = db.getExistingTable(self.tags_name, spec) + if table is None: + raise MissingDatabaseTableError(f"Table {self.tags_name!r} is missing from database schema.") + return cache.set_or_get(self.tags_name, table) + + def calibs( + self, db: Database, collections: type[CollectionManager], cache: DynamicTablesCache + ) -> sqlalchemy.Table: """Return the "calibs" table that associates datasets with data IDs and timespans in CALIBRATION collections. @@ -592,6 +628,8 @@ def calibs(self, db: Database, collections: type[CollectionManager]) -> sqlalche collections : `type` [ `CollectionManager` ] Manager class for collections; used to create foreign key columns for collections. + cache : `DynamicTablesCache` + Cache used to store sqlalchemy Table objects. Returns ------- @@ -601,12 +639,12 @@ def calibs(self, db: Database, collections: type[CollectionManager]) -> sqlalche assert ( self.calibs_name is not None ), "Dataset type should be checked to be calibration by calling code." - if self._calibs_table is None: - spec = makeCalibTableSpec(self._dimensions, collections, db.getTimespanRepresentation()) - table = db.getExistingTable(self.calibs_name, spec) - if table is None: - raise MissingDatabaseTableError( - f"Table {self.calibs_name!r} is missing from database schema." - ) - self._calibs_table = table - return self._calibs_table + table = cache.get(self.calibs_name) + if table is not None: + return table + + spec = makeCalibTableSpec(self._dimensions, collections, db.getTimespanRepresentation()) + table = db.getExistingTable(self.calibs_name, spec) + if table is None: + raise MissingDatabaseTableError(f"Table {self.calibs_name!r} is missing from database schema.") + return cache.set_or_get(self.calibs_name, table) From 6161b8963ef901811d3da917cb2505e227f7289e Mon Sep 17 00:00:00 2001 From: "David H. Irving" Date: Wed, 4 Dec 2024 15:30:04 -0700 Subject: [PATCH 03/10] Remove DatasetTypeCache from CachingContext This cache is used only by a single manager, and has never participated in the caching enable/disable logic associated with CachingContext. Getting it out of CachingContext encapsulates its creation and use within a single manager class, simplifying upcoming changes. This also removes some unused branches from the code. 
--- .../daf/butler/registry/_caching_context.py | 10 -- .../byDimensions}/_dataset_type_cache.py | 11 +- .../datasets/byDimensions/_manager.py | 122 +++++++----------- 3 files changed, 53 insertions(+), 90 deletions(-) rename python/lsst/daf/butler/registry/{ => datasets/byDimensions}/_dataset_type_cache.py (96%) diff --git a/python/lsst/daf/butler/registry/_caching_context.py b/python/lsst/daf/butler/registry/_caching_context.py index 631f3ab278..7b021a8516 100644 --- a/python/lsst/daf/butler/registry/_caching_context.py +++ b/python/lsst/daf/butler/registry/_caching_context.py @@ -31,7 +31,6 @@ from ._collection_record_cache import CollectionRecordCache from ._collection_summary_cache import CollectionSummaryCache -from ._dataset_type_cache import DatasetTypeCache class CachingContext: @@ -46,13 +45,9 @@ class CachingContext: instances which will be `None` when caching is disabled. Instance of this class is passed to the relevant managers that can use it to query or populate caches when caching is enabled. - - Dataset type cache is always enabled for now, this avoids the need for - explicitly enabling caching in pipetask executors. """ def __init__(self) -> None: - self._dataset_types: DatasetTypeCache = DatasetTypeCache() self._collection_records: CollectionRecordCache | None = None self._collection_summaries: CollectionSummaryCache | None = None self._depth = 0 @@ -96,8 +91,3 @@ def collection_records(self) -> CollectionRecordCache | None: def collection_summaries(self) -> CollectionSummaryCache | None: """Cache for collection summary records (`CollectionSummaryCache`).""" return self._collection_summaries - - @property - def dataset_types(self) -> DatasetTypeCache: - """Cache for dataset types, never disabled (`DatasetTypeCache`).""" - return self._dataset_types diff --git a/python/lsst/daf/butler/registry/_dataset_type_cache.py b/python/lsst/daf/butler/registry/datasets/byDimensions/_dataset_type_cache.py similarity index 96% rename from python/lsst/daf/butler/registry/_dataset_type_cache.py rename to python/lsst/daf/butler/registry/datasets/byDimensions/_dataset_type_cache.py index 05d3b27b35..1865736919 100644 --- a/python/lsst/daf/butler/registry/_dataset_type_cache.py +++ b/python/lsst/daf/butler/registry/datasets/byDimensions/_dataset_type_cache.py @@ -30,13 +30,10 @@ __all__ = ("DatasetTypeCache",) from collections.abc import Iterable, Iterator -from typing import TYPE_CHECKING -from .._dataset_type import DatasetType -from ..dimensions import DimensionGroup - -if TYPE_CHECKING: - from .datasets.byDimensions.tables import DynamicTables +from ...._dataset_type import DatasetType +from ....dimensions import DimensionGroup +from .tables import DynamicTables, DynamicTablesCache class DatasetTypeCache: @@ -57,8 +54,6 @@ class DatasetTypeCache: """ def __init__(self) -> None: - from .datasets.byDimensions.tables import DynamicTablesCache - self.tables = DynamicTablesCache() self._by_name_cache: dict[str, tuple[DatasetType, int]] = {} self._by_dimensions_cache: dict[DimensionGroup, DynamicTables] = {} diff --git a/python/lsst/daf/butler/registry/datasets/byDimensions/_manager.py b/python/lsst/daf/butler/registry/datasets/byDimensions/_manager.py index 6f554f0373..e7708a8614 100644 --- a/python/lsst/daf/butler/registry/datasets/byDimensions/_manager.py +++ b/python/lsst/daf/butler/registry/datasets/byDimensions/_manager.py @@ -30,6 +30,7 @@ from ...interfaces import DatasetRecordStorageManager, RunRecord, VersionTuple from ...queries import SqlQueryContext # old registry query system 
from ...wildcards import DatasetTypeWildcard +from ._dataset_type_cache import DatasetTypeCache from .summaries import CollectionSummaryManager from .tables import DynamicTables, addDatasetForeignKey, makeStaticTableSpecs, makeTagTableSpec @@ -132,8 +133,6 @@ class ByDimensionsDatasetRecordStorageManagerUUID(DatasetRecordStorageManager): tables used by this class. summaries : `CollectionSummaryManager` Structure containing tables that summarize the contents of collections. - caching_context : `CachingContext` - Object controlling caching of information returned by managers. registry_schema_version : `VersionTuple` or `None`, optional Version of registry schema. """ @@ -146,7 +145,6 @@ def __init__( dimensions: DimensionRecordStorageManager, static: StaticDatasetTablesTuple, summaries: CollectionSummaryManager, - caching_context: CachingContext, registry_schema_version: VersionTuple | None = None, ): super().__init__(registry_schema_version=registry_schema_version) @@ -155,7 +153,7 @@ def __init__( self._dimensions = dimensions self._static = static self._summaries = summaries - self._caching_context = caching_context + self._cache = DatasetTypeCache() self._use_astropy_ingest_date = self.ingest_date_dtype() is ddl.AstropyTimeNsecTai self._run_key_column = collections.getRunForeignKeyName() @@ -196,7 +194,6 @@ def initialize( dimensions=dimensions, static=static, summaries=summaries, - caching_context=caching_context, registry_schema_version=registry_schema_version, ) @@ -272,14 +269,12 @@ def clone( dimensions=dimensions, static=self._static, summaries=self._summaries.clone(db=db, collections=collections, caching_context=caching_context), - caching_context=caching_context, registry_schema_version=self._registry_schema_version, ) def refresh(self) -> None: # Docstring inherited from DatasetRecordStorageManager. - if self._caching_context.dataset_types is not None: - self._caching_context.dataset_types.clear() + self._cache.clear() def remove_dataset_type(self, name: str) -> None: # Docstring inherited from DatasetRecordStorageManager. 
@@ -317,21 +312,15 @@ def register_dataset_type(self, dataset_type: DatasetType) -> bool: ) record = self._fetch_dataset_type_record(dataset_type.name) if record is None: - if ( - dynamic_tables := self._caching_context.dataset_types.get_by_dimensions( - dataset_type.dimensions - ) - ) is None: + if (dynamic_tables := self._cache.get_by_dimensions(dataset_type.dimensions)) is None: dimensions_key = self._dimensions.save_dimension_group(dataset_type.dimensions) dynamic_tables = DynamicTables.from_dimensions_key( dataset_type.dimensions, dimensions_key, dataset_type.isCalibration() ) - dynamic_tables.create( - self._db, type(self._collections), self._caching_context.dataset_types.tables - ) + dynamic_tables.create(self._db, type(self._collections), self._cache.tables) elif dataset_type.isCalibration() and dynamic_tables.calibs_name is None: dynamic_tables = dynamic_tables.add_calibs( - self._db, type(self._collections), self._caching_context.dataset_types.tables + self._db, type(self._collections), self._cache.tables ) row, inserted = self._db.sync( self._static.dataset_type, @@ -351,9 +340,9 @@ def register_dataset_type(self, dataset_type: DatasetType) -> bool: returning=["id", "tag_association_table"], ) # Make sure that cache is updated - if self._caching_context.dataset_types is not None and row is not None: - self._caching_context.dataset_types.add(dataset_type, row["id"]) - self._caching_context.dataset_types.add_by_dimensions(dataset_type.dimensions, dynamic_tables) + if row is not None: + self._cache.add(dataset_type, row["id"]) + self._cache.add_by_dimensions(dataset_type.dimensions, dynamic_tables) else: if dataset_type != record.dataset_type: raise ConflictingDefinitionError( @@ -438,22 +427,15 @@ def getDatasetRef(self, id: DatasetId) -> DatasetRef | None: run = row[self._run_key_column] record = self._record_from_row(row) dynamic_tables: DynamicTables | None = None - if self._caching_context.dataset_types is not None: - _, dataset_type_id = self._caching_context.dataset_types.get(record.dataset_type.name) - if dataset_type_id is None: - if self._caching_context.dataset_types is not None: - self._caching_context.dataset_types.add(record.dataset_type, record.dataset_type_id) - else: - assert record.dataset_type_id == dataset_type_id, "Two IDs for the same dataset type name!" - dynamic_tables = self._caching_context.dataset_types.get_by_dimensions( - record.dataset_type.dimensions - ) + _, dataset_type_id = self._cache.get(record.dataset_type.name) + if dataset_type_id is None: + self._cache.add(record.dataset_type, record.dataset_type_id) + else: + assert record.dataset_type_id == dataset_type_id, "Two IDs for the same dataset type name!" + dynamic_tables = self._cache.get_by_dimensions(record.dataset_type.dimensions) if dynamic_tables is None: dynamic_tables = record.make_dynamic_tables() - if self._caching_context.dataset_types is not None: - self._caching_context.dataset_types.add_by_dimensions( - record.dataset_type.dimensions, dynamic_tables - ) + self._cache.add_by_dimensions(record.dataset_type.dimensions, dynamic_tables) if record.dataset_type.dimensions: # This query could return multiple rows (one for each tagged # collection the dataset is in, plus one for its run collection), @@ -544,61 +526,57 @@ def _fetch_dataset_types(self) -> list[DatasetType]: # whenever a dataset type is added to the cache by name, to reduce the # number of possible states the cache can be in and minimize the number # of queries. 
- if self._caching_context.dataset_types is not None: - if self._caching_context.dataset_types.full: - return [dataset_type for dataset_type, _ in self._caching_context.dataset_types.items()] + if self._cache.full: + return [dataset_type for dataset_type, _ in self._cache.items()] with self._db.query(self._static.dataset_type.select()) as sql_result: sql_rows = sql_result.mappings().fetchall() records = [self._record_from_row(row) for row in sql_rows] # Cache everything and specify that cache is complete. - if self._caching_context.dataset_types is not None: - cache_data: list[tuple[DatasetType, int]] = [] - cache_dimensions_data: dict[DimensionGroup, DynamicTables] = {} - for record in records: - cache_data.append((record.dataset_type, record.dataset_type_id)) - if (dynamic_tables := cache_dimensions_data.get(record.dataset_type.dimensions)) is None: - tables = record.make_dynamic_tables() - else: - tables = record.update_dynamic_tables(dynamic_tables) - cache_dimensions_data[record.dataset_type.dimensions] = tables - self._caching_context.dataset_types.set( - cache_data, full=True, dimensions_data=cache_dimensions_data.items(), dimensions_full=True - ) + cache_data: list[tuple[DatasetType, int]] = [] + cache_dimensions_data: dict[DimensionGroup, DynamicTables] = {} + for record in records: + cache_data.append((record.dataset_type, record.dataset_type_id)) + if (dynamic_tables := cache_dimensions_data.get(record.dataset_type.dimensions)) is None: + tables = record.make_dynamic_tables() + else: + tables = record.update_dynamic_tables(dynamic_tables) + cache_dimensions_data[record.dataset_type.dimensions] = tables + self._cache.set( + cache_data, full=True, dimensions_data=cache_dimensions_data.items(), dimensions_full=True + ) return [record.dataset_type for record in records] def _find_storage(self, name: str) -> _DatasetRecordStorage: """Find a dataset type and the extra information needed to work with it, utilizing and populating the cache as needed. """ - if self._caching_context.dataset_types is not None: - dataset_type, dataset_type_id = self._caching_context.dataset_types.get(name) + dataset_type, dataset_type_id = self._cache.get(name) + if dataset_type is not None: + tables = self._cache.get_by_dimensions(dataset_type.dimensions) + assert ( + dataset_type_id is not None and tables is not None + ), "Dataset type cache population is incomplete." + return _DatasetRecordStorage( + dataset_type=dataset_type, dataset_type_id=dataset_type_id, dynamic_tables=tables + ) + else: + # On the first cache miss populate the cache with complete list + # of dataset types (if it was not done yet). + if not self._cache.full: + self._fetch_dataset_types() + # Try again + dataset_type, dataset_type_id = self._cache.get(name) if dataset_type is not None: - tables = self._caching_context.dataset_types.get_by_dimensions(dataset_type.dimensions) + tables = self._cache.get_by_dimensions(dataset_type.dimensions) assert ( dataset_type_id is not None and tables is not None ), "Dataset type cache population is incomplete." return _DatasetRecordStorage( dataset_type=dataset_type, dataset_type_id=dataset_type_id, dynamic_tables=tables ) - else: - # On the first cache miss populate the cache with complete list - # of dataset types (if it was not done yet). 
- if not self._caching_context.dataset_types.full: - self._fetch_dataset_types() - # Try again - dataset_type, dataset_type_id = self._caching_context.dataset_types.get(name) - if dataset_type is not None: - tables = self._caching_context.dataset_types.get_by_dimensions(dataset_type.dimensions) - assert ( - dataset_type_id is not None and tables is not None - ), "Dataset type cache population is incomplete." - return _DatasetRecordStorage( - dataset_type=dataset_type, dataset_type_id=dataset_type_id, dynamic_tables=tables - ) record = self._fetch_dataset_type_record(name) if record is not None: - if self._caching_context.dataset_types is not None: - self._caching_context.dataset_types.add(record.dataset_type, record.dataset_type_id) + self._cache.add(record.dataset_type, record.dataset_type_id) return _DatasetRecordStorage( record.dataset_type, record.dataset_type_id, record.make_dynamic_tables() ) @@ -1641,10 +1619,10 @@ def refresh_collection_summaries(self, dataset_type: DatasetType) -> None: self._summaries.delete_collections(storage.dataset_type_id, collections_to_delete) def _get_tags_table(self, table: DynamicTables) -> sqlalchemy.Table: - return table.tags(self._db, type(self._collections), self._caching_context.dataset_types.tables) + return table.tags(self._db, type(self._collections), self._cache.tables) def _get_calibs_table(self, table: DynamicTables) -> sqlalchemy.Table: - return table.calibs(self._db, type(self._collections), self._caching_context.dataset_types.tables) + return table.calibs(self._db, type(self._collections), self._cache.tables) def _create_case_expression_for_collections( From d15e0bcd65bfa1f8bf3ba760d688dc8ef993b1e0 Mon Sep 17 00:00:00 2001 From: "David H. Irving" Date: Wed, 4 Dec 2024 17:07:20 -0700 Subject: [PATCH 04/10] Preload dataset type cache in Butler server Pre-fetch dataset types the first time a repository is accessed in Butler server, to avoid the need to re-fetch them in most later operations. --- .../byDimensions/_dataset_type_cache.py | 29 +++++++++++++++++++ .../datasets/byDimensions/_manager.py | 11 ++++++- .../butler/registry/interfaces/_datasets.py | 7 +++++ .../lsst/daf/butler/registry/sql_registry.py | 1 + 4 files changed, 47 insertions(+), 1 deletion(-) diff --git a/python/lsst/daf/butler/registry/datasets/byDimensions/_dataset_type_cache.py b/python/lsst/daf/butler/registry/datasets/byDimensions/_dataset_type_cache.py index 1865736919..75e5c1596e 100644 --- a/python/lsst/daf/butler/registry/datasets/byDimensions/_dataset_type_cache.py +++ b/python/lsst/daf/butler/registry/datasets/byDimensions/_dataset_type_cache.py @@ -60,6 +60,35 @@ def __init__(self) -> None: self._full = False self._dimensions_full = False + def clone(self) -> DatasetTypeCache: + """Make a copy of the caches that are safe to use in another thread. + + Notes + ----- + After cloning, the ``tables`` cache will be shared between the new + instance and the current instance. It is safe to read and update + ``tables`` from multiple threads simultaneously -- the cached values + are immutable table schemas, and they are looked up one at a time by + name. + + The other caches are copied, because their access patterns are more + complex. + + ``full`` and ``dimensions_full`` will initially return `False` in the + new instance. This preserves the invariant that a Butler is able to + see any changes to the database made before the Butler is instantiated. + The downside is that the cloned cache will have to be re-fetched before + it can be used for glob searches. 
+ """ + clone = DatasetTypeCache() + # Share DynamicTablesCache between instances. + clone.tables = self.tables + # The inner key/value objects are immutable in both of these caches, so + # we can shallow-copy the dicts. + clone._by_name_cache = self._by_name_cache.copy() + clone._by_dimensions_cache = self._by_dimensions_cache.copy() + return clone + @property def full(self) -> bool: """`True` if cache holds all known dataset types (`bool`).""" diff --git a/python/lsst/daf/butler/registry/datasets/byDimensions/_manager.py b/python/lsst/daf/butler/registry/datasets/byDimensions/_manager.py index e7708a8614..288c452fbe 100644 --- a/python/lsst/daf/butler/registry/datasets/byDimensions/_manager.py +++ b/python/lsst/daf/butler/registry/datasets/byDimensions/_manager.py @@ -135,6 +135,8 @@ class ByDimensionsDatasetRecordStorageManagerUUID(DatasetRecordStorageManager): Structure containing tables that summarize the contents of collections. registry_schema_version : `VersionTuple` or `None`, optional Version of registry schema. + _cache : `None`, optional + For internal use only. """ def __init__( @@ -146,6 +148,7 @@ def __init__( static: StaticDatasetTablesTuple, summaries: CollectionSummaryManager, registry_schema_version: VersionTuple | None = None, + _cache: DatasetTypeCache | None = None, ): super().__init__(registry_schema_version=registry_schema_version) self._db = db @@ -153,7 +156,7 @@ def __init__( self._dimensions = dimensions self._static = static self._summaries = summaries - self._cache = DatasetTypeCache() + self._cache = _cache if _cache is not None else DatasetTypeCache() self._use_astropy_ingest_date = self.ingest_date_dtype() is ddl.AstropyTimeNsecTai self._run_key_column = collections.getRunForeignKeyName() @@ -270,6 +273,9 @@ def clone( static=self._static, summaries=self._summaries.clone(db=db, collections=collections, caching_context=caching_context), registry_schema_version=self._registry_schema_version, + # See notes on DatasetTypeCache.clone() about cache behavior after + # cloning. + _cache=self._cache.clone(), ) def refresh(self) -> None: @@ -502,6 +508,9 @@ def _record_from_row(self, row: Mapping) -> _DatasetTypeRecord: def _dataset_type_from_row(self, row: Mapping) -> DatasetType: return self._record_from_row(row).dataset_type + def preload_cache(self) -> None: + self._fetch_dataset_types() + def _fetch_dataset_types(self) -> list[DatasetType]: """Fetch list of all defined dataset types.""" # This is one of three places we populate the dataset type cache: diff --git a/python/lsst/daf/butler/registry/interfaces/_datasets.py b/python/lsst/daf/butler/registry/interfaces/_datasets.py index 2ace8b61ad..41f1ccef39 100644 --- a/python/lsst/daf/butler/registry/interfaces/_datasets.py +++ b/python/lsst/daf/butler/registry/interfaces/_datasets.py @@ -103,6 +103,13 @@ def clone( """ raise NotImplementedError() + @abstractmethod + def preload_cache(self) -> None: + """Fetch data from the database and use it to pre-populate caches to + speed up later operations. 
+ """ + raise NotImplementedError() + @classmethod @abstractmethod def initialize( diff --git a/python/lsst/daf/butler/registry/sql_registry.py b/python/lsst/daf/butler/registry/sql_registry.py index 5365db1733..4e4c53255b 100644 --- a/python/lsst/daf/butler/registry/sql_registry.py +++ b/python/lsst/daf/butler/registry/sql_registry.py @@ -2485,6 +2485,7 @@ def make_datastore_tables(self, tables: Mapping[str, DatastoreOpaqueTable]) -> N def preload_cache(self) -> None: """Immediately load caches that are used for common operations.""" self.dimension_record_cache.preload_cache() + self._managers.datasets.preload_cache() @property def obsCoreTableManager(self) -> ObsCoreTableManager | None: From efa37119190639dd5413e9aa82da96bec61aff77 Mon Sep 17 00:00:00 2001 From: "David H. Irving" Date: Mon, 9 Dec 2024 17:02:13 -0700 Subject: [PATCH 05/10] Use dataset type preload throughout unit tests Trigger dataset type preload the first time a connection is made to the Butler server in each unit test, to better match the conditions that will exist in the actual server. --- .../lsst/daf/butler/_labeled_butler_factory.py | 9 ++++----- .../daf/butler/direct_butler/_direct_butler.py | 4 ++-- python/lsst/daf/butler/registry/sql_registry.py | 16 +++++++++++++--- python/lsst/daf/butler/tests/server.py | 2 +- 4 files changed, 20 insertions(+), 11 deletions(-) diff --git a/python/lsst/daf/butler/_labeled_butler_factory.py b/python/lsst/daf/butler/_labeled_butler_factory.py index 141ddc0eee..7de3bec32b 100644 --- a/python/lsst/daf/butler/_labeled_butler_factory.py +++ b/python/lsst/daf/butler/_labeled_butler_factory.py @@ -88,7 +88,7 @@ def __init__(self, repositories: Mapping[str, str] | None = None) -> None: self._initialization_locks = NamedLocks() # This may be overridden by unit tests. - self._preload_direct_butler_cache = True + self._preload_unsafe_direct_butler_caches = True def bind(self, access_token: str | None) -> LabeledButlerFactoryProtocol: """Create a callable factory function for generating Butler instances @@ -161,7 +161,7 @@ def _create_butler_factory_function(self, label: str) -> _FactoryFunction: match butler_type: case ButlerType.DIRECT: - return _create_direct_butler_factory(config, self._preload_direct_butler_cache) + return _create_direct_butler_factory(config, self._preload_unsafe_direct_butler_caches) case ButlerType.REMOTE: return _create_remote_butler_factory(config) case _: @@ -177,7 +177,7 @@ def _get_config_uri(self, label: str) -> ResourcePathExpression: return config_uri -def _create_direct_butler_factory(config: ButlerConfig, preload_cache: bool) -> _FactoryFunction: +def _create_direct_butler_factory(config: ButlerConfig, preload_unsafe_caches: bool) -> _FactoryFunction: import lsst.daf.butler.direct_butler # Create a 'template' Butler that will be cloned when callers request an @@ -187,8 +187,7 @@ def _create_direct_butler_factory(config: ButlerConfig, preload_cache: bool) -> # Load caches so that data is available in cloned instances without # needing to refetch it from the database for every instance. 
- if preload_cache: - butler._preload_cache() + butler._preload_cache(load_dimension_record_cache=preload_unsafe_caches) def create_butler(access_token: str | None) -> Butler: # Access token is ignored because DirectButler does not use Gafaelfawr diff --git a/python/lsst/daf/butler/direct_butler/_direct_butler.py b/python/lsst/daf/butler/direct_butler/_direct_butler.py index bc9952f8da..69972d8856 100644 --- a/python/lsst/daf/butler/direct_butler/_direct_butler.py +++ b/python/lsst/daf/butler/direct_butler/_direct_butler.py @@ -2339,9 +2339,9 @@ def _query_all_datasets_by_page( pages = query_all_datasets(self, query, args) yield iter(page.data for page in pages) - def _preload_cache(self) -> None: + def _preload_cache(self, *, load_dimension_record_cache: bool = True) -> None: """Immediately load caches that are used for common operations.""" - self._registry.preload_cache() + self._registry.preload_cache(load_dimension_record_cache=load_dimension_record_cache) _config: ButlerConfig """Configuration for this Butler instance.""" diff --git a/python/lsst/daf/butler/registry/sql_registry.py b/python/lsst/daf/butler/registry/sql_registry.py index 4e4c53255b..edc35da3d2 100644 --- a/python/lsst/daf/butler/registry/sql_registry.py +++ b/python/lsst/daf/butler/registry/sql_registry.py @@ -2482,11 +2482,21 @@ def make_datastore_tables(self, tables: Mapping[str, DatastoreOpaqueTable]) -> N pass self._datastore_record_classes = datastore_record_classes - def preload_cache(self) -> None: - """Immediately load caches that are used for common operations.""" - self.dimension_record_cache.preload_cache() + def preload_cache(self, *, load_dimension_record_cache: bool) -> None: + """Immediately load caches that are used for common operations. + + Parameters + ---------- + load_dimension_record_cache : `bool` + If True, preload the dimension record cache. When this cache is + preloaded, subsequent external changes to governor dimension + records will not be visible to this Butler. + """ self._managers.datasets.preload_cache() + if load_dimension_record_cache: + self.dimension_record_cache.preload_cache() + @property def obsCoreTableManager(self) -> ObsCoreTableManager | None: """The ObsCore manager instance for this registry diff --git a/python/lsst/daf/butler/tests/server.py b/python/lsst/daf/butler/tests/server.py index 1146f8acfb..7b9d74a8ff 100644 --- a/python/lsst/daf/butler/tests/server.py +++ b/python/lsst/daf/butler/tests/server.py @@ -119,7 +119,7 @@ def create_test_server( # instrument records etc during setup. So configure the # factory to disable this preloading and re-fetch the records # as needed. - server_butler_factory._preload_direct_butler_cache = False + server_butler_factory._preload_unsafe_direct_butler_caches = False app.dependency_overrides[butler_factory_dependency] = lambda: server_butler_factory # Using TestClient in a context manager ensures that it uses From 096c8cf2803054e557d359668df48c0c3b81f713 Mon Sep 17 00:00:00 2001 From: "David H. Irving" Date: Tue, 10 Dec 2024 10:55:57 -0700 Subject: [PATCH 06/10] Fix failed assertion after cache miss Fix a bug where loading a dataset type registered externally after the Butler had loaded a "full" dataset type cache would cause an assertion failure "Dataset type cache population is incomplete" due to only filling in one of the two caches. 
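To make the failure mode concrete: a hit in the by-name cache is assumed
to imply a matching entry in the by-dimensions cache, so fetching a single
dataset type after a cache miss has to populate both mappings. The sketch
below is a toy model of that invariant using hypothetical names; the real
logic lives in DatasetTypeCache and in _find_storage() in the manager.

    class TwoLevelCacheSketch:
        """Toy model of the two mappings inside DatasetTypeCache."""

        def __init__(self) -> None:
            self._by_name: dict[str, int] = {}  # dataset type name -> primary key
            self._by_dimensions: dict[str, str] = {}  # dimensions key -> table info

        def add_after_miss(self, name: str, key: int, dimensions: str, tables: str) -> None:
            self._by_name[name] = key
            # Skipping this second update was the bug: the lookup below then
            # failed its consistency check the next time the name was used.
            self._by_dimensions[dimensions] = tables

        def find(self, name: str, dimensions: str) -> tuple[int, str]:
            key = self._by_name.get(name)
            if key is None:
                raise KeyError(name)
            tables = self._by_dimensions.get(dimensions)
            assert tables is not None, "Dataset type cache population is incomplete."
            return key, tables

The new testDatasetTypeCache test below exercises this path: one Butler
does a full cache load, a second Butler registers a new dataset type, and
the first Butler then looks it up and runs a query that needs its tags
table.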
--- .../datasets/byDimensions/_manager.py | 6 ++--- .../daf/butler/registry/tests/_registry.py | 26 +++++++++++++++++++ 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/python/lsst/daf/butler/registry/datasets/byDimensions/_manager.py b/python/lsst/daf/butler/registry/datasets/byDimensions/_manager.py index 288c452fbe..e9a6a891bb 100644 --- a/python/lsst/daf/butler/registry/datasets/byDimensions/_manager.py +++ b/python/lsst/daf/butler/registry/datasets/byDimensions/_manager.py @@ -586,9 +586,9 @@ def _find_storage(self, name: str) -> _DatasetRecordStorage: record = self._fetch_dataset_type_record(name) if record is not None: self._cache.add(record.dataset_type, record.dataset_type_id) - return _DatasetRecordStorage( - record.dataset_type, record.dataset_type_id, record.make_dynamic_tables() - ) + tables = record.make_dynamic_tables() + self._cache.add_by_dimensions(record.dataset_type.dimensions, tables) + return _DatasetRecordStorage(record.dataset_type, record.dataset_type_id, tables) raise MissingDatasetTypeError(f"Dataset type {name!r} does not exist.") def getCollectionSummary(self, collection: CollectionRecord) -> CollectionSummary: diff --git a/python/lsst/daf/butler/registry/tests/_registry.py b/python/lsst/daf/butler/registry/tests/_registry.py index 4b6334fb18..1de8babfac 100644 --- a/python/lsst/daf/butler/registry/tests/_registry.py +++ b/python/lsst/daf/butler/registry/tests/_registry.py @@ -326,6 +326,32 @@ def testDatasetType(self): self.assertCountEqual([dt.name for dt in types], ["test", "testNoneTemplate"]) self.assertEqual(missing, ["notarealdatasettype"]) + def testDatasetTypeCache(self): + """Test for dataset type cache update logic after a cache miss.""" + butler1 = self.make_butler() + butler2 = butler1.clone() + self.load_data(butler1, "base.yaml") + + # Trigger full cache load. + butler2.get_dataset_type("flat") + # Have an external process register a dataset type. + butler1.registry.registerDatasetType( + DatasetType("test_type", ["instrument"], "int", universe=butler1.dimensions) + ) + # Try to read the new dataset type -- this is a cache miss that + # triggers fetch of a single dataset type. + dt = butler2.get_dataset_type("test_type") + self.assertEqual(dt.name, "test_type") + self.assertEqual(list(dt.dimensions.names), ["instrument"]) + # Read it again -- this time it should pull from the cache. + dt = butler2.get_dataset_type("test_type") + self.assertEqual(dt.name, "test_type") + self.assertEqual(list(dt.dimensions.names), ["instrument"]) + # Do a query that uses the dataset type's tags table. + self.assertEqual( + butler2.query_datasets("test_type", collections="*", find_first=False, explain=False), [] + ) + def testDimensions(self): """Tests for `SqlRegistry.insertDimensionData`, `SqlRegistry.syncDimensionData`, and `SqlRegistry.expandDataId`. From fdcd9ecfd8566c0aba2c04d86400706952fbdd0c Mon Sep 17 00:00:00 2001 From: "David H. Irving" Date: Tue, 10 Dec 2024 11:55:44 -0700 Subject: [PATCH 07/10] Add documentation about server caches Copy some notes from the comments on DM-42317, and update them for the changes to dataset type caching. 
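One point worth spelling out next to the README below: the "tag" and
"calib" table cache shared between cloned instances is only ever used
through get() and set_or_get(), so concurrent requests that race to build
the same table all end up with whichever object was stored first. The
sketch below shows that pattern, assuming a simple lock-based
implementation; it is illustrative only, not the actual ThreadSafeCache
class in lsst.daf.butler._utilities.thread_safe_cache.

    import threading
    from typing import Generic, TypeVar

    _K = TypeVar("_K")
    _V = TypeVar("_V")

    class SetOrGetCacheSketch(Generic[_K, _V]):
        """Illustrative stand-in for a set_or_get style thread-safe cache."""

        def __init__(self) -> None:
            self._lock = threading.Lock()
            self._data: dict[_K, _V] = {}

        def get(self, key: _K) -> _V | None:
            with self._lock:
                return self._data.get(key)

        def set_or_get(self, key: _K, value: _V) -> _V:
            # Store the value only if the key is absent, and always return
            # whatever the cache actually holds, so racing callers end up
            # sharing a single object.
            with self._lock:
                return self._data.setdefault(key, value)

Losing such a race is harmless here because the cached values are immutable
sqlalchemy Table schemas looked up one at a time by name, as noted in the
DatasetTypeCache.clone() docstring added earlier in this series.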
---
 .../daf/butler/remote_butler/server/README.md | 57 +++++++++++++++++++
 1 file changed, 57 insertions(+)
 create mode 100644 python/lsst/daf/butler/remote_butler/server/README.md

diff --git a/python/lsst/daf/butler/remote_butler/server/README.md b/python/lsst/daf/butler/remote_butler/server/README.md
new file mode 100644
index 0000000000..3188ba9750
--- /dev/null
+++ b/python/lsst/daf/butler/remote_butler/server/README.md
@@ -0,0 +1,57 @@
+# Butler server
+
+## Concurrency and caching
+
+The server internally uses `DirectButler` instances to retrieve data from the
+database. `DirectButler` instances are not thread-safe, so we need a separate
+instance for each request. It is expensive to create a `DirectButler` instance
+from scratch, so we create a "template" `DirectButler` the first time a
+repository is accessed, and clone it for each request. (The cloning process is
+managed by `LabeledButlerFactory`.)
+
+Within `DirectButler`, there are a number of internal caches. Some of these
+caches assume that no external processes will be mutating the repository during
+the lifetime of the `DirectButler` instance, and lock in state at the first
+time the data is accessed. This behavior is OK in the context of a single HTTP
+request in Butler server. However, it is problematic across requests if the
+repository can be changing in the background, because new requests wouldn't be
+able to see updated data.
+
+It is expected that the main data release repositories will be immutable,
+which would allow for more aggressive caching, but we don't yet have a way to
+configure "mutable" vs "immutable" repositories in the server.
+
+### Summary of caches that exist in `DirectButler`
+
+Caches that are shared globally between all Butler instances within a process:
+
+- `DimensionUniverse`
+- `DimensionGroup`
+- `StorageClassFactory`
+
+Caches shared between instances cloned from the same parent Butler instance:
+
+- Most SqlAlchemy `Table` objects created during managers' `initialize()` classmethods
+- Tables created by `register()` in `ByNameOpaqueTableStorageManager`
+- Dataset "tag" and "calib" tables created by
+  `ByDimensionsDatasetRecordStorageManagerUUID` and stored in `DatasetTypeCache`
+
+Caches copied between instances at the time of cloning:
+
+- `DimensionRecordCache`
+- `DatasetType` and `DynamicTables` instances in `DatasetTypeCache`
+
+Caches that start empty in a newly cloned instance:
+
+- Collection cache and collection summary cache in `CachingContext`
+- `DimensionGraph` cache in `_DimensionGroupStorage`
+- `DatastoreCacheManager` in `FileDatastore` (not relevant to Butler server,
+  since the server does not access files on the filesystem.)
+
+### Caching caveats
+
+There is not currently a way to detect all changes to a Butler repository and invalidate the caches. The Butler server must be restarted if a repository is changed in any of the following ways:
+
+- Any additions/deletions/modifications to governor dimension records
+- Updating a dataset type (deleting it and re-registering the same name with different values)
+- Deleting a dataset type

From e4efd1141276138e36ba5d5f5ae2bc41b037535e Mon Sep 17 00:00:00 2001
From: "David H. 
Irving" Date: Wed, 11 Dec 2024 16:00:59 -0700 Subject: [PATCH 08/10] Add missing period in docstring Co-authored-by: Andy Salnikov --- .../registry/datasets/byDimensions/_dataset_type_cache.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/lsst/daf/butler/registry/datasets/byDimensions/_dataset_type_cache.py b/python/lsst/daf/butler/registry/datasets/byDimensions/_dataset_type_cache.py index 75e5c1596e..064107cded 100644 --- a/python/lsst/daf/butler/registry/datasets/byDimensions/_dataset_type_cache.py +++ b/python/lsst/daf/butler/registry/datasets/byDimensions/_dataset_type_cache.py @@ -108,7 +108,7 @@ def add(self, dataset_type: DatasetType, id: int) -> None: Dataset type, replaces any existing dataset type with the same name. id : `int` - The dataset type primary key + The dataset type primary key. Additional opaque object stored with this dataset type. """ self._by_name_cache[dataset_type.name] = (dataset_type, id) From 163ae6e2e5f5dde6b3211ca1e365582ef7cfbdb0 Mon Sep 17 00:00:00 2001 From: "David H. Irving" Date: Wed, 11 Dec 2024 16:02:45 -0700 Subject: [PATCH 09/10] Remove unnecessary variable declaration --- .../lsst/daf/butler/registry/datasets/byDimensions/_manager.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/lsst/daf/butler/registry/datasets/byDimensions/_manager.py b/python/lsst/daf/butler/registry/datasets/byDimensions/_manager.py index e9a6a891bb..714f315aad 100644 --- a/python/lsst/daf/butler/registry/datasets/byDimensions/_manager.py +++ b/python/lsst/daf/butler/registry/datasets/byDimensions/_manager.py @@ -432,7 +432,6 @@ def getDatasetRef(self, id: DatasetId) -> DatasetRef | None: return None run = row[self._run_key_column] record = self._record_from_row(row) - dynamic_tables: DynamicTables | None = None _, dataset_type_id = self._cache.get(record.dataset_type.name) if dataset_type_id is None: self._cache.add(record.dataset_type, record.dataset_type_id) From 8dfcee82a45aad39965aa6661736aa831fab2286 Mon Sep 17 00:00:00 2001 From: "David H. 
Irving" Date: Wed, 11 Dec 2024 16:05:06 -0700 Subject: [PATCH 10/10] Rename cache class --- .../datasets/byDimensions/_dataset_type_cache.py | 4 ++-- .../butler/registry/datasets/byDimensions/tables.py | 12 +++++------- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/python/lsst/daf/butler/registry/datasets/byDimensions/_dataset_type_cache.py b/python/lsst/daf/butler/registry/datasets/byDimensions/_dataset_type_cache.py index 064107cded..efc48d3870 100644 --- a/python/lsst/daf/butler/registry/datasets/byDimensions/_dataset_type_cache.py +++ b/python/lsst/daf/butler/registry/datasets/byDimensions/_dataset_type_cache.py @@ -33,7 +33,7 @@ from ...._dataset_type import DatasetType from ....dimensions import DimensionGroup -from .tables import DynamicTables, DynamicTablesCache +from .tables import DynamicTables, TableCache class DatasetTypeCache: @@ -54,7 +54,7 @@ class DatasetTypeCache: """ def __init__(self) -> None: - self.tables = DynamicTablesCache() + self.tables = TableCache() self._by_name_cache: dict[str, tuple[DatasetType, int]] = {} self._by_dimensions_cache: dict[DimensionGroup, DynamicTables] = {} self._full = False diff --git a/python/lsst/daf/butler/registry/datasets/byDimensions/tables.py b/python/lsst/daf/butler/registry/datasets/byDimensions/tables.py index 7254c8d104..8864cc451f 100644 --- a/python/lsst/daf/butler/registry/datasets/byDimensions/tables.py +++ b/python/lsst/daf/butler/registry/datasets/byDimensions/tables.py @@ -451,7 +451,7 @@ def makeCalibTableSpec( return tableSpec -DynamicTablesCache: TypeAlias = ThreadSafeCache[str, sqlalchemy.Table] +TableCache: TypeAlias = ThreadSafeCache[str, sqlalchemy.Table] @immutable @@ -519,7 +519,7 @@ def from_dimensions_key( calibs_name=makeCalibTableName(dimensions_key) if is_calibration else None, ) - def create(self, db: Database, collections: type[CollectionManager], cache: DynamicTablesCache) -> None: + def create(self, db: Database, collections: type[CollectionManager], cache: TableCache) -> None: """Create the tables if they don't already exist. Parameters @@ -551,7 +551,7 @@ def create(self, db: Database, collections: type[CollectionManager], cache: Dyna ) def add_calibs( - self, db: Database, collections: type[CollectionManager], cache: DynamicTablesCache + self, db: Database, collections: type[CollectionManager], cache: TableCache ) -> DynamicTables: """Create a calibs table for a dataset type whose dimensions already have a tags table. @@ -577,9 +577,7 @@ def add_calibs( return self.copy(calibs_name=calibs_name) - def tags( - self, db: Database, collections: type[CollectionManager], cache: DynamicTablesCache - ) -> sqlalchemy.Table: + def tags(self, db: Database, collections: type[CollectionManager], cache: TableCache) -> sqlalchemy.Table: """Return the "tags" table that associates datasets with data IDs in TAGGED and RUN collections. @@ -612,7 +610,7 @@ def tags( return cache.set_or_get(self.tags_name, table) def calibs( - self, db: Database, collections: type[CollectionManager], cache: DynamicTablesCache + self, db: Database, collections: type[CollectionManager], cache: TableCache ) -> sqlalchemy.Table: """Return the "calibs" table that associates datasets with data IDs and timespans in CALIBRATION collections.