From f4dca3012bbef38d84c9f94770c74b6905ad98cd Mon Sep 17 00:00:00 2001 From: gibsondan Date: Wed, 1 May 2024 17:24:27 -0500 Subject: [PATCH] Make the asset status cache do no additional DB work when the cursor is up to date and there are no in-progress runs materializing the asset (#21194) Summary: The goal of this PR is to speed up the asset partition status cache in the hopefully common case where the cache is up to date and there are no in-progress runs currently materializing the asset. By leveraging the "last_planned_materialization_storage_id" field on AssetEntry, which is set in some storages but not others, we can add checks that short-circuit any additional DB queries using only the information already fetched on the AssetEntry (see the sketches after the diff). --- .../implementation/fetch_assets.py | 20 +++- .../implementation/fetch_runs.py | 2 +- .../dagster_graphql/schema/asset_graph.py | 14 +-- .../graphql/test_assets.py | 39 ++++++- .../storage/asset_check_execution_record.py | 3 +- .../batch_asset_record_loader.py | 25 +++-- .../dagster/_core/storage/event_log/base.py | 16 +++ .../_core/storage/event_log/sql_event_log.py | 1 + .../_core/storage/partition_status_cache.py | 105 +++++++++++------- .../dagster/_core/workspace/context.py | 2 +- .../_utils/caching_instance_queryer.py | 29 ++--- .../storage_tests/test_asset_events.py | 14 ++- .../storage_tests/utils/event_log_storage.py | 17 +++ .../utils/partition_status_cache.py | 22 +++- 14 files changed, 215 insertions(+), 94 deletions(-) rename python_modules/dagster/dagster/_core/{workspace => storage}/batch_asset_record_loader.py (84%) diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py index 21450801095e0..a6c98264c2605 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py @@ -42,11 +42,12 @@ from dagster._core.remote_representation.code_location import CodeLocation from dagster._core.remote_representation.external import ExternalRepository from dagster._core.remote_representation.external_data import ExternalAssetNode -from dagster._core.storage.event_log.base import AssetRecord +from dagster._core.storage.batch_asset_record_loader import BatchAssetRecordLoader from dagster._core.storage.event_log.sql_event_log import get_max_event_records_limit from dagster._core.storage.partition_status_cache import ( build_failed_and_in_progress_partition_subset, get_and_update_asset_status_cache_value, + get_last_planned_storage_id, get_materialized_multipartitions, get_validated_partition_keys, is_cacheable_partition_type, @@ -414,7 +415,7 @@ def get_partition_subsets( instance: DagsterInstance, asset_key: AssetKey, dynamic_partitions_loader: DynamicPartitionsStore, - asset_record: Optional[AssetRecord], + batch_asset_record_loader: Optional[BatchAssetRecordLoader], partitions_def: Optional[PartitionsDefinition] = None, ) -> Tuple[Optional[PartitionsSubset], Optional[PartitionsSubset], Optional[PartitionsSubset]]: """Returns a tuple of PartitionSubset objects: the first is the materialized partitions, @@ -431,7 +432,7 @@ def get_partition_subsets( asset_key, partitions_def, dynamic_partitions_loader, - asset_record, + batch_asset_record_loader, ) materialized_subset = ( updated_cache_value.deserialize_materialized_partition_subsets(partitions_def) @@ 
-470,8 +471,19 @@ def get_partition_subsets( else partitions_def.empty_subset() ) + if batch_asset_record_loader: + asset_record = batch_asset_record_loader.get_asset_record(asset_key) + else: + asset_record = next(iter(instance.get_asset_records(asset_keys=[asset_key])), None) + failed_subset, in_progress_subset, _ = build_failed_and_in_progress_partition_subset( - instance, asset_key, partitions_def, dynamic_partitions_loader + instance, + asset_key, + partitions_def, + dynamic_partitions_loader, + last_planned_materialization_storage_id=get_last_planned_storage_id( + instance, asset_key, asset_record + ), ) return materialized_subset, failed_subset, in_progress_subset diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py index 84737ac8ba52d..ce7bcb42e8c2b 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py @@ -26,7 +26,7 @@ from .external import ensure_valid_config, get_external_job_or_raise if TYPE_CHECKING: - from dagster._core.workspace.batch_asset_record_loader import BatchAssetRecordLoader + from dagster._core.storage.batch_asset_record_loader import BatchAssetRecordLoader from ..schema.asset_graph import GrapheneAssetLatestInfo from ..schema.errors import GrapheneRunNotFoundError diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py index 257526295391d..b154fdd33de17 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py @@ -34,8 +34,8 @@ ExternalTimeWindowPartitionsDefinitionData, ) from dagster._core.snap.node import GraphDefSnap, OpDefSnap +from dagster._core.storage.batch_asset_record_loader import BatchAssetRecordLoader from dagster._core.utils import is_valid_email -from dagster._core.workspace.batch_asset_record_loader import BatchAssetRecordLoader from dagster._core.workspace.permissions import Permissions from dagster._utils.caching_instance_queryer import CachingInstanceQueryer @@ -1140,11 +1140,7 @@ def resolve_assetPartitionStatuses( graphene_info.context.instance, asset_key, self._dynamic_partitions_loader, - ( - self._asset_record_loader.get_asset_record(asset_key) - if self._asset_record_loader - else None - ), + self._asset_record_loader, partitions_def, ) @@ -1174,11 +1170,7 @@ def resolve_partitionStats( graphene_info.context.instance, asset_key, self._dynamic_partitions_loader, - ( - self._asset_record_loader.get_asset_record(self._external_asset_node.asset_key) - if self._asset_record_loader - else None - ), + self._asset_record_loader, ( self._external_asset_node.partitions_def_data.get_partitions_definition() if self._external_asset_node.partitions_def_data diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_assets.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_assets.py index 8e401f2320583..45ad34fe2a016 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_assets.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_assets.py @@ -1366,6 +1366,8 @@ def test_default_partitions(self, graphql_context: WorkspaceRequestContext) -> N # Test that when partition a is materialized that the materialized partitions are a 
_create_partitioned_run(graphql_context, "partition_materialization_job", partition_key="a") + graphql_context.asset_record_loader.clear_cache() + selector = infer_job_selector(graphql_context, "partition_materialization_job") result = execute_dagster_graphql( graphql_context, @@ -1386,6 +1388,8 @@ def test_default_partitions(self, graphql_context: WorkspaceRequestContext) -> N # Test that when partition c is materialized that the materialized partitions are a, c _create_partitioned_run(graphql_context, "partition_materialization_job", partition_key="c") + graphql_context.asset_record_loader.clear_cache() + result = execute_dagster_graphql( graphql_context, GET_1D_ASSET_PARTITIONS, @@ -1428,6 +1432,8 @@ def test_partition_stats(self, graphql_context: WorkspaceRequestContext): tags={"fail": "true"}, ) + graphql_context.asset_record_loader.clear_cache() + result = execute_dagster_graphql( graphql_context, GET_PARTITION_STATS, @@ -1449,6 +1455,8 @@ def test_partition_stats(self, graphql_context: WorkspaceRequestContext): tags={"fail": "true"}, ) + graphql_context.asset_record_loader.clear_cache() + result = execute_dagster_graphql( graphql_context, GET_PARTITION_STATS, @@ -1480,6 +1488,8 @@ def test_partition_stats(self, graphql_context: WorkspaceRequestContext): assert not result.errors assert result.data + graphql_context.asset_record_loader.clear_cache() + stats_result = execute_dagster_graphql( graphql_context, GET_PARTITION_STATS, @@ -1592,6 +1602,8 @@ def _get_datetime_float(dt_str): graphql_context, "time_partitioned_assets_job", partition_key=time_0 ) + graphql_context.asset_record_loader.clear_cache() + selector = infer_job_selector(graphql_context, "time_partitioned_assets_job") result = execute_dagster_graphql( graphql_context, @@ -1616,6 +1628,8 @@ def _get_datetime_float(dt_str): graphql_context, "time_partitioned_assets_job", partition_key=time_2 ) + graphql_context.asset_record_loader.clear_cache() + result = execute_dagster_graphql( graphql_context, GET_1D_ASSET_PARTITIONS, @@ -1642,6 +1656,8 @@ def _get_datetime_float(dt_str): graphql_context, "time_partitioned_assets_job", partition_key=time_1 ) + graphql_context.asset_record_loader.clear_cache() + result = execute_dagster_graphql( graphql_context, GET_1D_ASSET_PARTITIONS, @@ -1760,11 +1776,11 @@ def get_response_by_asset(response): assert result["asset_1"]["latestRun"] is None assert result["asset_1"]["latestMaterialization"] is None - graphql_context.asset_record_loader.clear_cache() - # Test with 1 run on all assets first_run_id = _create_run(graphql_context, "failure_assets_job") + graphql_context.asset_record_loader.clear_cache() + result = execute_dagster_graphql( graphql_context, GET_ASSET_LATEST_RUN_STATS, @@ -1789,8 +1805,6 @@ def get_response_by_asset(response): assert result["asset_3"]["latestRun"]["id"] == first_run_id assert result["asset_3"]["latestMaterialization"] is None - graphql_context.asset_record_loader.clear_cache() - # Confirm that asset selection is respected run_id = _create_run( graphql_context, @@ -1798,6 +1812,8 @@ def get_response_by_asset(response): asset_selection=[{"path": ["asset_3"]}], ) + graphql_context.asset_record_loader.clear_cache() + result = execute_dagster_graphql( graphql_context, GET_ASSET_LATEST_RUN_STATS, @@ -2145,6 +2161,9 @@ def _get_date_float(dt_str): MultiPartitionKey({"date": partition_field[0], "ab": partition_field[1]}), asset_selection=[AssetKey("multipartitions_1")], ) + + graphql_context.asset_record_loader.clear_cache() + result = execute_dagster_graphql( 
graphql_context, GET_2D_ASSET_PARTITIONS, @@ -2262,6 +2281,9 @@ def _get_date_float(dt_str): MultiPartitionKey({"date": partition_field[0], "ab": partition_field[1]}), tags={"fail": "true"}, ) + + graphql_context.asset_record_loader.clear_cache() + result = execute_dagster_graphql( graphql_context, GET_2D_ASSET_PARTITIONS, @@ -2308,6 +2330,9 @@ def _get_date_float(dt_str): MultiPartitionKey({"date": partition_field[0], "ab": partition_field[1]}), tags={"fail": "true"}, ) + + graphql_context.asset_record_loader.clear_cache() + result = execute_dagster_graphql( graphql_context, GET_2D_ASSET_PARTITIONS, @@ -2337,6 +2362,9 @@ def _get_date_float(dt_str): "multipartitions_fail_job", MultiPartitionKey({"date": partition_field[0], "ab": partition_field[1]}), ) + + graphql_context.asset_record_loader.clear_cache() + result = execute_dagster_graphql( graphql_context, GET_2D_ASSET_PARTITIONS, @@ -2375,6 +2403,9 @@ def test_dynamic_dim_in_multipartitions_def(self, graphql_context: WorkspaceRequ "dynamic_in_multipartitions_success_job", MultiPartitionKey({"dynamic": "1", "static": "a"}), ) + + graphql_context.asset_record_loader.clear_cache() + counter = Counter() traced_counter.set(counter) result = execute_dagster_graphql( diff --git a/python_modules/dagster/dagster/_core/storage/asset_check_execution_record.py b/python_modules/dagster/dagster/_core/storage/asset_check_execution_record.py index 12a2ae6da1aae..7f9c5635cf9d2 100644 --- a/python_modules/dagster/dagster/_core/storage/asset_check_execution_record.py +++ b/python_modules/dagster/dagster/_core/storage/asset_check_execution_record.py @@ -2,8 +2,7 @@ from typing import NamedTuple, Optional import dagster._check as check -from dagster import EventLogEntry -from dagster._core.events import DagsterEventType +from dagster._core.events.log import DagsterEventType, EventLogEntry from dagster._serdes.serdes import deserialize_value from dagster._utils import datetime_as_float diff --git a/python_modules/dagster/dagster/_core/workspace/batch_asset_record_loader.py b/python_modules/dagster/dagster/_core/storage/batch_asset_record_loader.py similarity index 84% rename from python_modules/dagster/dagster/_core/workspace/batch_asset_record_loader.py rename to python_modules/dagster/dagster/_core/storage/batch_asset_record_loader.py index c29b1b9768e8c..ee19aec6d6845 100644 --- a/python_modules/dagster/dagster/_core/workspace/batch_asset_record_loader.py +++ b/python_modules/dagster/dagster/_core/storage/batch_asset_record_loader.py @@ -1,12 +1,12 @@ -from typing import Iterable, Mapping, Optional, Sequence, Set +from typing import TYPE_CHECKING, Iterable, Mapping, Optional, Sequence, Set -from dagster import ( - DagsterInstance, - _check as check, -) +import dagster._check as check from dagster._core.definitions.events import AssetKey from dagster._core.events.log import EventLogEntry -from dagster._core.storage.event_log.base import AssetRecord +from dagster._core.instance import DagsterInstance + +if TYPE_CHECKING: + from dagster._core.storage.event_log.base import AssetRecord class BatchAssetRecordLoader: @@ -17,13 +17,13 @@ class BatchAssetRecordLoader: def __init__(self, instance: DagsterInstance, asset_keys: Iterable[AssetKey]): self._instance = instance self._unfetched_asset_keys: Set[AssetKey] = set(asset_keys) - self._asset_records: Mapping[AssetKey, Optional[AssetRecord]] = {} + self._asset_records: Mapping[AssetKey, Optional["AssetRecord"]] = {} def add_asset_keys(self, asset_keys: Iterable[AssetKey]): unfetched_asset_keys = 
set(asset_keys).difference(self._asset_records.keys()) self._unfetched_asset_keys = self._unfetched_asset_keys.union(unfetched_asset_keys) - def get_asset_record(self, asset_key: AssetKey) -> Optional[AssetRecord]: + def get_asset_record(self, asset_key: AssetKey) -> Optional["AssetRecord"]: if asset_key not in self._asset_records and asset_key not in self._unfetched_asset_keys: check.failed( f"Asset key {asset_key} not recognized for this loader. Expected one of:" @@ -31,7 +31,7 @@ def get_asset_record(self, asset_key: AssetKey) -> Optional[AssetRecord]: ) if asset_key in self._unfetched_asset_keys: - self._fetch() + self.fetch() return self._asset_records.get(asset_key) @@ -40,7 +40,10 @@ def clear_cache(self): self._unfetched_asset_keys = self._unfetched_asset_keys.union(self._asset_records.keys()) self._asset_records = {} - def get_asset_records(self, asset_keys: Sequence[AssetKey]) -> Sequence[AssetRecord]: + def has_cached_asset_record(self, asset_key: AssetKey): + return asset_key in self._asset_records + + def get_asset_records(self, asset_keys: Sequence[AssetKey]) -> Sequence["AssetRecord"]: records = [self.get_asset_record(asset_key) for asset_key in asset_keys] return [record for record in records if record] @@ -65,7 +68,7 @@ def get_latest_observation_for_asset_key(self, asset_key: AssetKey) -> Optional[ return asset_record.asset_entry.last_observation - def _fetch(self) -> None: + def fetch(self) -> None: if not self._unfetched_asset_keys: return diff --git a/python_modules/dagster/dagster/_core/storage/event_log/base.py b/python_modules/dagster/dagster/_core/storage/event_log/base.py index 941385cf56516..97bb98d6b6abb 100644 --- a/python_modules/dagster/dagster/_core/storage/event_log/base.py +++ b/python_modules/dagster/dagster/_core/storage/event_log/base.py @@ -60,6 +60,8 @@ class AssetEntry( # This is an optional field which can be used for more performant last observation # queries if the underlying storage supports it ("last_observation_record", Optional[EventLogRecord]), + ("last_planned_materialization_storage_id", Optional[int]), + ("last_planned_materialization_run_id", Optional[str]), ], ) ): @@ -71,6 +73,8 @@ def __new__( asset_details: Optional[AssetDetails] = None, cached_status: Optional["AssetStatusCacheValue"] = None, last_observation_record: Optional[EventLogRecord] = None, + last_planned_materialization_storage_id: Optional[int] = None, + last_planned_materialization_run_id: Optional[str] = None, ): from dagster._core.storage.partition_status_cache import AssetStatusCacheValue @@ -88,6 +92,14 @@ def __new__( last_observation_record=check.opt_inst_param( last_observation_record, "last_observation_record", EventLogRecord ), + last_planned_materialization_storage_id=check.opt_int_param( + last_planned_materialization_storage_id, + "last_planned_materialization_storage_id", + ), + last_planned_materialization_run_id=check.opt_str_param( + last_planned_materialization_run_id, + "last_planned_materialization_run_id", + ), ) @property @@ -293,6 +305,10 @@ def get_asset_records( ) -> Sequence[AssetRecord]: pass + @property + def asset_records_have_last_planned_materialization_storage_id(self) -> bool: + return False + @abstractmethod def has_asset_key(self, asset_key: AssetKey) -> bool: pass diff --git a/python_modules/dagster/dagster/_core/storage/event_log/sql_event_log.py b/python_modules/dagster/dagster/_core/storage/event_log/sql_event_log.py index 8d32b4136b6ea..5a1f55f099ff4 100644 --- a/python_modules/dagster/dagster/_core/storage/event_log/sql_event_log.py 
+++ b/python_modules/dagster/dagster/_core/storage/event_log/sql_event_log.py @@ -1191,6 +1191,7 @@ def _construct_asset_record_from_row( if can_cache_asset_status_data else None ), + last_planned_materialization_storage_id=None, ), ) else: diff --git a/python_modules/dagster/dagster/_core/storage/partition_status_cache.py b/python_modules/dagster/dagster/_core/storage/partition_status_cache.py index 171a744077643..e106e8ebde3b4 100644 --- a/python_modules/dagster/dagster/_core/storage/partition_status_cache.py +++ b/python_modules/dagster/dagster/_core/storage/partition_status_cache.py @@ -31,6 +31,7 @@ from dagster._serdes.serdes import deserialize_value if TYPE_CHECKING: + from dagster._core.storage.batch_asset_record_loader import BatchAssetRecordLoader from dagster._core.storage.event_log.base import AssetRecord @@ -223,7 +224,16 @@ def get_validated_partition_keys( return validated_partitions -def _get_last_planned_storage_id(instance: DagsterInstance, asset_key: AssetKey): +def get_last_planned_storage_id( + instance: DagsterInstance, asset_key: AssetKey, asset_record: Optional["AssetRecord"] +) -> int: + if instance.event_log_storage.asset_records_have_last_planned_materialization_storage_id: + return ( + (asset_record.asset_entry.last_planned_materialization_storage_id or 0) + if asset_record + else 0 + ) + info = instance.get_latest_planned_materialization_info(asset_key) if not info: return 0 @@ -236,15 +246,23 @@ def _build_status_cache( asset_key: AssetKey, partitions_def: Optional[PartitionsDefinition], dynamic_partitions_store: DynamicPartitionsStore, - stored_cache_value: Optional[AssetStatusCacheValue] = None, - last_materialization_storage_id: Optional[int] = None, + stored_cache_value: Optional[AssetStatusCacheValue], + asset_record: Optional["AssetRecord"], ) -> Optional[AssetStatusCacheValue]: """This method refreshes the asset status cache for a given asset key. It recalculates the materialized partition subset for the asset key and updates the cache value. 
""" + last_materialization_storage_id = ( + asset_record.asset_entry.last_materialization_storage_id if asset_record else None + ) + + last_planned_materialization_storage_id = get_last_planned_storage_id( + instance, asset_key, asset_record + ) + latest_storage_id = max( - last_materialization_storage_id if last_materialization_storage_id else 0, - _get_last_planned_storage_id(instance, asset_key), + last_materialization_storage_id or 0, + last_planned_materialization_storage_id or 0, ) if not latest_storage_id: return None @@ -252,10 +270,14 @@ def _build_status_cache( if not partitions_def or not is_cacheable_partition_type(partitions_def): return AssetStatusCacheValue(latest_storage_id=latest_storage_id) - cached_failed_subset: PartitionsSubset = ( - partitions_def.deserialize_subset(stored_cache_value.serialized_failed_partition_subset) + failed_partitions: Set[str] = ( + set( + partitions_def.deserialize_subset( + stored_cache_value.serialized_failed_partition_subset + ).get_partition_keys() + ) if stored_cache_value and stored_cache_value.serialized_failed_partition_subset - else partitions_def.empty_subset() + else set() ) cached_in_progress_cursor = ( ( @@ -270,6 +292,15 @@ def _build_status_cache( if stored_cache_value: # fetch the incremental new materialized partitions, and update the cached materialized # subset + new_partitions = set() + if ( + last_materialization_storage_id + and last_materialization_storage_id > stored_cache_value.latest_storage_id + ): + new_partitions = instance.get_materialized_partitions( + asset_key, after_cursor=stored_cache_value.latest_storage_id + ) + materialized_subset: PartitionsSubset = ( partitions_def.deserialize_subset( stored_cache_value.serialized_materialized_partition_subset @@ -277,14 +308,11 @@ def _build_status_cache( if stored_cache_value.serialized_materialized_partition_subset else partitions_def.empty_subset() ).with_partition_keys( - get_validated_partition_keys( - dynamic_partitions_store, - partitions_def, - instance.get_materialized_partitions( - asset_key, after_cursor=stored_cache_value.latest_storage_id - ), - ) + get_validated_partition_keys(dynamic_partitions_store, partitions_def, new_partitions) ) + + failed_partitions.difference_update(new_partitions) + else: materialized_subset = partitions_def.empty_subset().with_partition_keys( get_validated_partition_keys( @@ -303,7 +331,8 @@ def _build_status_cache( asset_key, partitions_def, dynamic_partitions_store, - failed_partitions_subset=cached_failed_subset, + last_planned_materialization_storage_id=last_planned_materialization_storage_id, + failed_partitions=failed_partitions, after_storage_id=cached_in_progress_cursor, ) @@ -324,24 +353,23 @@ def build_failed_and_in_progress_partition_subset( asset_key: AssetKey, partitions_def: PartitionsDefinition, dynamic_partitions_store: DynamicPartitionsStore, - failed_partitions_subset: Optional[PartitionsSubset] = None, + last_planned_materialization_storage_id: int, + failed_partitions: Optional[Set[str]] = None, after_storage_id: Optional[int] = None, ) -> Tuple[PartitionsSubset, PartitionsSubset, Optional[int]]: - failed_partitions: Set[str] = ( - set(failed_partitions_subset.get_partition_keys()) if failed_partitions_subset else set() - ) + failed_partitions = failed_partitions or set() in_progress_partitions: Set[str] = set() - if failed_partitions: - # These partitions were cached as having been failed. If they have since been materialized, - # then we can remove them from the set of failed partitions. 
- materialized_partitions = instance.event_log_storage.get_materialized_partitions( - asset_key, after_cursor=after_storage_id - ) - failed_partitions.difference_update(materialized_partitions) - incomplete_materializations = instance.event_log_storage.get_latest_asset_partition_materialization_attempts_without_materializations( - asset_key, after_storage_id=after_storage_id - ) + incomplete_materializations = {} + + # Fetch incomplete materializations if there have been any planned materializations since the + # cursor + if last_planned_materialization_storage_id and ( + not after_storage_id or last_planned_materialization_storage_id > after_storage_id + ): + incomplete_materializations = instance.event_log_storage.get_latest_asset_partition_materialization_attempts_without_materializations( + asset_key, after_storage_id=after_storage_id + ) cursor = None if incomplete_materializations: @@ -397,18 +425,19 @@ def build_failed_and_in_progress_partition_subset( def get_and_update_asset_status_cache_value( instance: DagsterInstance, asset_key: AssetKey, - partitions_def: Optional[PartitionsDefinition] = None, + partitions_def: Optional[PartitionsDefinition], dynamic_partitions_loader: Optional[DynamicPartitionsStore] = None, - asset_record: Optional["AssetRecord"] = None, + batch_asset_record_loader: Optional["BatchAssetRecordLoader"] = None, ) -> Optional[AssetStatusCacheValue]: - asset_record = asset_record or next( - iter(instance.get_asset_records(asset_keys=[asset_key])), None - ) + if batch_asset_record_loader: + asset_record = batch_asset_record_loader.get_asset_record(asset_key) + else: + asset_record = next(iter(instance.get_asset_records(asset_keys=[asset_key])), None) + if asset_record is None: - stored_cache_value, latest_materialization_storage_id = None, None + stored_cache_value = None else: stored_cache_value = asset_record.asset_entry.cached_status - latest_materialization_storage_id = asset_record.asset_entry.last_materialization_storage_id dynamic_partitions_store = dynamic_partitions_loader if dynamic_partitions_loader else instance use_cached_value = ( @@ -425,7 +454,7 @@ def get_and_update_asset_status_cache_value( partitions_def=partitions_def, dynamic_partitions_store=dynamic_partitions_store, stored_cache_value=stored_cache_value if use_cached_value else None, - last_materialization_storage_id=latest_materialization_storage_id, + asset_record=asset_record, ) if updated_cache_value is not None and updated_cache_value != stored_cache_value: instance.update_asset_cached_status_data(asset_key, updated_cache_value) diff --git a/python_modules/dagster/dagster/_core/workspace/context.py b/python_modules/dagster/dagster/_core/workspace/context.py index 420a18d8dde4d..8a5c3c4fe77d5 100644 --- a/python_modules/dagster/dagster/_core/workspace/context.py +++ b/python_modules/dagster/dagster/_core/workspace/context.py @@ -39,9 +39,9 @@ GrpcServerCodeLocationOrigin, ManagedGrpcPythonEnvCodeLocationOrigin, ) +from dagster._core.storage.batch_asset_record_loader import BatchAssetRecordLoader from dagster._utils.error import SerializableErrorInfo, serializable_error_info_from_exc_info -from .batch_asset_record_loader import BatchAssetRecordLoader from .load_target import WorkspaceLoadTarget from .permissions import ( PermissionResult, diff --git a/python_modules/dagster/dagster/_utils/caching_instance_queryer.py b/python_modules/dagster/dagster/_utils/caching_instance_queryer.py index e30dd6ed3af89..a87a688d8d6b4 100644 --- 
a/python_modules/dagster/dagster/_utils/caching_instance_queryer.py +++ b/python_modules/dagster/dagster/_utils/caching_instance_queryer.py @@ -43,6 +43,7 @@ from dagster._core.event_api import AssetRecordsFilter, EventRecordsFilter from dagster._core.events import DagsterEventType from dagster._core.instance import DagsterInstance, DynamicPartitionsStore +from dagster._core.storage.batch_asset_record_loader import BatchAssetRecordLoader from dagster._core.storage.dagster_run import ( DagsterRun, RunRecord, @@ -76,7 +77,8 @@ def __init__( self._asset_graph = asset_graph self._logger = logger or logging.getLogger("dagster") - self._asset_record_cache: Dict[AssetKey, Optional[AssetRecord]] = {} + self._batch_asset_record_loader = BatchAssetRecordLoader(self._instance, set()) + self._asset_partitions_cache: Dict[Optional[int], Dict[AssetKey, Set[str]]] = defaultdict( dict ) @@ -110,16 +112,8 @@ def evaluation_time(self) -> datetime: def prefetch_asset_records(self, asset_keys: Iterable[AssetKey]): """For performance, batches together queries for selected assets.""" - keys_to_fetch = set(asset_keys) - set(self._asset_record_cache.keys()) - if len(keys_to_fetch) == 0: - return - # get all asset records for selected assets that aren't already cached - asset_records = self.instance.get_asset_records(list(keys_to_fetch)) - for asset_record in asset_records: - self._asset_record_cache[asset_record.asset_entry.asset_key] = asset_record - for key in asset_keys: - if key not in self._asset_record_cache: - self._asset_record_cache[key] = None + self._batch_asset_record_loader.add_asset_keys(asset_keys) + self._batch_asset_record_loader.fetch() #################### # ASSET STATUS CACHE @@ -132,13 +126,13 @@ def _get_updated_cache_value(self, *, asset_key: AssetKey) -> Optional["AssetSta ) partitions_def = check.not_none(self.asset_graph.get(asset_key).partitions_def) - asset_record = self.get_asset_record(asset_key) + self._batch_asset_record_loader.add_asset_keys([asset_key]) return get_and_update_asset_status_cache_value( instance=self.instance, asset_key=asset_key, partitions_def=partitions_def, dynamic_partitions_loader=self, - asset_record=asset_record, + batch_asset_record_loader=self._batch_asset_record_loader, ) @cached_method @@ -176,14 +170,11 @@ def get_materialized_asset_subset(self, *, asset_key: AssetKey) -> AssetSubset: #################### def has_cached_asset_record(self, asset_key: AssetKey) -> bool: - return asset_key in self._asset_record_cache + return self._batch_asset_record_loader.has_cached_asset_record(asset_key) def get_asset_record(self, asset_key: AssetKey) -> Optional["AssetRecord"]: - if asset_key not in self._asset_record_cache: - self._asset_record_cache[asset_key] = next( - iter(self.instance.get_asset_records([asset_key])), None - ) - return self._asset_record_cache[asset_key] + self._batch_asset_record_loader.add_asset_keys({asset_key}) + return self._batch_asset_record_loader.get_asset_record(asset_key) def _event_type_for_key(self, asset_key: AssetKey) -> DagsterEventType: if self.asset_graph.get(asset_key).is_observable: diff --git a/python_modules/dagster/dagster_tests/storage_tests/test_asset_events.py b/python_modules/dagster/dagster_tests/storage_tests/test_asset_events.py index 30f80cd9cc7b9..56286c95eef9a 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/test_asset_events.py +++ b/python_modules/dagster/dagster_tests/storage_tests/test_asset_events.py @@ -15,10 +15,10 @@ from dagster._core.event_api import EventRecordsFilter from 
dagster._core.events import DagsterEventType from dagster._core.instance import DagsterInstance +from dagster._core.storage.batch_asset_record_loader import BatchAssetRecordLoader from dagster._core.storage.input_manager import input_manager from dagster._core.storage.io_manager import IOManager from dagster._core.test_utils import instance_for_test -from dagster._core.workspace.batch_asset_record_loader import BatchAssetRecordLoader def n_asset_keys(path, n): @@ -197,6 +197,9 @@ def return_two(): fake_key = AssetKey(path=["fake_key"]) assert asset_records_loader._unfetched_asset_keys == {return_two.key} # noqa + assert not asset_records_loader.has_cached_asset_record(return_two.key) + assert not asset_records_loader.has_cached_asset_record(fake_key) + assert asset_records_loader._asset_records == {} # noqa assert ( @@ -204,6 +207,9 @@ def return_two(): == return_two.key ) + assert asset_records_loader.has_cached_asset_record(return_two.key) + assert not asset_records_loader.has_cached_asset_record(fake_key) + assert asset_records_loader._unfetched_asset_keys == set() # noqa assert len(asset_records_loader._asset_records) == 1 # noqa @@ -215,6 +221,8 @@ def return_two(): asset_records_loader.add_asset_keys({return_one.key, return_two.key, fake_key}) + assert not asset_records_loader.has_cached_asset_record(return_one.key) + assert asset_records_loader._unfetched_asset_keys == {return_one.key, fake_key} # noqa assert ( @@ -226,6 +234,10 @@ def return_two(): == return_one.key ) + assert asset_records_loader.has_cached_asset_record(return_two.key) + assert asset_records_loader.has_cached_asset_record(return_one.key) + assert asset_records_loader.has_cached_asset_record(fake_key) + assert asset_records_loader.get_asset_record(fake_key) is None assert asset_records_loader._unfetched_asset_keys == set() # noqa diff --git a/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py b/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py index 8248afe0d0e5e..56f902d89932c 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py +++ b/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py @@ -3668,6 +3668,7 @@ def second_asset(my_asset): materialize_event = next( event for event in result.all_events if event.is_step_materialization ) + assert asset_entry.last_materialization assert asset_entry.last_materialization.dagster_event == materialize_event assert asset_entry.last_run_id == result.run_id @@ -3679,8 +3680,24 @@ def second_asset(my_asset): asset_key=my_asset_key, ) )[0] + assert asset_entry.last_materialization_record == event_log_record + materialization_planned_record = storage.get_event_records( + EventRecordsFilter( + event_type=DagsterEventType.ASSET_MATERIALIZATION_PLANNED, + asset_key=my_asset_key, + ) + )[0] + + if storage.asset_records_have_last_planned_materialization_storage_id: + assert ( + asset_entry.last_planned_materialization_storage_id + == materialization_planned_record.storage_id + ) + else: + assert not asset_entry.last_planned_materialization_storage_id + if self.can_wipe(): storage.wipe_asset(my_asset_key) diff --git a/python_modules/dagster/dagster_tests/storage_tests/utils/partition_status_cache.py b/python_modules/dagster/dagster_tests/storage_tests/utils/partition_status_cache.py index 73e3a7ef5a7e9..768993d335436 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/utils/partition_status_cache.py +++ 
b/python_modules/dagster/dagster_tests/storage_tests/utils/partition_status_cache.py @@ -28,6 +28,7 @@ AssetStatusCacheValue, build_failed_and_in_progress_partition_subset, get_and_update_asset_status_cache_value, + get_last_planned_storage_id, ) from dagster._core.test_utils import create_run_for_test from dagster._core.utils import make_new_run_id @@ -157,6 +158,11 @@ def _swap_partitions_def(new_partitions_def, asset, asset_graph, asset_job): asset_records = list(instance.get_asset_records([asset_key])) assert len(asset_records) == 0 + cached_status = get_and_update_asset_status_cache_value( + instance, asset_key, asset_graph.get(asset_key).partitions_def + ) + assert not cached_status + asset_job.execute_in_process(instance=instance, partition_key="2022-02-01") cached_status = get_and_update_asset_status_cache_value( @@ -860,8 +866,16 @@ def test_batch_canceled_partitions(self, instance, delete_runs_instance): _create_test_planned_materialization_record(run_id, my_asset, partition) ) + last_planned_materialization_storage_id = get_last_planned_storage_id( + instance, my_asset, next(iter(instance.get_asset_records([my_asset])), None) + ) + failed_subset, in_progress_subset, _ = build_failed_and_in_progress_partition_subset( - instance, my_asset, static_partitions_def, instance + instance, + my_asset, + static_partitions_def, + instance, + last_planned_materialization_storage_id, ) assert failed_subset.get_partition_keys() == set() assert in_progress_subset.get_partition_keys() == set( @@ -875,7 +889,11 @@ def test_batch_canceled_partitions(self, instance, delete_runs_instance): instance.report_run_canceled(run) failed_subset, in_progress_subset, _ = build_failed_and_in_progress_partition_subset( - instance, my_asset, static_partitions_def, instance + instance, + my_asset, + static_partitions_def, + instance, + last_planned_materialization_storage_id, ) assert failed_subset.get_partition_keys() == set() assert in_progress_subset.get_partition_keys() == set()
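
For reviewers who want the optimization in isolation, the patch boils down to three guards. The sketch below is a simplified, self-contained illustration under assumed stand-in names, not Dagster's actual API: `AssetEntrySketch`, `storage_supports_field`, `query_db_for_planned_info`, and `query_new_partitions` are hypothetical placeholders for `AssetEntry`, the new `asset_records_have_last_planned_materialization_storage_id` property, `DagsterInstance.get_latest_planned_materialization_info`, and the materialized-partitions query.

```python
# Minimal sketch of the short-circuits this patch adds, using hypothetical
# stand-in names rather than Dagster's real storage classes.
from typing import Callable, Optional, Set


class AssetEntrySketch:
    """Stand-in for AssetEntry, with only the field the sketch needs."""

    def __init__(self, last_planned_materialization_storage_id: Optional[int]):
        self.last_planned_materialization_storage_id = (
            last_planned_materialization_storage_id
        )


def get_last_planned_storage_id_sketch(
    storage_supports_field: bool,
    asset_entry: Optional[AssetEntrySketch],
    query_db_for_planned_info: Callable[[], Optional[int]],
) -> int:
    # Guard 1: if the storage writes the field onto the AssetEntry that was
    # already fetched, answer from memory with no extra DB query.
    if storage_supports_field:
        if asset_entry is None:
            return 0
        return asset_entry.last_planned_materialization_storage_id or 0
    # Fallback for storages that do not set the field: one DB lookup.
    return query_db_for_planned_info() or 0


def should_query_incomplete_materializations_sketch(
    last_planned_storage_id: int, after_storage_id: Optional[int]
) -> bool:
    # Guard 2: only query for in-progress/failed materialization attempts
    # when something has been planned since the cached cursor.
    return bool(last_planned_storage_id) and (
        not after_storage_id or last_planned_storage_id > after_storage_id
    )


def new_materialized_partitions_sketch(
    last_materialization_storage_id: Optional[int],
    cached_cursor: int,
    query_new_partitions: Callable[[int], Set[str]],
) -> Set[str]:
    # Guard 3: on the cache-refresh path, only look for newly materialized
    # partitions when a materialization landed after the cached cursor, so an
    # up-to-date cache costs zero queries here as well.
    if (
        last_materialization_storage_id
        and last_materialization_storage_id > cached_cursor
    ):
        return query_new_partitions(cached_cursor)
    return set()
```

When all three guards find nothing new, the cached AssetStatusCacheValue is returned without touching the database, which is the fast path the PR title describes.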
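The other recurring change in the diff is `BatchAssetRecordLoader`, which moves from `dagster._core.workspace` to `dagster._core.storage` and now also backs `CachingInstanceQueryer`; the test changes call its `clear_cache()` after creating runs so subsequent queries see fresh records. Below is a rough sketch of the batching pattern it implements, with assumed simplified types: string keys and a generic `fetch_many` callable stand in for `AssetKey` and `instance.get_asset_records`.

```python
# Simplified sketch of the batch-loading pattern: accumulate keys, fetch them
# all in one query on first access, then serve later lookups from memory.
from typing import Callable, Dict, Iterable, Mapping, Optional, Set


class BatchRecordLoaderSketch:
    def __init__(self, fetch_many: Callable[[Set[str]], Mapping[str, object]]):
        self._fetch_many = fetch_many
        self._unfetched_keys: Set[str] = set()
        self._records: Dict[str, Optional[object]] = {}

    def add_keys(self, keys: Iterable[str]) -> None:
        # Only queue keys that have not already been fetched.
        self._unfetched_keys.update(set(keys).difference(self._records))

    def has_cached_record(self, key: str) -> bool:
        return key in self._records

    def fetch(self) -> None:
        if not self._unfetched_keys:
            return
        fetched = self._fetch_many(self._unfetched_keys)
        for key in self._unfetched_keys:
            # Keys with no record are cached as None so they are not re-queried.
            self._records[key] = fetched.get(key)
        self._unfetched_keys = set()

    def get_record(self, key: str) -> Optional[object]:
        # First access for any queued key triggers the single batched fetch.
        if key in self._unfetched_keys:
            self.fetch()
        return self._records.get(key)

    def clear_cache(self) -> None:
        # Mark everything for re-fetch; the next access re-queries storage.
        self._unfetched_keys.update(self._records)
        self._records = {}
```

Caching misses as None is the detail worth noting: an asset key with no stored record is remembered, so repeated lookups for it never re-hit the database, which matches how the real loader records `None` for unfetched keys after a batch fetch.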