From a20df8371505c8f07d02592b26f172a8a9c380a2 Mon Sep 17 00:00:00 2001 From: OwenKephart Date: Fri, 19 Jan 2024 16:22:35 -0500 Subject: [PATCH] [amp-refactor][2/n] Create snapshot class for condition objects (#18613) ## Summary & Motivation We want to make this ConditionEvaluation class serializable, and in order to do so, we'll want to serialize snapshots of our conditions rather than the objects themselves. This brings up an interesting question: the ConditionEvaluation class has information that we want to call up from previous ticks, and it has a recursive structure. On the next tick, we need to be able to locate which part of the recursive structure we should explore next (this is the `for_child` call). However, there are three issues: 1. A given condition can have multiple identical sub-expressions, i.e. `(A & (B | C) & (B | C))` is technically valid, so we can't use a pure equality test to detect which sub-expression to go to next 2. If each node in the condition evaluation tree needs to have a snapshot class which contains all of ITS children, then the NamedTuple will end up being size O(N^2) where N is the number of nodes in the tree, as each node will need to have some limited representation of all nodes below it. 3. A condition can change between ticks, meaning that the entire structure may be different on the current tick vs. what we have a snapshot of. I solve all of these issues by not including any information about children on the snapshot itself, and instead letting that get encoded in the recursive structure of the ConditionEvaluation. Then, to retrieve a specific child, we just rely on the index of that child in the structure. Some examples of what would happen if the condition changed between ticks: Serialized: `A | B | C` Current: `A | B | D | C` Result: A and B get mapped to their previous tick's condition evaluation, D gets no data (because it's of a different type than C), C gets no data (because it's in a new index) Serialized: `A | B | C` Current: `A | B` Result: A and B get mapped to their previous tick's condition evaluation Serialized: `(A | B) & (C | D | E)` Current: `(A | B) & (C | D | E | F)` Result: All things serialized on the previous tick get mapped correctly Serialized: `(A | B) & (C | D | E)` Current: `X & (A | B) & (C | D | E)` Result: Nothing serialized on the previous tick gets mapped correctly The last case is definitely the most concerning one -- it's possible that we could create a fancier way of detecting that sort of situation in some cases (this would come in the form of fancier `for_child` logic), but it seems hard to do in a fully-general way. ## How I Tested These Changes --- .../definitions/asset_automation_evaluator.py | 83 ++++++++++++++----- 1 file changed, 60 insertions(+), 23 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_automation_evaluator.py b/python_modules/dagster/dagster/_core/definitions/asset_automation_evaluator.py index b5ef67da6d141..6fad476eea6b8 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_automation_evaluator.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_automation_evaluator.py @@ -1,4 +1,6 @@ import dataclasses +import functools +import hashlib from abc import ABC, abstractmethod from typing import TYPE_CHECKING, AbstractSet, List, NamedTuple, Optional, Sequence, Tuple @@ -24,10 +26,25 @@ from .auto_materialize_rule import AutoMaterializeRule, RuleEvaluationResults +class AutomationConditionNodeSnapshot(NamedTuple): + """A serializable snapshot of a node in the AutomationCondition tree.""" + + class_name: str + description: str + child_hashes: Sequence[str] + + @property + def hash(self) -> str: + """Returns a unique hash for this node in the tree.""" + return hashlib.md5( + "".join([self.class_name, self.description, *self.child_hashes]).encode("utf-8") + ).hexdigest() + + class ConditionEvaluation(NamedTuple): """Internal representation of the results of evaluating a node in the evaluation tree.""" - condition: "AutomationCondition" + condition_snapshot: AutomationConditionNodeSnapshot true_subset: AssetSubset candidate_subset: AssetSubset @@ -40,18 +57,17 @@ class ConditionEvaluation(NamedTuple): Tuple[AutoMaterializeRuleEvaluation, AbstractSet[AssetKeyPartitionKey]] ] = [] - @property def all_results( - self, + self, condition: "AutomationCondition" ) -> Sequence[Tuple[AutoMaterializeRuleEvaluation, AbstractSet[AssetKeyPartitionKey]]]: """This method is a placeholder to allow us to convert this into a shape that other parts of the system understand. """ - if isinstance(self.condition, RuleCondition): + if isinstance(condition, RuleCondition): results = [ ( AutoMaterializeRuleEvaluation( - rule_snapshot=self.condition.rule.to_snapshot(), + rule_snapshot=condition.rule.to_snapshot(), evaluation_data=evaluation_data, ), subset, @@ -60,15 +76,19 @@ def all_results( ] else: results = [] - for child in self.child_evaluations: - results = [*results, *child.all_results] + for i, child in enumerate(self.child_evaluations): + results = [*results, *child.all_results(condition.children[i])] return results def for_child(self, child_condition: "AutomationCondition") -> Optional["ConditionEvaluation"]: - """Returns the evaluation of a given child condition.""" + """Returns the evaluation of a given child condition by finding the child evaluation that + has an identical hash to the given condition. + """ + child_hash = child_condition.snapshot.hash for child_evaluation in self.child_evaluations: - if child_evaluation.condition == child_condition: + if child_evaluation.condition_snapshot.hash == child_hash: return child_evaluation + return None def to_evaluation( @@ -80,8 +100,13 @@ def to_evaluation( """This method is a placeholder to allow us to convert this into a shape that other parts of the system understand. """ + condition = ( + check.not_none(asset_graph.get_auto_materialize_policy(asset_key)) + .to_auto_materialize_policy_evaluator() + .condition + ) # backcompat way to calculate the set of skipped partitions for legacy policies - if self.condition.is_legacy and len(self.child_evaluations) == 2: + if condition.is_legacy and len(self.child_evaluations) == 2: # the first child is the materialize condition, the second child is the negation of # the skip condition _, nor_skip_evaluation = self.child_evaluations @@ -97,7 +122,10 @@ def to_evaluation( return AutoMaterializeAssetEvaluation.from_rule_evaluation_results( asset_key=asset_key, asset_graph=asset_graph, - asset_partitions_by_rule_evaluation=[*self.all_results, *self.discard_results], + asset_partitions_by_rule_evaluation=[ + *self.all_results(condition), + *self.discard_results, + ], num_requested=(self.true_subset - discard_subset).size, num_skipped=skipped_subset_size, num_discarded=discard_subset.size, @@ -114,7 +142,7 @@ def from_evaluation_and_rule( partitions_def = asset_graph.get_partitions_def(asset_key) empty_subset = AssetSubset.empty(asset_key, partitions_def) return ConditionEvaluation( - condition=RuleCondition(rule=rule), + condition_snapshot=RuleCondition(rule=rule).snapshot, true_subset=empty_subset, candidate_subset=empty_subset if rule.decision_type == AutoMaterializeDecisionType.MATERIALIZE @@ -154,7 +182,7 @@ def from_evaluation( ] children = [ ConditionEvaluation( - condition=materialize_condition, + condition_snapshot=materialize_condition.snapshot, true_subset=empty_subset, candidate_subset=empty_subset, child_evaluations=[ @@ -163,7 +191,7 @@ def from_evaluation( ], ), ConditionEvaluation( - condition=skip_condition, + condition_snapshot=skip_condition.snapshot, true_subset=empty_subset, candidate_subset=empty_subset, child_evaluations=[ @@ -173,7 +201,7 @@ def from_evaluation( ), ] return ConditionEvaluation( - condition=condition, + condition_snapshot=condition.snapshot, true_subset=evaluation.get_requested_subset(asset_graph), discard_subset=evaluation.get_discarded_subset(asset_graph), candidate_subset=empty_subset, @@ -187,10 +215,6 @@ class AutomationCondition(ABC): new conditions using the `&` (and), `|` (or), and `~` (not) operators. """ - @property - def children(self) -> Sequence["AutomationCondition"]: - return [] - @abstractmethod def evaluate(self, context: AssetAutomationConditionEvaluationContext) -> ConditionEvaluation: raise NotImplementedError() @@ -222,6 +246,19 @@ def is_legacy(self) -> bool: and isinstance(self.children[1], NotAutomationCondition) ) + @property + def children(self) -> Sequence["AutomationCondition"]: + return [] + + @functools.cached_property + def snapshot(self) -> AutomationConditionNodeSnapshot: + """Returns a snapshot of this condition that can be used for serialization.""" + return AutomationConditionNodeSnapshot( + class_name=self.__class__.__name__, + description=str(self), + child_hashes=[child.snapshot.hash for child in self.children], + ) + class RuleCondition( NamedTuple("_RuleCondition", [("rule", "AutoMaterializeRule")]), @@ -243,7 +280,7 @@ def evaluate(self, context: AssetAutomationConditionEvaluationContext) -> Condit f"Rule returned {true_subset.size} partitions" ) return ConditionEvaluation( - condition=self, + condition_snapshot=self.snapshot, true_subset=true_subset, candidate_subset=context.candidate_subset, results=results, @@ -265,7 +302,7 @@ def evaluate(self, context: AssetAutomationConditionEvaluationContext) -> Condit child_evaluations.append(result) true_subset &= result.true_subset return ConditionEvaluation( - condition=self, + condition_snapshot=self.snapshot, true_subset=true_subset, candidate_subset=context.candidate_subset, child_evaluations=child_evaluations, @@ -289,7 +326,7 @@ def evaluate(self, context: AssetAutomationConditionEvaluationContext) -> Condit child_evaluations.append(result) true_subset |= result.true_subset return ConditionEvaluation( - condition=self, + condition_snapshot=self.snapshot, true_subset=true_subset, candidate_subset=context.candidate_subset, child_evaluations=child_evaluations, @@ -318,7 +355,7 @@ def evaluate(self, context: AssetAutomationConditionEvaluationContext) -> Condit true_subset = context.candidate_subset - result.true_subset return ConditionEvaluation( - condition=self, + condition_snapshot=self.snapshot, true_subset=true_subset, candidate_subset=context.candidate_subset, child_evaluations=[result],