Skip to content

Commit

Permalink
[amp-refactor][2/n] Create snapshot class for condition objects (#18613)
Browse files Browse the repository at this point in the history
## Summary & Motivation

We want to make this ConditionEvaluation class serializable, and in order to do so, we'll want to serialize snapshots of our conditions rather than the objects themselves.

This brings up an interesting question: the ConditionEvaluation class has information that we want to call up from previous ticks, and it has a recursive structure. On the next tick, we need to be able to locate which part of the recursive structure we should explore next (this is the `for_child` call). However, there are three issues:

1. A given condition can have multiple identical sub-expressions, i.e. `(A & (B | C) & (B | C))` is technically valid, so we can't use a pure equality test to detect which sub-expression to go to next
2. If each node in the condition evaluation tree needs a snapshot class that contains all of its children, then the serialized NamedTuple will end up being size O(N^2), where N is the number of nodes in the tree, because each node must hold some limited representation of every node below it.
3. A condition can change between ticks, meaning that the entire structure may be different on the current tick vs. what we have a snapshot of.

I solve all of these issues by not including any information about children on the snapshot itself, and instead letting that get encoded in the recursive structure of the ConditionEvaluation. Then, to retrieve a specific child, we just rely on the index of that child in the structure. Some examples of what would happen if the condition changed between ticks:

Serialized: `A | B | C`
Current: `A | B | D | C`
Result: A and B get mapped to their previous tick's condition evaluations; D gets no data (because it's of a different type than C); C gets no data (because it's at a new index).

Serialized: `A | B | C`
Current: `A | B`
Result: A and B get mapped to their previous tick's condition evaluation

Serialized: `(A | B) & (C | D | E)`
Current: `(A | B) & (C | D | E | F)`
Result: All things serialized on the previous tick get mapped correctly

Serialized: `(A | B) & (C | D | E)`
Current: `X & (A | B) & (C | D | E)`
Result: Nothing serialized on the previous tick gets mapped correctly

The last case is definitely the most concerning one -- it's possible that we could create a fancier way of detecting that sort of situation in some cases (this would come in the form of fancier `for_child` logic), but it seems hard to do in a fully-general way.

## How I Tested These Changes
  • Loading branch information
OwenKephart authored Jan 19, 2024
1 parent 06b7568 commit a20df83
Showing 1 changed file with 60 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import dataclasses
import functools
import hashlib
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, AbstractSet, List, NamedTuple, Optional, Sequence, Tuple

Expand All @@ -24,10 +26,25 @@
from .auto_materialize_rule import AutoMaterializeRule, RuleEvaluationResults


class AutomationConditionNodeSnapshot(NamedTuple):
    """A serializable snapshot of a node in the AutomationCondition tree.

    The snapshot carries only plain strings, so it can be stored in place of
    the condition object itself and later compared against a freshly built
    snapshot through its ``hash``.
    """

    # Name of the class of the condition this node was taken from.
    class_name: str
    # Human-readable description of the condition.
    description: str
    # Hashes of the snapshots of this node's children, in order.
    child_hashes: Sequence[str]

    @property
    def hash(self) -> str:
        """Return the hex MD5 digest identifying this node in the tree.

        The digest is taken over the class name, the description and every
        child hash, concatenated in that order with no separator and encoded
        as UTF-8. Two nodes therefore share a hash exactly when that
        concatenated text is the same.
        """
        parts = [self.class_name, self.description, *self.child_hashes]
        return hashlib.md5("".join(parts).encode("utf-8")).hexdigest()


class ConditionEvaluation(NamedTuple):
"""Internal representation of the results of evaluating a node in the evaluation tree."""

condition: "AutomationCondition"
condition_snapshot: AutomationConditionNodeSnapshot
true_subset: AssetSubset
candidate_subset: AssetSubset

Expand All @@ -40,18 +57,17 @@ class ConditionEvaluation(NamedTuple):
Tuple[AutoMaterializeRuleEvaluation, AbstractSet[AssetKeyPartitionKey]]
] = []

