From 42b318251824934c242a689aa58c6028a339375c Mon Sep 17 00:00:00 2001 From: Owen Kephart Date: Fri, 8 Dec 2023 13:32:27 -0800 Subject: [PATCH] Add snapshot class --- .../definitions/asset_automation_evaluator.py | 83 ++++++++++++++----- 1 file changed, 60 insertions(+), 23 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_automation_evaluator.py b/python_modules/dagster/dagster/_core/definitions/asset_automation_evaluator.py index b5ef67da6d141..6fad476eea6b8 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_automation_evaluator.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_automation_evaluator.py @@ -1,4 +1,6 @@ import dataclasses +import functools +import hashlib from abc import ABC, abstractmethod from typing import TYPE_CHECKING, AbstractSet, List, NamedTuple, Optional, Sequence, Tuple @@ -24,10 +26,25 @@ from .auto_materialize_rule import AutoMaterializeRule, RuleEvaluationResults +class AutomationConditionNodeSnapshot(NamedTuple): + """A serializable snapshot of a node in the AutomationCondition tree.""" + + class_name: str + description: str + child_hashes: Sequence[str] + + @property + def hash(self) -> str: + """Returns a unique hash for this node in the tree.""" + return hashlib.md5( + "".join([self.class_name, self.description, *self.child_hashes]).encode("utf-8") + ).hexdigest() + + class ConditionEvaluation(NamedTuple): """Internal representation of the results of evaluating a node in the evaluation tree.""" - condition: "AutomationCondition" + condition_snapshot: AutomationConditionNodeSnapshot true_subset: AssetSubset candidate_subset: AssetSubset @@ -40,18 +57,17 @@ class ConditionEvaluation(NamedTuple): Tuple[AutoMaterializeRuleEvaluation, AbstractSet[AssetKeyPartitionKey]] ] = [] - @property def all_results( - self, + self, condition: "AutomationCondition" ) -> Sequence[Tuple[AutoMaterializeRuleEvaluation, AbstractSet[AssetKeyPartitionKey]]]: """This method is a placeholder to allow us to convert this into a shape that other parts of the system understand. """ - if isinstance(self.condition, RuleCondition): + if isinstance(condition, RuleCondition): results = [ ( AutoMaterializeRuleEvaluation( - rule_snapshot=self.condition.rule.to_snapshot(), + rule_snapshot=condition.rule.to_snapshot(), evaluation_data=evaluation_data, ), subset, @@ -60,15 +76,19 @@ def all_results( ] else: results = [] - for child in self.child_evaluations: - results = [*results, *child.all_results] + for i, child in enumerate(self.child_evaluations): + results = [*results, *child.all_results(condition.children[i])] return results def for_child(self, child_condition: "AutomationCondition") -> Optional["ConditionEvaluation"]: - """Returns the evaluation of a given child condition.""" + """Returns the evaluation of a given child condition by finding the child evaluation that + has an identical hash to the given condition. + """ + child_hash = child_condition.snapshot.hash for child_evaluation in self.child_evaluations: - if child_evaluation.condition == child_condition: + if child_evaluation.condition_snapshot.hash == child_hash: return child_evaluation + return None def to_evaluation( @@ -80,8 +100,13 @@ def to_evaluation( """This method is a placeholder to allow us to convert this into a shape that other parts of the system understand. """ + condition = ( + check.not_none(asset_graph.get_auto_materialize_policy(asset_key)) + .to_auto_materialize_policy_evaluator() + .condition + ) # backcompat way to calculate the set of skipped partitions for legacy policies - if self.condition.is_legacy and len(self.child_evaluations) == 2: + if condition.is_legacy and len(self.child_evaluations) == 2: # the first child is the materialize condition, the second child is the negation of # the skip condition _, nor_skip_evaluation = self.child_evaluations @@ -97,7 +122,10 @@ def to_evaluation( return AutoMaterializeAssetEvaluation.from_rule_evaluation_results( asset_key=asset_key, asset_graph=asset_graph, - asset_partitions_by_rule_evaluation=[*self.all_results, *self.discard_results], + asset_partitions_by_rule_evaluation=[ + *self.all_results(condition), + *self.discard_results, + ], num_requested=(self.true_subset - discard_subset).size, num_skipped=skipped_subset_size, num_discarded=discard_subset.size, @@ -114,7 +142,7 @@ def from_evaluation_and_rule( partitions_def = asset_graph.get_partitions_def(asset_key) empty_subset = AssetSubset.empty(asset_key, partitions_def) return ConditionEvaluation( - condition=RuleCondition(rule=rule), + condition_snapshot=RuleCondition(rule=rule).snapshot, true_subset=empty_subset, candidate_subset=empty_subset if rule.decision_type == AutoMaterializeDecisionType.MATERIALIZE @@ -154,7 +182,7 @@ def from_evaluation( ] children = [ ConditionEvaluation( - condition=materialize_condition, + condition_snapshot=materialize_condition.snapshot, true_subset=empty_subset, candidate_subset=empty_subset, child_evaluations=[ @@ -163,7 +191,7 @@ def from_evaluation( ], ), ConditionEvaluation( - condition=skip_condition, + condition_snapshot=skip_condition.snapshot, true_subset=empty_subset, candidate_subset=empty_subset, child_evaluations=[ @@ -173,7 +201,7 @@ def from_evaluation( ), ] return ConditionEvaluation( - condition=condition, + condition_snapshot=condition.snapshot, true_subset=evaluation.get_requested_subset(asset_graph), discard_subset=evaluation.get_discarded_subset(asset_graph), candidate_subset=empty_subset, @@ -187,10 +215,6 @@ class AutomationCondition(ABC): new conditions using the `&` (and), `|` (or), and `~` (not) operators. """ - @property - def children(self) -> Sequence["AutomationCondition"]: - return [] - @abstractmethod def evaluate(self, context: AssetAutomationConditionEvaluationContext) -> ConditionEvaluation: raise NotImplementedError() @@ -222,6 +246,19 @@ def is_legacy(self) -> bool: and isinstance(self.children[1], NotAutomationCondition) ) + @property + def children(self) -> Sequence["AutomationCondition"]: + return [] + + @functools.cached_property + def snapshot(self) -> AutomationConditionNodeSnapshot: + """Returns a snapshot of this condition that can be used for serialization.""" + return AutomationConditionNodeSnapshot( + class_name=self.__class__.__name__, + description=str(self), + child_hashes=[child.snapshot.hash for child in self.children], + ) + class RuleCondition( NamedTuple("_RuleCondition", [("rule", "AutoMaterializeRule")]), @@ -243,7 +280,7 @@ def evaluate(self, context: AssetAutomationConditionEvaluationContext) -> Condit f"Rule returned {true_subset.size} partitions" ) return ConditionEvaluation( - condition=self, + condition_snapshot=self.snapshot, true_subset=true_subset, candidate_subset=context.candidate_subset, results=results, @@ -265,7 +302,7 @@ def evaluate(self, context: AssetAutomationConditionEvaluationContext) -> Condit child_evaluations.append(result) true_subset &= result.true_subset return ConditionEvaluation( - condition=self, + condition_snapshot=self.snapshot, true_subset=true_subset, candidate_subset=context.candidate_subset, child_evaluations=child_evaluations, @@ -289,7 +326,7 @@ def evaluate(self, context: AssetAutomationConditionEvaluationContext) -> Condit child_evaluations.append(result) true_subset |= result.true_subset return ConditionEvaluation( - condition=self, + condition_snapshot=self.snapshot, true_subset=true_subset, candidate_subset=context.candidate_subset, child_evaluations=child_evaluations, @@ -318,7 +355,7 @@ def evaluate(self, context: AssetAutomationConditionEvaluationContext) -> Condit true_subset = context.candidate_subset - result.true_subset return ConditionEvaluation( - condition=self, + condition_snapshot=self.snapshot, true_subset=true_subset, candidate_subset=context.candidate_subset, child_evaluations=[result],