DM-47948: Preload dataset type cache for Butler server #1125

Merged: 10 commits, Dec 11, 2024
10 changes: 0 additions & 10 deletions python/lsst/daf/butler/registry/_caching_context.py
@@ -31,7 +31,6 @@

from ._collection_record_cache import CollectionRecordCache
from ._collection_summary_cache import CollectionSummaryCache
from ._dataset_type_cache import DatasetTypeCache


class CachingContext:
@@ -46,13 +45,9 @@ class CachingContext:
instances which will be `None` when caching is disabled. Instance of this
class is passed to the relevant managers that can use it to query or
populate caches when caching is enabled.

Dataset type cache is always enabled for now, this avoids the need for
explicitly enabling caching in pipetask executors.
"""

def __init__(self) -> None:
self._dataset_types: DatasetTypeCache = DatasetTypeCache()
self._collection_records: CollectionRecordCache | None = None
self._collection_summaries: CollectionSummaryCache | None = None
self._depth = 0
@@ -96,8 +91,3 @@ def collection_records(self) -> CollectionRecordCache | None:
def collection_summaries(self) -> CollectionSummaryCache | None:
"""Cache for collection summary records (`CollectionSummaryCache`)."""
return self._collection_summaries

@property
def dataset_types(self) -> DatasetTypeCache:
"""Cache for dataset types, never disabled (`DatasetTypeCache`)."""
return self._dataset_types
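
For context on the pattern being changed in this file: CachingContext holds optional caches that stay None until caching is enabled, and this PR removes the always-on dataset type cache from it (the cache now lives in the dataset record storage manager, as the _manager.py changes below show). The following is a minimal, hypothetical sketch of that optional-cache pattern, with illustrative names only, not the actual daf_butler code:

from __future__ import annotations

# Minimal sketch of the optional-cache pattern used by CachingContext;
# class and attribute names here are illustrative, not the daf_butler API.


class _RecordCache:
    """Stand-in for a cache of collection records."""

    def __init__(self) -> None:
        self.records: dict[str, object] = {}


class CachingContextSketch:
    """Caches stay None while caching is disabled; callers check for None."""

    def __init__(self) -> None:
        self._collection_records: _RecordCache | None = None
        self._depth = 0  # supports nested enable/disable scopes

    def enable(self) -> None:
        if self._depth == 0:
            self._collection_records = _RecordCache()
        self._depth += 1

    def disable(self) -> None:
        self._depth -= 1
        if self._depth == 0:
            self._collection_records = None

    @property
    def collection_records(self) -> _RecordCache | None:
        """Cache for collection records, or None when caching is disabled."""
        return self._collection_records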
python/lsst/daf/butler/registry/_dataset_type_cache.py → python/lsst/daf/butler/registry/datasets/byDimensions/_dataset_type_cache.py
@@ -30,13 +30,10 @@
__all__ = ("DatasetTypeCache",)

from collections.abc import Iterable, Iterator
from typing import TYPE_CHECKING

from .._dataset_type import DatasetType
from ..dimensions import DimensionGroup

if TYPE_CHECKING:
from .datasets.byDimensions.tables import DynamicTables
from ...._dataset_type import DatasetType
from ....dimensions import DimensionGroup
from .tables import DynamicTables, DynamicTablesCache


class DatasetTypeCache:
@@ -57,8 +54,6 @@
"""

def __init__(self) -> None:
from .datasets.byDimensions.tables import DynamicTablesCache

self.tables = DynamicTablesCache()
self._by_name_cache: dict[str, tuple[DatasetType, int]] = {}
self._by_dimensions_cache: dict[DimensionGroup, DynamicTables] = {}
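
To make the structure of this cache easier to follow, here is a simplified, hypothetical sketch of a two-index cache like DatasetTypeCache: one index keyed by dataset type name and one keyed by dimension group, each with a flag recording whether it is known to be complete. The real class also carries a DynamicTablesCache and richer value types; everything below is a placeholder.

from __future__ import annotations

from collections.abc import Iterable, Iterator

# Simplified sketch of a two-index cache; types are placeholders and do not
# match the daf_butler classes exactly.


class TwoIndexCacheSketch:
    def __init__(self) -> None:
        self._by_name: dict[str, tuple[object, int]] = {}
        self._by_dimensions: dict[frozenset[str], object] = {}
        self.full = False  # True once every dataset type is known to be cached
        self.dimensions_full = False  # True once every dimension group is cached

    def add(self, name: str, dataset_type: object, dataset_type_id: int) -> None:
        self._by_name[name] = (dataset_type, dataset_type_id)

    def add_by_dimensions(self, dimensions: frozenset[str], tables: object) -> None:
        self._by_dimensions[dimensions] = tables

    def get(self, name: str) -> tuple[object | None, int | None]:
        return self._by_name.get(name, (None, None))

    def get_by_dimensions(self, dimensions: frozenset[str]) -> object | None:
        return self._by_dimensions.get(dimensions)

    def set(self, data: Iterable[tuple[str, object, int]], *, full: bool = False) -> None:
        self._by_name = {name: (dt, dt_id) for name, dt, dt_id in data}
        self.full = full

    def items(self) -> Iterator[tuple[object, int]]:
        return iter(self._by_name.values())

    def clear(self) -> None:
        self._by_name.clear()
        self._by_dimensions.clear()
        self.full = False
        self.dimensions_full = False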
122 changes: 50 additions & 72 deletions python/lsst/daf/butler/registry/datasets/byDimensions/_manager.py
@@ -30,6 +30,7 @@
from ...interfaces import DatasetRecordStorageManager, RunRecord, VersionTuple
from ...queries import SqlQueryContext # old registry query system
from ...wildcards import DatasetTypeWildcard
from ._dataset_type_cache import DatasetTypeCache
from .summaries import CollectionSummaryManager
from .tables import DynamicTables, addDatasetForeignKey, makeStaticTableSpecs, makeTagTableSpec

Expand Down Expand Up @@ -132,8 +133,6 @@ class ByDimensionsDatasetRecordStorageManagerUUID(DatasetRecordStorageManager):
tables used by this class.
summaries : `CollectionSummaryManager`
Structure containing tables that summarize the contents of collections.
caching_context : `CachingContext`
Object controlling caching of information returned by managers.
registry_schema_version : `VersionTuple` or `None`, optional
Version of registry schema.
"""
@@ -146,7 +145,6 @@ def __init__(
dimensions: DimensionRecordStorageManager,
static: StaticDatasetTablesTuple,
summaries: CollectionSummaryManager,
caching_context: CachingContext,
registry_schema_version: VersionTuple | None = None,
):
super().__init__(registry_schema_version=registry_schema_version)
Expand All @@ -155,7 +153,7 @@ def __init__(
self._dimensions = dimensions
self._static = static
self._summaries = summaries
self._caching_context = caching_context
self._cache = DatasetTypeCache()
self._use_astropy_ingest_date = self.ingest_date_dtype() is ddl.AstropyTimeNsecTai
self._run_key_column = collections.getRunForeignKeyName()

@@ -196,7 +194,6 @@ def initialize(
dimensions=dimensions,
static=static,
summaries=summaries,
caching_context=caching_context,
registry_schema_version=registry_schema_version,
)

@@ -272,14 +269,12 @@ def clone(
dimensions=dimensions,
static=self._static,
summaries=self._summaries.clone(db=db, collections=collections, caching_context=caching_context),
caching_context=caching_context,
registry_schema_version=self._registry_schema_version,
)

def refresh(self) -> None:
# Docstring inherited from DatasetRecordStorageManager.
if self._caching_context.dataset_types is not None:
self._caching_context.dataset_types.clear()
self._cache.clear()

def remove_dataset_type(self, name: str) -> None:
# Docstring inherited from DatasetRecordStorageManager.
@@ -317,21 +312,15 @@ def register_dataset_type(self, dataset_type: DatasetType) -> bool:
)
record = self._fetch_dataset_type_record(dataset_type.name)
if record is None:
if (
dynamic_tables := self._caching_context.dataset_types.get_by_dimensions(
dataset_type.dimensions
)
) is None:
if (dynamic_tables := self._cache.get_by_dimensions(dataset_type.dimensions)) is None:
dimensions_key = self._dimensions.save_dimension_group(dataset_type.dimensions)
dynamic_tables = DynamicTables.from_dimensions_key(
dataset_type.dimensions, dimensions_key, dataset_type.isCalibration()
)
dynamic_tables.create(
self._db, type(self._collections), self._caching_context.dataset_types.tables
)
dynamic_tables.create(self._db, type(self._collections), self._cache.tables)
elif dataset_type.isCalibration() and dynamic_tables.calibs_name is None:
dynamic_tables = dynamic_tables.add_calibs(
self._db, type(self._collections), self._caching_context.dataset_types.tables
self._db, type(self._collections), self._cache.tables
)
row, inserted = self._db.sync(
self._static.dataset_type,
@@ -351,9 +340,9 @@
returning=["id", "tag_association_table"],
)
# Make sure that cache is updated
if self._caching_context.dataset_types is not None and row is not None:
self._caching_context.dataset_types.add(dataset_type, row["id"])
self._caching_context.dataset_types.add_by_dimensions(dataset_type.dimensions, dynamic_tables)
if row is not None:
self._cache.add(dataset_type, row["id"])
self._cache.add_by_dimensions(dataset_type.dimensions, dynamic_tables)
else:
if dataset_type != record.dataset_type:
raise ConflictingDefinitionError(
@@ -438,22 +427,15 @@ def getDatasetRef(self, id: DatasetId) -> DatasetRef | None:
run = row[self._run_key_column]
record = self._record_from_row(row)
dynamic_tables: DynamicTables | None = None
Contributor review comment: This declaration is not needed any more?

if self._caching_context.dataset_types is not None:
_, dataset_type_id = self._caching_context.dataset_types.get(record.dataset_type.name)
if dataset_type_id is None:
if self._caching_context.dataset_types is not None:
self._caching_context.dataset_types.add(record.dataset_type, record.dataset_type_id)
else:
assert record.dataset_type_id == dataset_type_id, "Two IDs for the same dataset type name!"
dynamic_tables = self._caching_context.dataset_types.get_by_dimensions(
record.dataset_type.dimensions
)
_, dataset_type_id = self._cache.get(record.dataset_type.name)
if dataset_type_id is None:
self._cache.add(record.dataset_type, record.dataset_type_id)
else:
assert record.dataset_type_id == dataset_type_id, "Two IDs for the same dataset type name!"
dynamic_tables = self._cache.get_by_dimensions(record.dataset_type.dimensions)
if dynamic_tables is None:
dynamic_tables = record.make_dynamic_tables()
if self._caching_context.dataset_types is not None:
self._caching_context.dataset_types.add_by_dimensions(
record.dataset_type.dimensions, dynamic_tables
)
self._cache.add_by_dimensions(record.dataset_type.dimensions, dynamic_tables)
if record.dataset_type.dimensions:
# This query could return multiple rows (one for each tagged
# collection the dataset is in, plus one for its run collection),
@@ -544,61 +526,57 @@ def _fetch_dataset_types(self) -> list[DatasetType]:
# whenever a dataset type is added to the cache by name, to reduce the
# number of possible states the cache can be in and minimize the number
# of queries.
if self._caching_context.dataset_types is not None:
if self._caching_context.dataset_types.full:
return [dataset_type for dataset_type, _ in self._caching_context.dataset_types.items()]
if self._cache.full:
return [dataset_type for dataset_type, _ in self._cache.items()]
with self._db.query(self._static.dataset_type.select()) as sql_result:
sql_rows = sql_result.mappings().fetchall()
records = [self._record_from_row(row) for row in sql_rows]
# Cache everything and specify that cache is complete.
if self._caching_context.dataset_types is not None:
cache_data: list[tuple[DatasetType, int]] = []
cache_dimensions_data: dict[DimensionGroup, DynamicTables] = {}
for record in records:
cache_data.append((record.dataset_type, record.dataset_type_id))
if (dynamic_tables := cache_dimensions_data.get(record.dataset_type.dimensions)) is None:
tables = record.make_dynamic_tables()
else:
tables = record.update_dynamic_tables(dynamic_tables)
cache_dimensions_data[record.dataset_type.dimensions] = tables
self._caching_context.dataset_types.set(
cache_data, full=True, dimensions_data=cache_dimensions_data.items(), dimensions_full=True
)
cache_data: list[tuple[DatasetType, int]] = []
cache_dimensions_data: dict[DimensionGroup, DynamicTables] = {}
for record in records:
cache_data.append((record.dataset_type, record.dataset_type_id))
if (dynamic_tables := cache_dimensions_data.get(record.dataset_type.dimensions)) is None:
tables = record.make_dynamic_tables()
else:
tables = record.update_dynamic_tables(dynamic_tables)
cache_dimensions_data[record.dataset_type.dimensions] = tables
self._cache.set(
cache_data, full=True, dimensions_data=cache_dimensions_data.items(), dimensions_full=True
)
return [record.dataset_type for record in records]

def _find_storage(self, name: str) -> _DatasetRecordStorage:
"""Find a dataset type and the extra information needed to work with
it, utilizing and populating the cache as needed.
"""
if self._caching_context.dataset_types is not None:
dataset_type, dataset_type_id = self._caching_context.dataset_types.get(name)
dataset_type, dataset_type_id = self._cache.get(name)
if dataset_type is not None:
tables = self._cache.get_by_dimensions(dataset_type.dimensions)
assert (
dataset_type_id is not None and tables is not None
), "Dataset type cache population is incomplete."
return _DatasetRecordStorage(
dataset_type=dataset_type, dataset_type_id=dataset_type_id, dynamic_tables=tables
)
else:
# On the first cache miss populate the cache with complete list
# of dataset types (if it was not done yet).
if not self._cache.full:
self._fetch_dataset_types()
# Try again
dataset_type, dataset_type_id = self._cache.get(name)
if dataset_type is not None:
tables = self._caching_context.dataset_types.get_by_dimensions(dataset_type.dimensions)
tables = self._cache.get_by_dimensions(dataset_type.dimensions)
assert (
dataset_type_id is not None and tables is not None
), "Dataset type cache population is incomplete."
return _DatasetRecordStorage(
dataset_type=dataset_type, dataset_type_id=dataset_type_id, dynamic_tables=tables
)
else:
# On the first cache miss populate the cache with complete list
# of dataset types (if it was not done yet).
if not self._caching_context.dataset_types.full:
self._fetch_dataset_types()
# Try again
dataset_type, dataset_type_id = self._caching_context.dataset_types.get(name)
if dataset_type is not None:
tables = self._caching_context.dataset_types.get_by_dimensions(dataset_type.dimensions)
assert (
dataset_type_id is not None and tables is not None
), "Dataset type cache population is incomplete."
return _DatasetRecordStorage(
dataset_type=dataset_type, dataset_type_id=dataset_type_id, dynamic_tables=tables
)
record = self._fetch_dataset_type_record(name)
if record is not None:
if self._caching_context.dataset_types is not None:
self._caching_context.dataset_types.add(record.dataset_type, record.dataset_type_id)
self._cache.add(record.dataset_type, record.dataset_type_id)
return _DatasetRecordStorage(
record.dataset_type, record.dataset_type_id, record.make_dynamic_tables()
)
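
The lookup strategy in _find_storage above, check the per-manager cache and on the first miss load the complete list of dataset types before retrying, can be reduced to the stand-alone sketch below. Here fetch_all is a hypothetical callable standing in for the registry query; it is not part of the real API, and the final single-record fallback in _find_storage is omitted.

from collections.abc import Callable, Iterable

# Sketch of the "populate the whole cache on the first miss" lookup pattern;
# fetch_all stands in for the database query and is purely illustrative.


class PreloadingLookupSketch:
    def __init__(self, fetch_all: Callable[[], Iterable[tuple[str, object]]]) -> None:
        self._fetch_all = fetch_all
        self._cache: dict[str, object] = {}
        self._full = False

    def find(self, name: str) -> object | None:
        value = self._cache.get(name)
        if value is not None:
            return value
        if not self._full:
            # First miss: cache everything in one query so later lookups
            # do not have to touch the database at all.
            self._cache = dict(self._fetch_all())
            self._full = True
            value = self._cache.get(name)
        return value


# Example usage with an in-memory stand-in for the query:
lookup = PreloadingLookupSketch(lambda: [("raw", 1), ("calexp", 2)])
assert lookup.find("calexp") == 2
assert lookup.find("not_registered") is None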
@@ -1641,10 +1619,10 @@ def refresh_collection_summaries(self, dataset_type: DatasetType) -> None:
self._summaries.delete_collections(storage.dataset_type_id, collections_to_delete)

def _get_tags_table(self, table: DynamicTables) -> sqlalchemy.Table:
return table.tags(self._db, type(self._collections), self._caching_context.dataset_types.tables)
return table.tags(self._db, type(self._collections), self._cache.tables)

def _get_calibs_table(self, table: DynamicTables) -> sqlalchemy.Table:
return table.calibs(self._db, type(self._collections), self._caching_context.dataset_types.tables)
return table.calibs(self._db, type(self._collections), self._cache.tables)


def _create_case_expression_for_collections(