@property
def all_results(
self,
self, condition: "AutomationCondition"
) -> Sequence[Tuple[AutoMaterializeRuleEvaluation, AbstractSet[AssetKeyPartitionKey]]]:
"""This method is a placeholder to allow us to convert this into a shape that other parts
of the system understand.
"""
if isinstance(self.condition, RuleCondition):
if isinstance(condition, RuleCondition):
results = [
(
AutoMaterializeRuleEvaluation(
rule_snapshot=self.condition.rule.to_snapshot(),
rule_snapshot=condition.rule.to_snapshot(),
evaluation_data=evaluation_data,
),
subset,
Expand All @@ -60,15 +76,19 @@ def all_results(
]
else:
results = []
for child in self.child_evaluations:
results = [*results, *child.all_results]
for i, child in enumerate(self.child_evaluations):
results = [*results, *child.all_results(condition.children[i])]
return results

def for_child(self, child_condition: "AutomationCondition") -> Optional["ConditionEvaluation"]:
"""Returns the evaluation of a given child condition."""
"""Returns the evaluation of a given child condition by finding the child evaluation that
has an identical hash to the given condition.
"""
child_hash = child_condition.snapshot.hash
for child_evaluation in self.child_evaluations:
if child_evaluation.condition == child_condition:
if child_evaluation.condition_snapshot.hash == child_hash:
return child_evaluation

return None

def to_evaluation(
Expand All @@ -80,8 +100,13 @@ def to_evaluation(
"""This method is a placeholder to allow us to convert this into a shape that other parts
of the system understand.
"""
condition = (
check.not_none(asset_graph.get_auto_materialize_policy(asset_key))
.to_auto_materialize_policy_evaluator()
.condition
)
# backcompat way to calculate the set of skipped partitions for legacy policies
if self.condition.is_legacy and len(self.child_evaluations) == 2:
if condition.is_legacy and len(self.child_evaluations) == 2:
# the first child is the materialize condition, the second child is the negation of
# the skip condition
_, nor_skip_evaluation = self.child_evaluations
Expand All @@ -97,7 +122,10 @@ def to_evaluation(
return AutoMaterializeAssetEvaluation.from_rule_evaluation_results(
asset_key=asset_key,
asset_graph=asset_graph,
asset_partitions_by_rule_evaluation=[*self.all_results, *self.discard_results],
asset_partitions_by_rule_evaluation=[
*self.all_results(condition),
*self.discard_results,
],
num_requested=(self.true_subset - discard_subset).size,
num_skipped=skipped_subset_size,
num_discarded=discard_subset.size,
Expand All @@ -114,7 +142,7 @@ def from_evaluation_and_rule(
partitions_def = asset_graph.get_partitions_def(asset_key)
empty_subset = AssetSubset.empty(asset_key, partitions_def)
return ConditionEvaluation(
condition=RuleCondition(rule=rule),
condition_snapshot=RuleCondition(rule=rule).snapshot,
true_subset=empty_subset,
candidate_subset=empty_subset
if rule.decision_type == AutoMaterializeDecisionType.MATERIALIZE
Expand Down Expand Up @@ -154,7 +182,7 @@ def from_evaluation(
]
children = [
ConditionEvaluation(
condition=materialize_condition,
condition_snapshot=materialize_condition.snapshot,
true_subset=empty_subset,
candidate_subset=empty_subset,
child_evaluations=[
Expand All @@ -163,7 +191,7 @@ def from_evaluation(
],
),
ConditionEvaluation(
condition=skip_condition,
condition_snapshot=skip_condition.snapshot,
true_subset=empty_subset,
candidate_subset=empty_subset,
child_evaluations=[
Expand All @@ -173,7 +201,7 @@ def from_evaluation(
),
]
return ConditionEvaluation(
condition=condition,
condition_snapshot=condition.snapshot,
true_subset=evaluation.get_requested_subset(asset_graph),
discard_subset=evaluation.get_discarded_subset(asset_graph),
candidate_subset=empty_subset,
Expand All @@ -187,10 +215,6 @@ class AutomationCondition(ABC):
new conditions using the `&` (and), `|` (or), and `~` (not) operators.
"""

@property
def children(self) -> Sequence["AutomationCondition"]:
return []

@abstractmethod
def evaluate(self, context: AssetAutomationConditionEvaluationContext) -> ConditionEvaluation:
raise NotImplementedError()
Expand Down Expand Up @@ -222,6 +246,19 @@ def is_legacy(self) -> bool:
and isinstance(self.children[1], NotAutomationCondition)
)

@property
def children(self) -> Sequence["AutomationCondition"]:
    """Return the direct sub-conditions of this condition.

    This implementation reports no children.
    NOTE(review): composite conditions presumably override this property to
    expose their operands -- confirm against the subclasses.
    """
    return []

@functools.cached_property
def snapshot(self) -> AutomationConditionNodeSnapshot:
    """Returns a snapshot of this condition that can be used for serialization.

    The snapshot records this condition's class name, its ``str()``
    representation, and the hash of each child's own snapshot in the order
    the children are listed. Because this is a cached property, the snapshot
    is built on first access and reused afterwards.
    """
    return AutomationConditionNodeSnapshot(
        class_name=self.__class__.__name__,
        description=str(self),
        # Children are referenced by hash only, not embedded in full.
        child_hashes=[child.snapshot.hash for child in self.children],
    )


class RuleCondition(
NamedTuple("_RuleCondition", [("rule", "AutoMaterializeRule")]),
Expand All @@ -243,7 +280,7 @@ def evaluate(self, context: AssetAutomationConditionEvaluationContext) -> Condit
f"Rule returned {true_subset.size} partitions"
)
return ConditionEvaluation(
condition=self,
condition_snapshot=self.snapshot,
true_subset=true_subset,
candidate_subset=context.candidate_subset,
results=results,
Expand All @@ -265,7 +302,7 @@ def evaluate(self, context: AssetAutomationConditionEvaluationContext) -> Condit
child_evaluations.append(result)
true_subset &= result.true_subset
return ConditionEvaluation(
condition=self,
condition_snapshot=self.snapshot,
true_subset=true_subset,
candidate_subset=context.candidate_subset,
child_evaluations=child_evaluations,
Expand All @@ -289,7 +326,7 @@ def evaluate(self, context: AssetAutomationConditionEvaluationContext) -> Condit
child_evaluations.append(result)
true_subset |= result.true_subset
return ConditionEvaluation(
condition=self,
condition_snapshot=self.snapshot,
true_subset=true_subset,
candidate_subset=context.candidate_subset,
child_evaluations=child_evaluations,
Expand Down Expand Up @@ -318,7 +355,7 @@ def evaluate(self, context: AssetAutomationConditionEvaluationContext) -> Condit
true_subset = context.candidate_subset - result.true_subset

return ConditionEvaluation(
condition=self,
condition_snapshot=self.snapshot,
true_subset=true_subset,
candidate_subset=context.candidate_subset,
child_evaluations=[result],
Expand Down

0 comments on commit a20df83

Please sign in to comment.