diff --git a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py index 65d0e9bf31419..ddcc180e4fef2 100644 --- a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py +++ b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py @@ -371,7 +371,7 @@ def evaluate_for_asset(self, context: RuleEvaluationContext) -> RuleEvaluationRe partition_key=asset_partition.partition_key, ).parent_partitions - updated_parent_asset_partitions = context.instance_queryer.get_updated_parent_asset_partitions( + updated_parent_asset_partitions = context.instance_queryer.get_parent_asset_partitions_updated_after_child( asset_partition, parent_asset_partitions, # do a precise check for updated parents, factoring in data versions, as long as @@ -446,18 +446,16 @@ def description(self) -> str: def evaluate_for_asset(self, context: RuleEvaluationContext) -> RuleEvaluationResults: asset_partitions_by_waiting_on_asset_keys = defaultdict(set) for candidate in context.candidates: - unreconciled_ancestors = set() + outdated_ancestors = set() # find the root cause of why this asset partition's parents are outdated (if any) for parent in context.get_parents_that_will_not_be_materialized_on_current_tick( asset_partition=candidate ): - unreconciled_ancestors.update( - context.instance_queryer.get_root_unreconciled_ancestors( - asset_partition=parent, - ) + outdated_ancestors.update( + context.instance_queryer.get_outdated_ancestors(asset_partition=parent) ) - if unreconciled_ancestors: - asset_partitions_by_waiting_on_asset_keys[frozenset(unreconciled_ancestors)].add( + if outdated_ancestors: + asset_partitions_by_waiting_on_asset_keys[frozenset(outdated_ancestors)].add( candidate ) if asset_partitions_by_waiting_on_asset_keys: @@ -553,7 +551,7 @@ def evaluate_for_asset( ).parent_partitions updated_parent_partitions = ( - context.instance_queryer.get_updated_parent_asset_partitions( + context.instance_queryer.get_parent_asset_partitions_updated_after_child( candidate, parent_partitions, context.daemon_context.respect_materialization_data_versions, diff --git a/python_modules/dagster/dagster/_utils/caching_instance_queryer.py b/python_modules/dagster/dagster/_utils/caching_instance_queryer.py index 9b66010c25265..33298337d51f6 100644 --- a/python_modules/dagster/dagster/_utils/caching_instance_queryer.py +++ b/python_modules/dagster/dagster/_utils/caching_instance_queryer.py @@ -81,7 +81,7 @@ def __init__( self._evaluation_time = evaluation_time if evaluation_time else pendulum.now("UTC") - self._root_unreconciled_ancestors_cache: Dict[AssetKeyPartitionKey, Set[AssetKey]] = {} + self._outdated_ancestors_cache: Dict[AssetKeyPartitionKey, Set[AssetKey]] = {} self._respect_materialization_data_versions = ( self._instance.auto_materialize_respect_materialization_data_versions ) @@ -757,9 +757,9 @@ def get_asset_partitions_updated_after_cursor( asset_partitions (Optional[Sequence[AssetKeyPartitionKey]]): If supplied, will filter the set of checked partitions to the given partitions. after_cursor (Optional[int]): The cursor after which to look for updates. - use_asset_versions (bool): If True, will use data versions to filter out asset - partitions which were materialized, but not have not had their data versions - cahnged since the given cursor. + respect_materialization_data_versions (bool): If True, will use data versions to filter + out asset partitions which were materialized, but not have not had their data + versions changed since the given cursor. NOTE: This boolean has been temporarily disabled """ if not self.asset_partition_has_materialization_or_observation( @@ -800,13 +800,16 @@ def get_asset_partitions_updated_after_cursor( asset_key, updated_after_cursor, after_cursor ) - def get_updated_parent_asset_partitions( + def get_parent_asset_partitions_updated_after_child( self, asset_partition: AssetKeyPartitionKey, parent_asset_partitions: AbstractSet[AssetKeyPartitionKey], respect_materialization_data_versions: bool, ignored_parent_keys: AbstractSet[AssetKey], ) -> AbstractSet[AssetKeyPartitionKey]: + """Returns values inside parent_asset_partitions that correspond to asset partitions that + have been updated since the latest materialization of asset_partition. + """ parent_asset_partitions_by_key: Dict[AssetKey, Set[AssetKeyPartitionKey]] = defaultdict(set) for parent in parent_asset_partitions: parent_asset_partitions_by_key[parent.asset_key].add(parent) @@ -851,15 +854,17 @@ def get_updated_parent_asset_partitions( ) return updated_parents - def get_root_unreconciled_ancestors( + def get_outdated_ancestors( self, *, asset_partition: AssetKeyPartitionKey ) -> AbstractSet[AssetKey]: - """Return the set of root unreconciled ancestors of the given asset partition, i.e. the set - of ancestors of this asset partition whose parents have been updated more recently than - they have. + """Return the set of assets that are ancestors of the given asset partition and have parents + that have been updated more recently than they have. + + If two ancestors would be returned, but one of them is an ancestor of the other one, then + only the most upstream ancestor is included. """ - if asset_partition in self._root_unreconciled_ancestors_cache: - return self._root_unreconciled_ancestors_cache[asset_partition] + if asset_partition in self._outdated_ancestors_cache: + return self._outdated_ancestors_cache[asset_partition] if self.asset_graph.is_source(asset_partition.asset_key): return set() @@ -888,7 +893,7 @@ def get_root_unreconciled_ancestors( for parent in parent_asset_partitions: if ( parent not in visited - and parent not in self._root_unreconciled_ancestors_cache + and parent not in self._outdated_ancestors_cache # do not evaluate self-dependency asset partitions and parent.asset_key != current_partition.asset_key ): @@ -911,7 +916,7 @@ def get_root_unreconciled_ancestors( ).parent_partitions updated_parents: AbstractSet[AssetKeyPartitionKey] = ( - self.get_updated_parent_asset_partitions( + self.get_parent_asset_partitions_updated_after_child( asset_partition=current_partition, parent_asset_partitions=parent_asset_partitions, respect_materialization_data_versions=self._respect_materialization_data_versions, @@ -920,17 +925,11 @@ def get_root_unreconciled_ancestors( ) ) - root_unreconciled_ancestors = ( - {current_partition.asset_key} if updated_parents else set() - ) + outdated_ancestors = {current_partition.asset_key} if updated_parents else set() for parent in set(parent_asset_partitions) - updated_parents: - root_unreconciled_ancestors.update( - self._root_unreconciled_ancestors_cache.get(parent, set()) - ) + outdated_ancestors.update(self._outdated_ancestors_cache.get(parent, set())) - self._root_unreconciled_ancestors_cache[current_partition] = ( - root_unreconciled_ancestors - ) + self._outdated_ancestors_cache[current_partition] = outdated_ancestors - return self._root_unreconciled_ancestors_cache[asset_partition] + return self._outdated_ancestors_cache[asset_partition]