diff --git a/python_modules/dagster/dagster/_core/definitions/asset_condition.py b/python_modules/dagster/dagster/_core/definitions/asset_condition.py index 55ca041c55058..6e5c4cd0f3314 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_condition.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_condition.py @@ -357,16 +357,12 @@ def evaluate( context.root_context.daemon_context._verbose_log_fn( # noqa f"Evaluating rule: {self.rule.to_snapshot()}" ) - true_subset, subsets_with_metadata, extra_value = self.rule.evaluate_for_asset(context) + evaluation_result = self.rule.evaluate_for_asset(context) context.root_context.daemon_context._verbose_log_fn( # noqa - f"Rule returned {true_subset.size} partitions" f"{true_subset}" - ) - return AssetConditionEvaluationResult.create( - context=context, - true_subset=true_subset, - subsets_with_metadata=subsets_with_metadata, - extra_value=extra_value, + f"Rule returned {evaluation_result.true_subset.size} partitions:" + f"{evaluation_result.true_subset}" ) + return evaluation_result class AndAssetCondition( diff --git a/python_modules/dagster/dagster/_core/definitions/asset_condition_evaluation_context.py b/python_modules/dagster/dagster/_core/definitions/asset_condition_evaluation_context.py index 531a8efd435ef..0b7ebf7896597 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_condition_evaluation_context.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_condition_evaluation_context.py @@ -1,11 +1,24 @@ import dataclasses import datetime import functools +import operator +from collections import defaultdict from dataclasses import dataclass -from typing import TYPE_CHECKING, AbstractSet, Any, Callable, Mapping, Optional, Sequence, Tuple +from typing import ( + TYPE_CHECKING, + AbstractSet, + Any, + Callable, + FrozenSet, + Mapping, + Optional, + Sequence, + Tuple, +) from dagster._core.definitions.data_time import CachingDataTimeResolver from dagster._core.definitions.events import AssetKey, AssetKeyPartitionKey +from dagster._core.definitions.metadata import MetadataValue from dagster._core.definitions.partition import PartitionsDefinition from dagster._core.definitions.partition_mapping import IdentityPartitionMapping from dagster._core.definitions.time_window_partition_mapping import TimeWindowPartitionMapping @@ -290,5 +303,54 @@ def will_update_asset_partition(self, asset_partition: AssetKeyPartitionKey) -> return False return asset_partition in parent_evaluation.true_subset + def add_evaluation_data_from_previous_tick( + self, + asset_partitions_by_frozen_metadata: Mapping[ + FrozenSet[Tuple[str, MetadataValue]], AbstractSet[AssetKeyPartitionKey] + ], + ignore_subset: AssetSubset, + ) -> Tuple[AssetSubset, Sequence["AssetSubsetWithMetadata"]]: + """Combines information calculated on this tick with information from the previous tick, + returning a tuple of the combined true subset and the combined subsets with metadata. + + Args: + asset_partitions_by_frozen_metadata: A mapping from metadata to the set of asset + partitions that the rule applies to. + ignore_subset: An AssetSubset which represents information that we should *not* carry + forward from the previous tick. + """ + from .asset_condition import AssetSubsetWithMetadata + + mapping = defaultdict(lambda: self.empty_subset()) + for frozen_metadata, asset_partitions in asset_partitions_by_frozen_metadata.items(): + mapping[frozen_metadata] = AssetSubset.from_asset_partitions_set( + self.asset_key, self.partitions_def, asset_partitions + ) + + # get the set of all things we have metadata for + has_new_metadata_subset = functools.reduce( + operator.or_, mapping.values(), self.empty_subset() + ) + + # don't use information from the previous tick if we have explicit metadata for it or + # we've explicitly said to ignore it + ignore_subset = has_new_metadata_subset | ignore_subset + + for elt in self.previous_subsets_with_metadata: + carry_forward_subset = elt.subset - ignore_subset + if carry_forward_subset.size > 0: + mapping[elt.frozen_metadata] |= carry_forward_subset + + # for now, an asset is in the "true" subset if and only if we have some metadata for it + true_subset = functools.reduce(operator.or_, mapping.values(), self.empty_subset()) + + return ( + self.candidate_subset & true_subset, + [ + AssetSubsetWithMetadata(subset, dict(metadata)) + for metadata, subset in mapping.items() + ], + ) + def empty_subset(self) -> AssetSubset: return AssetSubset.empty(self.asset_key, self.partitions_def) diff --git a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py index 9eac479d7d0d1..1167fc69abdf8 100644 --- a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py +++ b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py @@ -1,9 +1,8 @@ import datetime -import operator from abc import ABC, abstractmethod, abstractproperty from collections import defaultdict -from functools import reduce from typing import ( + TYPE_CHECKING, AbstractSet, Dict, Iterable, @@ -21,10 +20,8 @@ from dagster._core.definitions.asset_subset import AssetSubset from dagster._core.definitions.auto_materialize_rule_evaluation import ( AutoMaterializeDecisionType, - AutoMaterializeRuleEvaluationData, AutoMaterializeRuleSnapshot, ParentUpdatedRuleEvaluationData, - RuleEvaluationResults, WaitingOnAssetsRuleEvaluationData, ) from dagster._core.definitions.events import AssetKey, AssetKeyPartitionKey @@ -50,6 +47,9 @@ from .asset_condition_evaluation_context import AssetConditionEvaluationContext from .asset_graph import sort_key_for_asset_partition +if TYPE_CHECKING: + from dagster._core.definitions.asset_condition import AssetConditionEvaluationResult + class AutoMaterializeRule(ABC): """An AutoMaterializeRule defines a bit of logic which helps determine if a materialization @@ -75,61 +75,10 @@ def description(self) -> str: """ ... - def add_evaluation_data_from_previous_tick( - self, - context: AssetConditionEvaluationContext, - asset_partitions_by_evaluation_data: Mapping[ - AutoMaterializeRuleEvaluationData, Set[AssetKeyPartitionKey] - ], - ignore_subset: AssetSubset, - ) -> RuleEvaluationResults: - """Combines evaluation data calculated on this tick with evaluation data calculated on the - previous tick. - - Args: - context: The current RuleEvaluationContext. - asset_partitions_by_evaluation_data: A mapping from evaluation data to the set of asset - partitions that the rule applies to. - ignore_subset: An AssetSubset which represents information that we should *not* carry - forward from the previous tick. - """ - from .asset_condition import AssetSubsetWithMetadata - - mapping = defaultdict(lambda: context.empty_subset()) - for evaluation_data, asset_partitions in asset_partitions_by_evaluation_data.items(): - mapping[ - frozenset(evaluation_data.metadata.items()) - ] = AssetSubset.from_asset_partitions_set( - context.asset_key, context.partitions_def, asset_partitions - ) - - # get the set of all things we have metadata for - has_metadata_subset = context.empty_subset() - for evaluation_data, subset in mapping.items(): - has_metadata_subset |= subset - - # don't use information from the previous tick if we have explicit metadata for it or - # we've explicitly said to ignore it - ignore_subset = has_metadata_subset | ignore_subset - - for elt in context.previous_subsets_with_metadata: - carry_forward_subset = elt.subset - ignore_subset - if carry_forward_subset.size > 0: - mapping[elt.frozen_metadata] |= carry_forward_subset - - # for now, an asset is in the "true" subset if and only if we have some metadata for it - true_subset = reduce(operator.or_, mapping.values(), context.empty_subset()) - return ( - context.candidate_subset & true_subset, - [ - AssetSubsetWithMetadata(subset, dict(metadata)) - for metadata, subset in mapping.items() - ], - {}, - ) - @abstractmethod - def evaluate_for_asset(self, context: AssetConditionEvaluationContext) -> RuleEvaluationResults: + def evaluate_for_asset( + self, context: AssetConditionEvaluationContext + ) -> "AssetConditionEvaluationResult": """The core evaluation function for the rule. This function takes in a context object and returns a mapping from evaluated rules to the set of asset partitions that the rule applies to. @@ -288,8 +237,15 @@ def decision_type(self) -> AutoMaterializeDecisionType: def description(self) -> str: return "required to meet this or downstream asset's freshness policy" - def evaluate_for_asset(self, context: AssetConditionEvaluationContext) -> RuleEvaluationResults: - return freshness_evaluation_results_for_asset_key(context.root_context) + def evaluate_for_asset( + self, context: AssetConditionEvaluationContext + ) -> "AssetConditionEvaluationResult": + from .asset_condition import AssetConditionEvaluationResult + + true_subset, subsets_with_metadata = freshness_evaluation_results_for_asset_key( + context.root_context + ) + return AssetConditionEvaluationResult.create(context, true_subset, subsets_with_metadata) @whitelist_for_serdes @@ -395,7 +351,11 @@ def get_new_asset_partitions_to_request( for time_partition_key in missed_time_partition_keys } - def evaluate_for_asset(self, context: AssetConditionEvaluationContext) -> RuleEvaluationResults: + def evaluate_for_asset( + self, context: AssetConditionEvaluationContext + ) -> "AssetConditionEvaluationResult": + from .asset_condition import AssetConditionEvaluationResult + new_asset_partitions_to_request = self.get_new_asset_partitions_to_request(context) asset_subset_to_request = AssetSubset.from_asset_partitions_set( context.asset_key, context.partitions_def, new_asset_partitions_to_request @@ -404,7 +364,7 @@ def evaluate_for_asset(self, context: AssetConditionEvaluationContext) -> RuleEv - context.materialized_requested_or_discarded_since_previous_tick_subset ) - return asset_subset_to_request, [], {} + return AssetConditionEvaluationResult.create(context, true_subset=asset_subset_to_request) @whitelist_for_serdes @@ -509,10 +469,14 @@ def description(self) -> str: else: return base - def evaluate_for_asset(self, context: AssetConditionEvaluationContext) -> RuleEvaluationResults: + def evaluate_for_asset( + self, context: AssetConditionEvaluationContext + ) -> "AssetConditionEvaluationResult": """Evaluates the set of asset partitions of this asset whose parents have been updated, or will update on this tick. """ + from .asset_condition import AssetConditionEvaluationResult + asset_partitions_by_updated_parents: Mapping[ AssetKeyPartitionKey, Set[AssetKeyPartitionKey] ] = defaultdict(set) @@ -590,14 +554,14 @@ def evaluate_for_asset(self, context: AssetConditionEvaluationContext) -> RuleEv will_update_asset_keys=frozenset( will_update_parent_assets_by_asset_partition.get(asset_partition, []) ), - ) + ).frozen_metadata ].add(asset_partition) - return self.add_evaluation_data_from_previous_tick( - context, + true_subset, subsets_with_metadata = context.add_evaluation_data_from_previous_tick( asset_partitions_by_evaluation_data, ignore_subset=context.materialized_requested_or_discarded_since_previous_tick_subset, ) + return AssetConditionEvaluationResult.create(context, true_subset, subsets_with_metadata) @whitelist_for_serdes @@ -637,10 +601,14 @@ def get_handled_subset(self, context: AssetConditionEvaluationContext) -> AssetS | context.materialized_since_previous_tick_subset ) - def evaluate_for_asset(self, context: AssetConditionEvaluationContext) -> RuleEvaluationResults: + def evaluate_for_asset( + self, context: AssetConditionEvaluationContext + ) -> "AssetConditionEvaluationResult": """Evaluates the set of asset partitions for this asset which are missing and were not previously discarded. """ + from .asset_condition import AssetConditionEvaluationResult + handled_subset = self.get_handled_subset(context) unhandled_candidates = ( context.candidate_subset @@ -650,7 +618,14 @@ def evaluate_for_asset(self, context: AssetConditionEvaluationContext) -> RuleEv if handled_subset.size > 0 else context.candidate_subset ) - return (unhandled_candidates, [], handled_subset) + + return AssetConditionEvaluationResult.create( + context, + true_subset=unhandled_candidates, + # we keep track of the handled subset instead of the unhandled subset because new + # partitions may spontaneously jump into existence at any time + extra_value=handled_subset, + ) @whitelist_for_serdes @@ -663,7 +638,11 @@ def decision_type(self) -> AutoMaterializeDecisionType: def description(self) -> str: return "waiting on upstream data to be up to date" - def evaluate_for_asset(self, context: AssetConditionEvaluationContext) -> RuleEvaluationResults: + def evaluate_for_asset( + self, context: AssetConditionEvaluationContext + ) -> "AssetConditionEvaluationResult": + from .asset_condition import AssetConditionEvaluationResult + asset_partitions_by_evaluation_data = defaultdict(set) # only need to evaluate net-new candidates and candidates whose parents have changed @@ -686,14 +665,13 @@ def evaluate_for_asset(self, context: AssetConditionEvaluationContext) -> RuleEv ) if outdated_ancestors: asset_partitions_by_evaluation_data[ - WaitingOnAssetsRuleEvaluationData(frozenset(outdated_ancestors)) + WaitingOnAssetsRuleEvaluationData(frozenset(outdated_ancestors)).frozen_metadata ].add(candidate) - return self.add_evaluation_data_from_previous_tick( - context, - asset_partitions_by_evaluation_data, - ignore_subset=subset_to_evaluate, + true_subset, subsets_with_metadata = context.add_evaluation_data_from_previous_tick( + asset_partitions_by_evaluation_data, ignore_subset=subset_to_evaluate ) + return AssetConditionEvaluationResult.create(context, true_subset, subsets_with_metadata) @whitelist_for_serdes @@ -709,7 +687,9 @@ def description(self) -> str: def evaluate_for_asset( self, context: AssetConditionEvaluationContext, - ) -> RuleEvaluationResults: + ) -> "AssetConditionEvaluationResult": + from .asset_condition import AssetConditionEvaluationResult + asset_partitions_by_evaluation_data = defaultdict(set) # only need to evaluate net-new candidates and candidates whose parents have changed @@ -733,14 +713,15 @@ def evaluate_for_asset( missing_parent_asset_keys.add(parent.asset_key) if missing_parent_asset_keys: asset_partitions_by_evaluation_data[ - WaitingOnAssetsRuleEvaluationData(frozenset(missing_parent_asset_keys)) + WaitingOnAssetsRuleEvaluationData( + frozenset(missing_parent_asset_keys) + ).frozen_metadata ].add(candidate) - return self.add_evaluation_data_from_previous_tick( - context, - asset_partitions_by_evaluation_data, - ignore_subset=subset_to_evaluate, + true_subset, subsets_with_metadata = context.add_evaluation_data_from_previous_tick( + asset_partitions_by_evaluation_data, ignore_subset=subset_to_evaluate ) + return AssetConditionEvaluationResult.create(context, true_subset, subsets_with_metadata) @whitelist_for_serdes @@ -776,7 +757,9 @@ def description(self) -> str: def evaluate_for_asset( self, context: AssetConditionEvaluationContext, - ) -> RuleEvaluationResults: + ) -> "AssetConditionEvaluationResult": + from .asset_condition import AssetConditionEvaluationResult + asset_partitions_by_evaluation_data = defaultdict(set) # only need to evaluate net-new candidates and candidates whose parents have changed @@ -819,14 +802,15 @@ def evaluate_for_asset( if non_updated_parent_keys: asset_partitions_by_evaluation_data[ - WaitingOnAssetsRuleEvaluationData(frozenset(non_updated_parent_keys)) + WaitingOnAssetsRuleEvaluationData( + frozenset(non_updated_parent_keys) + ).frozen_metadata ].add(candidate) - return self.add_evaluation_data_from_previous_tick( - context, - asset_partitions_by_evaluation_data, - ignore_subset=subset_to_evaluate, + true_subset, subsets_with_metadata = context.add_evaluation_data_from_previous_tick( + asset_partitions_by_evaluation_data, ignore_subset=subset_to_evaluate ) + return AssetConditionEvaluationResult.create(context, true_subset, subsets_with_metadata) @whitelist_for_serdes @@ -841,7 +825,11 @@ def decision_type(self) -> AutoMaterializeDecisionType: def description(self) -> str: return "required parent partitions do not exist" - def evaluate_for_asset(self, context: AssetConditionEvaluationContext) -> RuleEvaluationResults: + def evaluate_for_asset( + self, context: AssetConditionEvaluationContext + ) -> "AssetConditionEvaluationResult": + from .asset_condition import AssetConditionEvaluationResult + asset_partitions_by_evaluation_data = defaultdict(set) subset_to_evaluate = ( @@ -859,14 +847,15 @@ def evaluate_for_asset(self, context: AssetConditionEvaluationContext) -> RuleEv nonexistent_parent_keys = {parent.asset_key for parent in nonexistent_parent_partitions} if nonexistent_parent_keys: asset_partitions_by_evaluation_data[ - WaitingOnAssetsRuleEvaluationData(frozenset(nonexistent_parent_keys)) + WaitingOnAssetsRuleEvaluationData( + frozenset(nonexistent_parent_keys) + ).frozen_metadata ].add(candidate) - return self.add_evaluation_data_from_previous_tick( - context, - asset_partitions_by_evaluation_data, - ignore_subset=subset_to_evaluate, + true_subset, subsets_with_metadata = context.add_evaluation_data_from_previous_tick( + asset_partitions_by_evaluation_data, ignore_subset=subset_to_evaluate ) + return AssetConditionEvaluationResult.create(context, true_subset, subsets_with_metadata) @whitelist_for_serdes @@ -885,20 +874,23 @@ def description(self) -> str: else: return "targeted by an in-progress backfill" - def evaluate_for_asset(self, context: AssetConditionEvaluationContext) -> RuleEvaluationResults: + def evaluate_for_asset( + self, context: AssetConditionEvaluationContext + ) -> "AssetConditionEvaluationResult": + from .asset_condition import AssetConditionEvaluationResult + backfilling_subset = ( context.instance_queryer.get_active_backfill_target_asset_graph_subset() ).get_asset_subset(context.asset_key, context.asset_graph) if backfilling_subset.size == 0: - return context.empty_subset(), [], {} - - if self.all_partitions: + true_subset = context.empty_subset() + elif self.all_partitions: true_subset = context.candidate_subset else: true_subset = context.candidate_subset & backfilling_subset - return true_subset, [], {} + return AssetConditionEvaluationResult.create(context, true_subset) @whitelist_for_serdes @@ -913,7 +905,11 @@ def decision_type(self) -> AutoMaterializeDecisionType: def description(self) -> str: return f"exceeds {self.limit} materialization(s) per minute" - def evaluate_for_asset(self, context: AssetConditionEvaluationContext) -> RuleEvaluationResults: + def evaluate_for_asset( + self, context: AssetConditionEvaluationContext + ) -> "AssetConditionEvaluationResult": + from .asset_condition import AssetConditionEvaluationResult + # the set of asset partitions which exceed the limit rate_limited_asset_partitions = set( sorted( @@ -922,10 +918,9 @@ def evaluate_for_asset(self, context: AssetConditionEvaluationContext) -> RuleEv )[self.limit :] ) - return ( + return AssetConditionEvaluationResult.create( + context, AssetSubset.from_asset_partitions_set( context.asset_key, context.partitions_def, rate_limited_asset_partitions ), - [], - {}, ) diff --git a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule_evaluation.py b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule_evaluation.py index ef4435f0a1eda..9ba4cf87b0acb 100644 --- a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule_evaluation.py +++ b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule_evaluation.py @@ -74,6 +74,10 @@ class AutoMaterializeRuleEvaluationData(ABC): def metadata(self) -> MetadataMapping: raise NotImplementedError() + @property + def frozen_metadata(self) -> FrozenSet[Tuple[str, MetadataValue]]: + return frozenset(self.metadata.items()) + @whitelist_for_serdes class TextRuleEvaluationData( diff --git a/python_modules/dagster/dagster/_core/definitions/freshness_based_auto_materialize.py b/python_modules/dagster/dagster/_core/definitions/freshness_based_auto_materialize.py index 99ff47c7a4b39..e27eae206e965 100644 --- a/python_modules/dagster/dagster/_core/definitions/freshness_based_auto_materialize.py +++ b/python_modules/dagster/dagster/_core/definitions/freshness_based_auto_materialize.py @@ -8,7 +8,7 @@ maximum lag minutes. """ import datetime -from typing import TYPE_CHECKING, AbstractSet, Optional, Tuple +from typing import TYPE_CHECKING, AbstractSet, Optional, Sequence, Tuple import pendulum @@ -18,8 +18,9 @@ from dagster._utils.schedules import cron_string_iterator if TYPE_CHECKING: + from .asset_condition import AssetSubsetWithMetadata from .asset_condition_evaluation_context import AssetConditionEvaluationContext - from .auto_materialize_rule_evaluation import RuleEvaluationResults, TextRuleEvaluationData + from .auto_materialize_rule_evaluation import TextRuleEvaluationData def get_execution_period_for_policy( @@ -154,7 +155,7 @@ def get_expected_data_time_for_asset_key( def freshness_evaluation_results_for_asset_key( context: "AssetConditionEvaluationContext", -) -> "RuleEvaluationResults": +) -> Tuple[AssetSubset, Sequence["AssetSubsetWithMetadata"]]: """Returns a set of AssetKeyPartitionKeys to materialize in order to abide by the given FreshnessPolicies. @@ -168,7 +169,7 @@ def freshness_evaluation_results_for_asset_key( if not context.asset_graph.get_downstream_freshness_policies( asset_key=asset_key ) or context.asset_graph.is_partitioned(asset_key): - return context.empty_subset(), [], {} + return context.empty_subset(), [] # figure out the current contents of this asset current_data_time = context.data_time_resolver.get_current_data_time(asset_key, current_time) @@ -181,7 +182,7 @@ def freshness_evaluation_results_for_asset_key( # if executing the asset on this tick would not change its data time, then return if current_data_time == expected_data_time: - return context.empty_subset(), [], {} + return context.empty_subset(), [] # calculate the data times you would expect after all currently-executing runs # were to successfully complete @@ -223,7 +224,6 @@ def freshness_evaluation_results_for_asset_key( return ( AssetSubset.all(asset_key, None), [AssetSubsetWithMetadata(all_subset, evaluation_data.metadata)], - {}, ) else: - return context.empty_subset(), [], {} + return context.empty_subset(), []