From 3037dfd2a24a11f710cd67faacd0df74ad97f182 Mon Sep 17 00:00:00 2001 From: Owen Kephart Date: Fri, 8 Dec 2023 13:35:14 -0800 Subject: [PATCH] RuleEvaluationData -> SubsetWithMetadata --- .../asset_automation_condition_context.py | 13 +- .../definitions/asset_automation_evaluator.py | 228 +++++++++++------- .../_core/definitions/asset_daemon_cursor.py | 5 +- .../definitions/auto_materialize_policy.py | 30 ++- .../definitions/auto_materialize_rule.py | 122 ++++++---- .../auto_materialize_rule_evaluation.py | 92 ++++++- .../freshness_based_auto_materialize.py | 14 +- 7 files changed, 326 insertions(+), 178 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_automation_condition_context.py b/python_modules/dagster/dagster/_core/definitions/asset_automation_condition_context.py index 60a3385f9c4bb..67d83eaac8582 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_automation_condition_context.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_automation_condition_context.py @@ -1,9 +1,8 @@ import datetime import functools from dataclasses import dataclass -from typing import TYPE_CHECKING, AbstractSet, Mapping, Optional +from typing import TYPE_CHECKING, AbstractSet, Mapping, Optional, Sequence -from dagster._core.definitions.auto_materialize_rule_evaluation import RuleEvaluationResults from dagster._core.definitions.data_time import CachingDataTimeResolver from dagster._core.definitions.events import AssetKey, AssetKeyPartitionKey from dagster._core.definitions.partition import PartitionsDefinition @@ -16,6 +15,8 @@ from .asset_subset import AssetSubset if TYPE_CHECKING: + from dagster._core.definitions.asset_automation_evaluator import AssetSubsetWithMetdata + from .asset_automation_evaluator import AutomationCondition, ConditionEvaluation from .asset_daemon_context import AssetDaemonContext @@ -101,7 +102,7 @@ def materialized_requested_or_discarded_since_previous_tick_subset(self) -> Asse return ( self.materialized_since_previous_tick_subset | self.latest_evaluation.true_subset - | (self.latest_evaluation.discard_subset or self.empty_subset()) + | (self.latest_evaluation.discard_subset(self.root_condition) or self.empty_subset()) ) @functools.cached_property @@ -200,7 +201,7 @@ def get_new_asset_cursor(self, evaluation: "ConditionEvaluation") -> AssetDaemon previous_handled_subset | self.materialized_requested_or_discarded_since_previous_tick_subset | evaluation.true_subset - | (evaluation.discard_subset or self.empty_subset()) + | (evaluation.discard_subset(self.root_condition) or self.empty_subset()) ) return AssetDaemonAssetCursor( asset_key=self.asset_key, @@ -299,9 +300,9 @@ def materialized_requested_or_discarded_since_previous_tick_subset(self) -> Asse return self.asset_context.materialized_requested_or_discarded_since_previous_tick_subset @property - def previous_tick_results(self) -> RuleEvaluationResults: + def previous_tick_subsets_with_metadata(self) -> Sequence["AssetSubsetWithMetdata"]: """Returns the RuleEvaluationResults calculated on the previous tick for this condition.""" - return self.latest_evaluation.results if self.latest_evaluation else [] + return self.latest_evaluation.subsets_with_metadata if self.latest_evaluation else [] def empty_subset(self) -> AssetSubset: return self.asset_context.empty_subset() diff --git a/python_modules/dagster/dagster/_core/definitions/asset_automation_evaluator.py b/python_modules/dagster/dagster/_core/definitions/asset_automation_evaluator.py index 6fad476eea6b8..393ff582408be 
100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_automation_evaluator.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_automation_evaluator.py @@ -1,29 +1,39 @@ -import dataclasses import functools import hashlib from abc import ABC, abstractmethod -from typing import TYPE_CHECKING, AbstractSet, List, NamedTuple, Optional, Sequence, Tuple +from typing import ( + TYPE_CHECKING, + AbstractSet, + FrozenSet, + List, + NamedTuple, + Optional, + Sequence, + Tuple, +) import dagster._check as check from dagster._core.definitions.asset_daemon_cursor import AssetDaemonAssetCursor from dagster._core.definitions.asset_graph import AssetGraph +from dagster._core.definitions.auto_materialize_rule_evaluation import ( + AutoMaterializeAssetEvaluation, + AutoMaterializeDecisionType, + AutoMaterializeRuleEvaluation, + AutoMaterializeRuleEvaluationData, +) from dagster._core.definitions.events import AssetKey, AssetKeyPartitionKey +from dagster._core.definitions.metadata import MetadataMapping, MetadataValue from .asset_automation_condition_context import ( AssetAutomationConditionEvaluationContext, AssetAutomationEvaluationContext, ) from .asset_subset import AssetSubset -from .auto_materialize_rule_evaluation import ( - AutoMaterializeAssetEvaluation, - AutoMaterializeDecisionType, - AutoMaterializeRuleEvaluation, -) if TYPE_CHECKING: from dagster._utils.caching_instance_queryer import CachingInstanceQueryer - from .auto_materialize_rule import AutoMaterializeRule, RuleEvaluationResults + from .auto_materialize_rule import AutoMaterializeRule class AutomationConditionNodeSnapshot(NamedTuple): @@ -41,22 +51,26 @@ def hash(self) -> str: ).hexdigest() +class AssetSubsetWithMetdata(NamedTuple): + """An asset subset with metadata that corresponds to it.""" + + subset: AssetSubset + metadata: MetadataMapping + + @property + def frozen_metadata(self) -> FrozenSet[Tuple[str, MetadataValue]]: + return frozenset(self.metadata.items()) + + class ConditionEvaluation(NamedTuple): """Internal representation of the results of evaluating a node in the evaluation tree.""" condition_snapshot: AutomationConditionNodeSnapshot true_subset: AssetSubset candidate_subset: AssetSubset - - results: "RuleEvaluationResults" = [] + subsets_with_metadata: Sequence[AssetSubsetWithMetdata] = [] child_evaluations: Sequence["ConditionEvaluation"] = [] - # backcompat until we remove the discard concept - discard_subset: Optional[AssetSubset] = None - discard_results: Sequence[ - Tuple[AutoMaterializeRuleEvaluation, AbstractSet[AssetKeyPartitionKey]] - ] = [] - def all_results( self, condition: "AutomationCondition" ) -> Sequence[Tuple[AutoMaterializeRuleEvaluation, AbstractSet[AssetKeyPartitionKey]]]: @@ -64,22 +78,62 @@ def all_results( of the system understand. 
""" if isinstance(condition, RuleCondition): - results = [ - ( - AutoMaterializeRuleEvaluation( - rule_snapshot=condition.rule.to_snapshot(), - evaluation_data=evaluation_data, - ), - subset, + if self.subsets_with_metadata: + results = [ + ( + AutoMaterializeRuleEvaluation( + rule_snapshot=condition.rule.to_snapshot(), + evaluation_data=AutoMaterializeRuleEvaluationData.from_metadata( + elt.metadata + ), + ), + elt.subset.asset_partitions, + ) + for elt in self.subsets_with_metadata + ] + else: + # if not provided specific metadata, just use the true subset + asset_partitions = self.true_subset.asset_partitions + results = ( + [ + ( + AutoMaterializeRuleEvaluation( + rule_snapshot=condition.rule.to_snapshot(), evaluation_data=None + ), + asset_partitions, + ) + ] + if asset_partitions + else [] ) - for evaluation_data, subset in self.results - ] else: results = [] for i, child in enumerate(self.child_evaluations): results = [*results, *child.all_results(condition.children[i])] return results + def skip_subset_size(self, condition: "AutomationCondition") -> int: + # backcompat way to calculate the set of skipped partitions for legacy policies + if not condition.is_legacy: + return 0 + + not_skip_evaluation = self.child_evaluations[1] + skip_evaluation = not_skip_evaluation.child_evaluations[0] + return skip_evaluation.true_subset.size + + def discard_subset(self, condition: "AutomationCondition") -> Optional[AssetSubset]: + not_discard_condition = condition.not_discard_condition + if not not_discard_condition or len(self.child_evaluations) != 3: + return None + + not_discard_evaluation = self.child_evaluations[2] + discard_evaluation = not_discard_evaluation.child_evaluations[0] + return discard_evaluation.true_subset + + def discard_subset_size(self, condition: "AutomationCondition") -> int: + discard_subset = self.discard_subset(condition) + return discard_subset.size if discard_subset else 0 + def for_child(self, child_condition: "AutomationCondition") -> Optional["ConditionEvaluation"]: """Returns the evaluation of a given child condition by finding the child evaluation that has an identical hash to the given condition. 
@@ -105,30 +159,14 @@ def to_evaluation( .to_auto_materialize_policy_evaluator() .condition ) - # backcompat way to calculate the set of skipped partitions for legacy policies - if condition.is_legacy and len(self.child_evaluations) == 2: - # the first child is the materialize condition, the second child is the negation of - # the skip condition - _, nor_skip_evaluation = self.child_evaluations - skip_evaluation = nor_skip_evaluation.child_evaluations[0] - skipped_subset_size = skip_evaluation.true_subset.size - else: - skipped_subset_size = 0 - - discard_subset = self.discard_subset or AssetSubset.empty( - asset_key, asset_graph.get_partitions_def(asset_key) - ) return AutoMaterializeAssetEvaluation.from_rule_evaluation_results( asset_key=asset_key, asset_graph=asset_graph, - asset_partitions_by_rule_evaluation=[ - *self.all_results(condition), - *self.discard_results, - ], - num_requested=(self.true_subset - discard_subset).size, - num_skipped=skipped_subset_size, - num_discarded=discard_subset.size, + asset_partitions_by_rule_evaluation=self.all_results(condition), + num_requested=self.true_subset.size, + num_skipped=self.skip_subset_size(condition), + num_discarded=self.discard_subset_size(condition), dynamic_partitions_store=instance_queryer, ) @@ -140,15 +178,17 @@ def from_evaluation_and_rule( ) -> "ConditionEvaluation": asset_key = evaluation.asset_key partitions_def = asset_graph.get_partitions_def(asset_key) - empty_subset = AssetSubset.empty(asset_key, partitions_def) + + true_subset, subsets_with_metadata = evaluation.get_rule_evaluation_results( + rule.to_snapshot(), asset_graph + ) return ConditionEvaluation( condition_snapshot=RuleCondition(rule=rule).snapshot, - true_subset=empty_subset, - candidate_subset=empty_subset + true_subset=true_subset, + candidate_subset=AssetSubset.empty(asset_key, partitions_def) if rule.decision_type == AutoMaterializeDecisionType.MATERIALIZE else evaluation.get_evaluated_subset(asset_graph), - discard_subset=empty_subset, - results=evaluation.get_rule_evaluation_results(rule.to_snapshot(), asset_graph), + subsets_with_metadata=subsets_with_metadata, ) @staticmethod @@ -167,7 +207,8 @@ def from_evaluation( partitions_def = asset_graph.get_partitions_def(asset_key) empty_subset = AssetSubset.empty(asset_key, partitions_def) - materialize_condition, skip_condition = condition.children + materialize_condition, not_skip_condition = condition.children[:2] + skip_condition = not_skip_condition.children[0] materialize_rules = [ materialize_condition.rule for materialize_condition in materialize_condition.children @@ -191,19 +232,43 @@ def from_evaluation( ], ), ConditionEvaluation( - condition_snapshot=skip_condition.snapshot, + condition_snapshot=not_skip_condition.snapshot, true_subset=empty_subset, candidate_subset=empty_subset, child_evaluations=[ - ConditionEvaluation.from_evaluation_and_rule(evaluation, asset_graph, rule) - for rule in skip_rules + ConditionEvaluation( + condition_snapshot=skip_condition.snapshot, + true_subset=empty_subset, + candidate_subset=empty_subset, + child_evaluations=[ + ConditionEvaluation.from_evaluation_and_rule( + evaluation, asset_graph, rule + ) + for rule in skip_rules + ], + ) ], ), ] + if condition.not_discard_condition: + discard_condition = condition.not_discard_condition.children[0] + if isinstance(discard_condition, RuleCondition): + children.append( + ConditionEvaluation( + condition_snapshot=condition.not_discard_condition.snapshot, + true_subset=empty_subset, + candidate_subset=empty_subset, + 
child_evaluations=[ + ConditionEvaluation.from_evaluation_and_rule( + evaluation, asset_graph, discard_condition.rule + ) + ], + ) + ) + return ConditionEvaluation( condition_snapshot=condition.snapshot, true_subset=evaluation.get_requested_subset(asset_graph), - discard_subset=evaluation.get_discarded_subset(asset_graph), candidate_subset=empty_subset, child_evaluations=children, ) @@ -241,15 +306,27 @@ def is_legacy(self) -> bool: """ return ( isinstance(self, AndAutomationCondition) - and len(self.children) == 2 + and len(self.children) in {2, 3} and isinstance(self.children[0], OrAutomationCondition) and isinstance(self.children[1], NotAutomationCondition) + # the third child is the discard condition, which is optional + and (len(self.children) == 2 or isinstance(self.children[2], NotAutomationCondition)) ) @property def children(self) -> Sequence["AutomationCondition"]: return [] + @property + def indexed_children(self) -> Sequence[Tuple[int, "AutomationCondition"]]: + return list(enumerate(self.children)) + + @property + def not_discard_condition(self) -> Optional["AutomationCondition"]: + if not self.is_legacy or not len(self.children) == 3: + return None + return self.children[-1] + @functools.cached_property def snapshot(self) -> AutomationConditionNodeSnapshot: """Returns a snapshot of this condition that can be used for serialization.""" @@ -270,12 +347,7 @@ def evaluate(self, context: AssetAutomationConditionEvaluationContext) -> Condit context.asset_context.daemon_context._verbose_log_fn( # noqa f"Evaluating rule: {self.rule.to_snapshot()}" ) - results = self.rule.evaluate_for_asset(context) - true_subset = context.empty_subset() - for _, asset_partitions in results: - true_subset |= AssetSubset.from_asset_partitions_set( - context.asset_key, context.partitions_def, asset_partitions - ) + true_subset, subsets_with_metadata = self.rule.evaluate_for_asset(context) context.asset_context.daemon_context._verbose_log_fn( # noqa f"Rule returned {true_subset.size} partitions" ) @@ -283,7 +355,7 @@ def evaluate(self, context: AssetAutomationConditionEvaluationContext) -> Condit condition_snapshot=self.snapshot, true_subset=true_subset, candidate_subset=context.candidate_subset, - results=results, + subsets_with_metadata=subsets_with_metadata, ) @@ -368,7 +440,6 @@ class AssetAutomationEvaluator(NamedTuple): """ condition: AutomationCondition - max_materializations_per_minute: Optional[int] = 1 def evaluate( self, context: AssetAutomationEvaluationContext @@ -382,34 +453,7 @@ def evaluate( are in the format `(a | b | ...) & ~(c | d | ...). - A new AssetDaemonAssetCursor that represents the state of the world after this evaluation. 
""" - from .auto_materialize_rule import DiscardOnMaxMaterializationsExceededRule - condition_context = context.get_root_condition_context() condition_evaluation = self.condition.evaluate(condition_context) - # this is treated separately from other rules, for now - discard_subset = context.empty_subset() - discard_results = [] - if self.max_materializations_per_minute is not None: - discard_context = dataclasses.replace( - condition_context, candidate_subset=condition_evaluation.true_subset - ) - discard_rule = DiscardOnMaxMaterializationsExceededRule( - limit=self.max_materializations_per_minute - ) - condition = RuleCondition(discard_rule) - discard_condition_evaluation = condition.evaluate(discard_context) - discard_subset = discard_condition_evaluation.true_subset - discard_results = [ - (AutoMaterializeRuleEvaluation(discard_rule.to_snapshot(), evaluation_data), aps) - for evaluation_data, aps in discard_condition_evaluation.results - ] - - return ( - condition_evaluation._replace( - true_subset=condition_evaluation.true_subset - discard_subset, - discard_subset=discard_subset, - discard_results=discard_results, - ), - context.get_new_asset_cursor(evaluation=condition_evaluation), - ) + return condition_evaluation, context.get_new_asset_cursor(evaluation=condition_evaluation) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py b/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py index 8448ff35a499f..64918c2a94be8 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py @@ -20,12 +20,13 @@ ) from dagster._serdes.serdes import deserialize_value, serialize_value -if TYPE_CHECKING: - from .asset_automation_evaluator import ConditionEvaluation from .asset_graph import AssetGraph from .asset_subset import AssetSubset from .partition import PartitionsSubset +if TYPE_CHECKING: + from .asset_automation_evaluator import ConditionEvaluation + class AssetDaemonAssetCursor(NamedTuple): """Convenience class to represent the state of an individual asset being handled by the daemon. 
diff --git a/python_modules/dagster/dagster/_core/definitions/auto_materialize_policy.py b/python_modules/dagster/dagster/_core/definitions/auto_materialize_policy.py index 80a9581d234d0..b5aac93e1cc22 100644 --- a/python_modules/dagster/dagster/_core/definitions/auto_materialize_policy.py +++ b/python_modules/dagster/dagster/_core/definitions/auto_materialize_policy.py @@ -262,19 +262,29 @@ def to_auto_materialize_policy_evaluator(self) -> "AssetAutomationEvaluator": OrAutomationCondition, RuleCondition, ) + from .auto_materialize_rule import DiscardOnMaxMaterializationsExceededRule materialize_condition = OrAutomationCondition( - children=[RuleCondition(rule) for rule in self.materialize_rules] + children=[ + RuleCondition(rule) + for rule in sorted(self.materialize_rules, key=lambda rule: rule.description) + ] ) skip_condition = OrAutomationCondition( - children=[RuleCondition(rule) for rule in self.skip_rules] + children=[ + RuleCondition(rule) + for rule in sorted(self.skip_rules, key=lambda rule: rule.description) + ] ) + children = [ + materialize_condition, + NotAutomationCondition([skip_condition]), + ] + if self.max_materializations_per_minute: + discard_condition = RuleCondition( + DiscardOnMaxMaterializationsExceededRule(self.max_materializations_per_minute) + ) + children.append(NotAutomationCondition([discard_condition])) - # results in an expression of the form (m1 | m2 | ... | mn) & ~(s1 | s2 | ... | sn) - condition = AndAutomationCondition( - children=[materialize_condition, NotAutomationCondition([skip_condition])] - ) - return AssetAutomationEvaluator( - condition=condition, - max_materializations_per_minute=self.max_materializations_per_minute, - ) + # results in an expression of the form (m1 | m2 | ... | mn) & ~(s1 | s2 | ... | sn) & ~d + return AssetAutomationEvaluator(condition=AndAutomationCondition(children)) diff --git a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py index 594fffa4bd631..6a2debaab8239 100644 --- a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py +++ b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py @@ -1,9 +1,10 @@ import datetime +import operator from abc import ABC, abstractmethod, abstractproperty from collections import defaultdict +from functools import reduce from typing import ( AbstractSet, - Callable, Dict, Iterable, Mapping, @@ -17,6 +18,7 @@ import dagster._check as check from dagster._annotations import experimental, public +from dagster._core.definitions.asset_subset import AssetSubset from dagster._core.definitions.auto_materialize_rule_evaluation import ( AutoMaterializeDecisionType, AutoMaterializeRuleEvaluationData, @@ -74,32 +76,53 @@ def add_evaluation_data_from_previous_tick( self, context: AssetAutomationConditionEvaluationContext, asset_partitions_by_evaluation_data: Mapping[ - Optional[AutoMaterializeRuleEvaluationData], Set[AssetKeyPartitionKey] + AutoMaterializeRuleEvaluationData, Set[AssetKeyPartitionKey] ], - should_use_past_data_fn: Callable[[AssetKeyPartitionKey], bool], - ) -> "RuleEvaluationResults": - """Combines a given set of evaluation data with evaluation data from the previous tick. The - returned value will include the union of the evaluation data contained within - `asset_partitions_by_evaluation_data` and the evaluation data calculated for asset - partitions on the previous tick for which `should_use_past_data_fn` evaluates to `True`. 
+ ignore_subset: AssetSubset, + ) -> RuleEvaluationResults: + """Combines evaluation data calculated on this tick with evaluation data calculated on the + previous tick. Args: context: The current RuleEvaluationContext. asset_partitions_by_evaluation_data: A mapping from evaluation data to the set of asset partitions that the rule applies to. - should_use_past_data_fn: A function that returns whether a given asset partition from the - previous tick should be included in the results of this tick. + ignore_subset: An AssetSubset which represents information that we should *not* carry + forward from the previous tick. """ - asset_partitions_by_evaluation_data = defaultdict(set, asset_partitions_by_evaluation_data) - evaluated_asset_partitions = set().union(*asset_partitions_by_evaluation_data.values()) - for evaluation_data, asset_partitions in context.previous_tick_results: - for ap in asset_partitions: - # evaluated data from this tick takes precedence over data from the previous tick - if ap in evaluated_asset_partitions: - continue - elif should_use_past_data_fn(ap): - asset_partitions_by_evaluation_data[evaluation_data].add(ap) - return list(asset_partitions_by_evaluation_data.items()) + from .asset_automation_evaluator import AssetSubsetWithMetdata + + mapping = defaultdict(lambda: context.empty_subset()) + for evaluation_data, asset_partitions in asset_partitions_by_evaluation_data.items(): + mapping[ + frozenset(evaluation_data.metadata.items()) + ] = AssetSubset.from_asset_partitions_set( + context.asset_key, context.partitions_def, asset_partitions + ) + + # get the set of all things we have metadata for + has_metadata_subset = context.empty_subset() + for evaluation_data, subset in mapping.items(): + has_metadata_subset |= subset + + # don't use information from the previous tick if we have explicit metadata for it or + # we've explicitly said to ignore it + ignore_subset = has_metadata_subset | ignore_subset + + for elt in context.previous_tick_subsets_with_metadata: + carry_forward_subset = elt.subset - ignore_subset + if carry_forward_subset.size > 0: + mapping[elt.frozen_metadata] |= carry_forward_subset + + # for now, an asset is in the "true" subset if and only if we have some metadata for it + true_subset = reduce(operator.or_, mapping.values(), context.empty_subset()) + return ( + true_subset, + [ + AssetSubsetWithMetdata(subset, dict(metadata)) + for metadata, subset in mapping.items() + ], + ) @abstractmethod def evaluate_for_asset( @@ -309,7 +332,7 @@ def missed_cron_ticks( missed_ticks.append(dt) return missed_ticks - def get_asset_partitions_to_request( + def get_new_asset_partitions_to_request( self, context: AssetAutomationConditionEvaluationContext ) -> AbstractSet[AssetKeyPartitionKey]: missed_ticks = self.missed_cron_ticks(context) @@ -375,18 +398,16 @@ def get_asset_partitions_to_request( def evaluate_for_asset( self, context: AssetAutomationConditionEvaluationContext ) -> RuleEvaluationResults: - asset_partitions_to_request = self.get_asset_partitions_to_request(context) - asset_partitions_by_evaluation_data = defaultdict(set) - if asset_partitions_to_request: - asset_partitions_by_evaluation_data[None].update(asset_partitions_to_request) - - return self.add_evaluation_data_from_previous_tick( - context, - asset_partitions_by_evaluation_data, - should_use_past_data_fn=lambda ap: ap - not in context.materialized_requested_or_discarded_since_previous_tick_subset, + new_asset_partitions_to_request = self.get_new_asset_partitions_to_request(context) + 
asset_subset_to_request = AssetSubset.from_asset_partitions_set( + context.asset_key, context.partitions_def, new_asset_partitions_to_request + ) | ( + context.previous_tick_true_subset + - context.materialized_requested_or_discarded_since_previous_tick_subset ) + return asset_subset_to_request, [] + @whitelist_for_serdes @experimental @@ -579,8 +600,7 @@ def evaluate_for_asset( return self.add_evaluation_data_from_previous_tick( context, asset_partitions_by_evaluation_data, - should_use_past_data_fn=lambda ap: ap - not in context.materialized_requested_or_discarded_since_previous_tick_subset, + ignore_subset=context.materialized_requested_or_discarded_since_previous_tick_subset, ) @@ -601,8 +621,6 @@ def evaluate_for_asset( previously discarded. Currently only applies to root asset partitions and asset partitions with updated parents. """ - asset_partitions_by_evaluation_data = defaultdict(set) - missing_asset_partitions = set( context.asset_context.never_materialized_requested_or_discarded_root_subset.asset_partitions ) @@ -614,15 +632,14 @@ def evaluate_for_asset( ): missing_asset_partitions |= {candidate} - if missing_asset_partitions: - asset_partitions_by_evaluation_data[None] = missing_asset_partitions - - return self.add_evaluation_data_from_previous_tick( - context, - asset_partitions_by_evaluation_data, - should_use_past_data_fn=lambda ap: ap not in missing_asset_partitions - and ap not in context.materialized_requested_or_discarded_since_previous_tick_subset, + newly_missing_subset = AssetSubset.from_asset_partitions_set( + context.asset_key, context.partitions_def, missing_asset_partitions + ) + missing_subset = newly_missing_subset | ( + context.previous_tick_true_subset + - context.materialized_requested_or_discarded_since_previous_tick_subset ) + return missing_subset, [] @whitelist_for_serdes @@ -668,7 +685,7 @@ def evaluate_for_asset( return self.add_evaluation_data_from_previous_tick( context, asset_partitions_by_evaluation_data, - should_use_past_data_fn=lambda ap: ap not in subset_to_evaluate, + ignore_subset=subset_to_evaluate, ) @@ -717,7 +734,7 @@ def evaluate_for_asset( return self.add_evaluation_data_from_previous_tick( context, asset_partitions_by_evaluation_data, - should_use_past_data_fn=lambda ap: ap not in subset_to_evaluate, + ignore_subset=subset_to_evaluate, ) @@ -803,7 +820,7 @@ def evaluate_for_asset( return self.add_evaluation_data_from_previous_tick( context, asset_partitions_by_evaluation_data, - should_use_past_data_fn=lambda ap: ap not in subset_to_evaluate, + ignore_subset=subset_to_evaluate, ) @@ -845,7 +862,7 @@ def evaluate_for_asset( return self.add_evaluation_data_from_previous_tick( context, asset_partitions_by_evaluation_data, - should_use_past_data_fn=lambda ap: ap not in subset_to_evaluate, + ignore_subset=subset_to_evaluate, ) @@ -873,14 +890,14 @@ def evaluate_for_asset( ).get_asset_subset(context.asset_key, context.asset_context.asset_graph) if backfilling_subset.size == 0: - return [] + return context.empty_subset(), [] if self.all_partitions: true_subset = context.candidate_subset else: true_subset = context.candidate_subset & backfilling_subset - return [(None, true_subset.asset_partitions)] + return true_subset, [] @whitelist_for_serdes @@ -905,6 +922,7 @@ def evaluate_for_asset( key=lambda x: sort_key_for_asset_partition(context.asset_graph, x), )[self.limit :] ) - if rate_limited_asset_partitions: - return [(None, rate_limited_asset_partitions)] - return [] + + return AssetSubset.from_asset_partitions_set( + context.asset_key, 
context.partitions_def, rate_limited_asset_partitions + ), [] diff --git a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule_evaluation.py b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule_evaluation.py index aceb712324c6d..701d6b988c2fd 100644 --- a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule_evaluation.py +++ b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule_evaluation.py @@ -1,4 +1,4 @@ -from abc import ABC +from abc import ABC, abstractproperty from enum import Enum from typing import ( TYPE_CHECKING, @@ -16,6 +16,7 @@ import dagster._check as check from dagster._core.definitions.asset_subset import AssetSubset from dagster._core.definitions.events import AssetKey, AssetKeyPartitionKey +from dagster._core.definitions.metadata import MetadataMapping, MetadataValue from dagster._serdes.serdes import ( NamedTupleSerializer, UnpackContext, @@ -28,6 +29,7 @@ from .partition import SerializedPartitionsSubset if TYPE_CHECKING: + from dagster._core.definitions.asset_automation_evaluator import AssetSubsetWithMetdata from dagster._core.instance import DynamicPartitionsStore @@ -57,7 +59,45 @@ class AutoMaterializeRuleSnapshot(NamedTuple): class AutoMaterializeRuleEvaluationData(ABC): - pass + @abstractproperty + def metadata(self) -> MetadataMapping: + raise NotImplementedError() + + @staticmethod + def from_metadata(metadata: MetadataMapping) -> Optional["AutoMaterializeRuleEvaluationData"]: + """Temporary workaround to convert the generic metadata mapping into the old format.""" + if not metadata: + return None + elif "text" in metadata: + text_value = cast(str, metadata["text"].value) + return TextRuleEvaluationData(text_value) + + waiting_on_ancestors = frozenset( + { + cast(AssetKey, v.value) + for k, v in metadata.items() + if k.startswith("waiting_on_ancestor") + } + ) + if waiting_on_ancestors: + return WaitingOnAssetsRuleEvaluationData(waiting_on_asset_keys=waiting_on_ancestors) + + updated_parents = frozenset( + {cast(AssetKey, v.value) for k, v in metadata.items() if k.startswith("updated_parent")} + ) + will_update_parents = frozenset( + { + cast(AssetKey, v.value) + for k, v in metadata.items() + if k.startswith("will_update_parent") + } + ) + if updated_parents or will_update_parents: + return ParentUpdatedRuleEvaluationData( + updated_asset_keys=updated_parents, will_update_asset_keys=will_update_parents + ) + + return None @whitelist_for_serdes @@ -65,7 +105,9 @@ class TextRuleEvaluationData( AutoMaterializeRuleEvaluationData, NamedTuple("_TextRuleEvaluationData", [("text", str)]), ): - pass + @property + def metadata(self) -> MetadataMapping: + return {"text": MetadataValue.text(self.text)} @whitelist_for_serdes @@ -79,7 +121,18 @@ class ParentUpdatedRuleEvaluationData( ], ), ): - pass + @property + def metadata(self) -> MetadataMapping: + return { + **{ + f"updated_parent_{i+1}": MetadataValue.asset(k) + for i, k in enumerate(self.updated_asset_keys) + }, + **{ + f"will_update_parent_{i+1}": MetadataValue.asset(k) + for i, k in enumerate(self.will_update_asset_keys) + }, + } @whitelist_for_serdes @@ -90,12 +143,17 @@ class WaitingOnAssetsRuleEvaluationData( [("waiting_on_asset_keys", FrozenSet[AssetKey])], ), ): - pass + @property + def metadata(self) -> MetadataMapping: + return { + **{ + f"waiting_on_ancestor_{i+1}": MetadataValue.asset(k) + for i, k in enumerate(self.waiting_on_asset_keys) + }, + } -RuleEvaluationResults = Sequence[ - 
Tuple[Optional[AutoMaterializeRuleEvaluationData], AbstractSet[AssetKeyPartitionKey]] -] +RuleEvaluationResults = Tuple[AssetSubset, Sequence["AssetSubsetWithMetdata"]] @whitelist_for_serdes @@ -214,7 +272,12 @@ def get_rule_evaluation_results( self, rule_snapshot: AutoMaterializeRuleSnapshot, asset_graph: AssetGraph ) -> RuleEvaluationResults: """For a given rule snapshot, returns the calculated evaluations for that rule.""" - results = [] + from dagster._core.definitions.asset_automation_evaluator import AssetSubsetWithMetdata + + true_subset = AssetSubset.empty( + self.asset_key, asset_graph.get_partitions_def(self.asset_key) + ) + subsets_with_metadata = [] for rule_evaluation, serialized_subset in self.partition_subsets_by_condition: # filter for the same rule if rule_evaluation.rule_snapshot != rule_snapshot: @@ -223,8 +286,15 @@ def get_rule_evaluation_results( rule_evaluation, serialized_subset, asset_graph ) if deserialized_result: - results.append((deserialized_result[0], deserialized_result[1].asset_partitions)) - return results + evaluation_data, subset = deserialized_result + metadata = evaluation_data.metadata if evaluation_data else {} + + true_subset |= subset + subsets_with_metadata.append( + AssetSubsetWithMetdata(subset=subset, metadata=metadata) + ) + + return true_subset, subsets_with_metadata def _get_subset_with_decision_type( self, *, decision_type: AutoMaterializeDecisionType, asset_graph: AssetGraph diff --git a/python_modules/dagster/dagster/_core/definitions/freshness_based_auto_materialize.py b/python_modules/dagster/dagster/_core/definitions/freshness_based_auto_materialize.py index 2238cf36c9fc2..fb632b631b9d3 100644 --- a/python_modules/dagster/dagster/_core/definitions/freshness_based_auto_materialize.py +++ b/python_modules/dagster/dagster/_core/definitions/freshness_based_auto_materialize.py @@ -12,6 +12,8 @@ import pendulum +from dagster._core.definitions.asset_automation_evaluator import AssetSubsetWithMetdata +from dagster._core.definitions.asset_subset import AssetSubset from dagster._core.definitions.events import AssetKeyPartitionKey from dagster._core.definitions.freshness_policy import FreshnessPolicy from dagster._utils.schedules import cron_string_iterator @@ -165,7 +167,7 @@ def freshness_evaluation_results_for_asset_key( if not context.asset_graph.get_downstream_freshness_policies( asset_key=asset_key ) or context.asset_graph.is_partitioned(asset_key): - return [] + return context.empty_subset(), [] # figure out the current contents of this asset current_data_time = context.data_time_resolver.get_current_data_time(asset_key, current_time) @@ -178,7 +180,7 @@ def freshness_evaluation_results_for_asset_key( # if executing the asset on this tick would not change its data time, then return if current_data_time == expected_data_time: - return [] + return context.empty_subset(), [] # calculate the data times you would expect after all currently-executing runs # were to successfully complete @@ -208,7 +210,6 @@ def freshness_evaluation_results_for_asset_key( current_time=current_time, ) - asset_partition = AssetKeyPartitionKey(asset_key, None) if ( execution_period is not None and execution_period.start <= current_time @@ -217,6 +218,9 @@ def freshness_evaluation_results_for_asset_key( and expected_data_time >= execution_period.start and evaluation_data is not None ): - return [(evaluation_data, {asset_partition})] + all_subset = AssetSubset.all(asset_key, None) + return AssetSubset.all(asset_key, None), [ + AssetSubsetWithMetdata(all_subset, 
evaluation_data.metadata) + ] else: - return [] + return context.empty_subset(), []
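The metadata properties and the from_metadata helper added to auto_materialize_rule_evaluation.py exist so the new metadata mappings can still be reported through the legacy AutoMaterializeRuleEvaluationData classes when the back-compat AutoMaterializeAssetEvaluation is built in ConditionEvaluation.all_results. A minimal round-trip sketch, assuming these classes are importable from the module touched above (the asset key name is made up for illustration):

    from dagster import AssetKey
    from dagster._core.definitions.auto_materialize_rule_evaluation import (
        AutoMaterializeRuleEvaluationData,
        ParentUpdatedRuleEvaluationData,
    )

    data = ParentUpdatedRuleEvaluationData(
        updated_asset_keys=frozenset({AssetKey("upstream")}),
        will_update_asset_keys=frozenset(),
    )

    # The new code stores a generic metadata mapping on the evaluation...
    metadata = data.metadata  # {"updated_parent_1": MetadataValue.asset(AssetKey("upstream"))}

    # ...and from_metadata() reconstructs the legacy evaluation-data object for serialization.
    roundtripped = AutoMaterializeRuleEvaluationData.from_metadata(metadata)
    assert isinstance(roundtripped, ParentUpdatedRuleEvaluationData)
    assert roundtripped.updated_asset_keys == frozenset({AssetKey("upstream")})

Because from_metadata keys off prefixes such as "text", "updated_parent", "will_update_parent", and "waiting_on_ancestor", any rule metadata that should surface as legacy evaluation data needs to follow those naming conventions.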