diff --git a/python_modules/dagster/dagster/_core/definitions/asset_condition_evaluation_context.py b/python_modules/dagster/dagster/_core/definitions/asset_condition_evaluation_context.py index 3db7b7190c533..c8e8bb62313a9 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_condition_evaluation_context.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_condition_evaluation_context.py @@ -2,7 +2,7 @@ import datetime import functools from dataclasses import dataclass -from typing import TYPE_CHECKING, AbstractSet, Any, Callable, Mapping, Optional, Sequence +from typing import TYPE_CHECKING, AbstractSet, Any, Callable, Mapping, Optional, Sequence, Tuple from dagster._core.definitions.data_time import CachingDataTimeResolver from dagster._core.definitions.events import AssetKey, AssetKeyPartitionKey @@ -151,16 +151,6 @@ def parent_will_update_subset(self) -> AssetSubset: subset |= parent_subset._replace(asset_key=self.asset_key) return subset - @functools.cached_property - @root_property - def new_max_storage_id(self) -> Optional[int]: - """Returns the new max storage ID for this asset, if any.""" - # TODO: This is not a good way of doing this, as it opens us up to potential race conditions, - # but in the interest of keeping this PR simple, I'm leaving this logic as is. In the next - # PR, I'll update the code to return a "maximum observed record id" from inside the - # `get_asset_partitions_updated_after_cursor` call. - return self.instance_queryer.instance.event_log_storage.get_maximum_record_id() - @functools.cached_property @root_property def materialized_since_previous_tick_subset(self) -> AssetSubset: @@ -193,19 +183,35 @@ def materialized_requested_or_discarded_since_previous_tick_subset(self) -> Asse @functools.cached_property @root_property - def parent_has_updated_subset(self) -> AssetSubset: + def _parent_has_updated_subset_and_new_latest_storage_id( + self, + ) -> Tuple[AssetSubset, Optional[int]]: """Returns the set of asset partitions whose parents have updated since the last time this condition was evaluated. """ - return AssetSubset.from_asset_partitions_set( - self.asset_key, - self.partitions_def, - self.root_context.instance_queryer.asset_partitions_with_newly_updated_parents( - latest_storage_id=self.cursor.previous_max_storage_id, - child_asset_key=self.root_context.asset_key, - map_old_time_partitions=False, - ), + ( + asset_partitions, + cursor, + ) = self.root_context.instance_queryer.asset_partitions_with_newly_updated_parents_and_new_cursor( + latest_storage_id=self.cursor.previous_max_storage_id, + child_asset_key=self.root_context.asset_key, + map_old_time_partitions=False, ) + return AssetSubset.from_asset_partitions_set( + self.asset_key, self.partitions_def, asset_partitions + ), cursor + + @property + @root_property + def parent_has_updated_subset(self) -> AssetSubset: + subset, _ = self._parent_has_updated_subset_and_new_latest_storage_id + return subset + + @property + @root_property + def new_max_storage_id(self) -> AssetSubset: + _, storage_id = self._parent_has_updated_subset_and_new_latest_storage_id + return storage_id @property def candidate_parent_has_or_will_update_subset(self) -> AssetSubset: diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py index b37e06acb5a7b..eef30fd3fbc25 100644 --- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py +++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py @@ -1296,10 +1296,10 @@ def execute_asset_backfill_iteration_inner( parent_materialized_asset_partitions = set().union( *( - instance_queryer.asset_partitions_with_newly_updated_parents( + instance_queryer.asset_partitions_with_newly_updated_parents_and_new_cursor( latest_storage_id=asset_backfill_data.latest_storage_id, child_asset_key=asset_key, - ) + )[0] for asset_key in asset_backfill_data.target_subset.asset_keys ) ) diff --git a/python_modules/dagster/dagster/_utils/caching_instance_queryer.py b/python_modules/dagster/dagster/_utils/caching_instance_queryer.py index b5218cc6f5a84..9e86e0cbe48ff 100644 --- a/python_modules/dagster/dagster/_utils/caching_instance_queryer.py +++ b/python_modules/dagster/dagster/_utils/caching_instance_queryer.py @@ -10,6 +10,7 @@ Optional, Sequence, Set, + Tuple, Union, cast, ) @@ -495,29 +496,32 @@ def has_dynamic_partition(self, partitions_def_name: str, partition_key: str) -> return partition_key in self.get_dynamic_partitions(partitions_def_name) @cached_method - def asset_partitions_with_newly_updated_parents( + def asset_partitions_with_newly_updated_parents_and_new_cursor( self, *, latest_storage_id: Optional[int], child_asset_key: AssetKey, map_old_time_partitions: bool = True, - ) -> AbstractSet[AssetKeyPartitionKey]: + ) -> Tuple[AbstractSet[AssetKeyPartitionKey], Optional[int]]: """Finds asset partitions of the given child whose parents have been materialized since latest_storage_id. """ if self.asset_graph.is_source(child_asset_key): - return set() + return set(), latest_storage_id child_partitions_def = self.asset_graph.get_partitions_def(child_asset_key) child_time_partitions_def = get_time_partitions_def(child_partitions_def) child_asset_partitions_with_updated_parents = set() + + max_storage_ids = [] for parent_asset_key in self.asset_graph.get_parents(child_asset_key): # ignore non-observable sources if self.asset_graph.is_source(parent_asset_key) and not self.asset_graph.is_observable( parent_asset_key ): continue + # if the parent has not been updated at all since the latest_storage_id, then skip if not self.get_asset_partitions_updated_after_cursor( asset_key=parent_asset_key, @@ -527,6 +531,13 @@ def asset_partitions_with_newly_updated_parents( ): continue + # keep track of the maximum storage id that we've seen for a given parent + parent_asset_record = self.get_asset_record(parent_asset_key) + if parent_asset_record: + max_storage_ids.append( + parent_asset_record.asset_entry.last_materialization_storage_id + ) + parent_partitions_def = self.asset_graph.get_partitions_def(parent_asset_key) if parent_partitions_def is None: latest_parent_record = check.not_none( @@ -564,7 +575,10 @@ def asset_partitions_with_newly_updated_parents( # we know a parent updated, and because the parent has a partitions def and the # child does not, the child could not have been materialized in the same run if child_partitions_def is None: - return {AssetKeyPartitionKey(child_asset_key)} + child_asset_partitions_with_updated_parents = { + AssetKeyPartitionKey(child_asset_key) + } + break # the set of asset partitions which have been updated since the latest storage id parent_partitions_subset = self.get_partitions_subset_updated_after_cursor( asset_key=parent_asset_key, after_cursor=latest_storage_id @@ -623,7 +637,16 @@ def asset_partitions_with_newly_updated_parents( ): child_asset_partitions_with_updated_parents.add(child_asset_partition) - return child_asset_partitions_with_updated_parents + asset_record = self.get_asset_record(child_asset_key) + if asset_record: + max_storage_ids.append(asset_record.asset_entry.last_materialization_storage_id) + + # the new latest storage id will be the greatest observed storage id among this asset and + # its parents + new_latest_storage_id = max( + filter(None, [latest_storage_id, *max_storage_ids]), default=None + ) + return (child_asset_partitions_with_updated_parents, new_latest_storage_id) #################### # RECONCILIATION