Skip to content

Commit

Permalink
Add snapshot class
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed Jan 8, 2024
1 parent 85f1959 commit 42b3182
Showing 1 changed file with 60 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import dataclasses
import functools
import hashlib
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, AbstractSet, List, NamedTuple, Optional, Sequence, Tuple

Expand All @@ -24,10 +26,25 @@
from .auto_materialize_rule import AutoMaterializeRule, RuleEvaluationResults


class AutomationConditionNodeSnapshot(NamedTuple):
"""A serializable snapshot of a node in the AutomationCondition tree."""

class_name: str
description: str
child_hashes: Sequence[str]

@property
def hash(self) -> str:
"""Returns a unique hash for this node in the tree."""
return hashlib.md5(
"".join([self.class_name, self.description, *self.child_hashes]).encode("utf-8")
).hexdigest()


class ConditionEvaluation(NamedTuple):
"""Internal representation of the results of evaluating a node in the evaluation tree."""

condition: "AutomationCondition"
condition_snapshot: AutomationConditionNodeSnapshot
true_subset: AssetSubset
candidate_subset: AssetSubset

Expand All @@ -40,18 +57,17 @@ class ConditionEvaluation(NamedTuple):
Tuple[AutoMaterializeRuleEvaluation, AbstractSet[AssetKeyPartitionKey]]
] = []

@property
def all_results(
self,
self, condition: "AutomationCondition"
) -> Sequence[Tuple[AutoMaterializeRuleEvaluation, AbstractSet[AssetKeyPartitionKey]]]:
"""This method is a placeholder to allow us to convert this into a shape that other parts
of the system understand.
"""
if isinstance(self.condition, RuleCondition):
if isinstance(condition, RuleCondition):
results = [
(
AutoMaterializeRuleEvaluation(
rule_snapshot=self.condition.rule.to_snapshot(),
rule_snapshot=condition.rule.to_snapshot(),
evaluation_data=evaluation_data,
),
subset,
Expand All @@ -60,15 +76,19 @@ def all_results(
]
else:
results = []
for child in self.child_evaluations:
results = [*results, *child.all_results]
for i, child in enumerate(self.child_evaluations):
results = [*results, *child.all_results(condition.children[i])]
return results

def for_child(self, child_condition: "AutomationCondition") -> Optional["ConditionEvaluation"]:
"""Returns the evaluation of a given child condition."""
"""Returns the evaluation of a given child condition by finding the child evaluation that
has an identical hash to the given condition.
"""
child_hash = child_condition.snapshot.hash
for child_evaluation in self.child_evaluations:
if child_evaluation.condition == child_condition:
if child_evaluation.condition_snapshot.hash == child_hash:
return child_evaluation

return None

def to_evaluation(
Expand All @@ -80,8 +100,13 @@ def to_evaluation(
"""This method is a placeholder to allow us to convert this into a shape that other parts
of the system understand.
"""
condition = (
check.not_none(asset_graph.get_auto_materialize_policy(asset_key))
.to_auto_materialize_policy_evaluator()
.condition
)
# backcompat way to calculate the set of skipped partitions for legacy policies
if self.condition.is_legacy and len(self.child_evaluations) == 2:
if condition.is_legacy and len(self.child_evaluations) == 2:
# the first child is the materialize condition, the second child is the negation of
# the skip condition
_, nor_skip_evaluation = self.child_evaluations
Expand All @@ -97,7 +122,10 @@ def to_evaluation(
return AutoMaterializeAssetEvaluation.from_rule_evaluation_results(
asset_key=asset_key,
asset_graph=asset_graph,
asset_partitions_by_rule_evaluation=[*self.all_results, *self.discard_results],
asset_partitions_by_rule_evaluation=[
*self.all_results(condition),
*self.discard_results,
],
num_requested=(self.true_subset - discard_subset).size,
num_skipped=skipped_subset_size,
num_discarded=discard_subset.size,
Expand All @@ -114,7 +142,7 @@ def from_evaluation_and_rule(
partitions_def = asset_graph.get_partitions_def(asset_key)
empty_subset = AssetSubset.empty(asset_key, partitions_def)
return ConditionEvaluation(
condition=RuleCondition(rule=rule),
condition_snapshot=RuleCondition(rule=rule).snapshot,
true_subset=empty_subset,
candidate_subset=empty_subset
if rule.decision_type == AutoMaterializeDecisionType.MATERIALIZE
Expand Down Expand Up @@ -154,7 +182,7 @@ def from_evaluation(
]
children = [
ConditionEvaluation(
condition=materialize_condition,
condition_snapshot=materialize_condition.snapshot,
true_subset=empty_subset,
candidate_subset=empty_subset,
child_evaluations=[
Expand All @@ -163,7 +191,7 @@ def from_evaluation(
],
),
ConditionEvaluation(
condition=skip_condition,
condition_snapshot=skip_condition.snapshot,
true_subset=empty_subset,
candidate_subset=empty_subset,
child_evaluations=[
Expand All @@ -173,7 +201,7 @@ def from_evaluation(
),
]
return ConditionEvaluation(
condition=condition,
condition_snapshot=condition.snapshot,
true_subset=evaluation.get_requested_subset(asset_graph),
discard_subset=evaluation.get_discarded_subset(asset_graph),
candidate_subset=empty_subset,
Expand All @@ -187,10 +215,6 @@ class AutomationCondition(ABC):
new conditions using the `&` (and), `|` (or), and `~` (not) operators.
"""

@property
def children(self) -> Sequence["AutomationCondition"]:
return []

@abstractmethod
def evaluate(self, context: AssetAutomationConditionEvaluationContext) -> ConditionEvaluation:
raise NotImplementedError()
Expand Down Expand Up @@ -222,6 +246,19 @@ def is_legacy(self) -> bool:
and isinstance(self.children[1], NotAutomationCondition)
)

@property
def children(self) -> Sequence["AutomationCondition"]:
return []

@functools.cached_property
def snapshot(self) -> AutomationConditionNodeSnapshot:
"""Returns a snapshot of this condition that can be used for serialization."""
return AutomationConditionNodeSnapshot(
class_name=self.__class__.__name__,
description=str(self),
child_hashes=[child.snapshot.hash for child in self.children],
)


class RuleCondition(
NamedTuple("_RuleCondition", [("rule", "AutoMaterializeRule")]),
Expand All @@ -243,7 +280,7 @@ def evaluate(self, context: AssetAutomationConditionEvaluationContext) -> Condit
f"Rule returned {true_subset.size} partitions"
)
return ConditionEvaluation(
condition=self,
condition_snapshot=self.snapshot,
true_subset=true_subset,
candidate_subset=context.candidate_subset,
results=results,
Expand All @@ -265,7 +302,7 @@ def evaluate(self, context: AssetAutomationConditionEvaluationContext) -> Condit
child_evaluations.append(result)
true_subset &= result.true_subset
return ConditionEvaluation(
condition=self,
condition_snapshot=self.snapshot,
true_subset=true_subset,
candidate_subset=context.candidate_subset,
child_evaluations=child_evaluations,
Expand All @@ -289,7 +326,7 @@ def evaluate(self, context: AssetAutomationConditionEvaluationContext) -> Condit
child_evaluations.append(result)
true_subset |= result.true_subset
return ConditionEvaluation(
condition=self,
condition_snapshot=self.snapshot,
true_subset=true_subset,
candidate_subset=context.candidate_subset,
child_evaluations=child_evaluations,
Expand Down Expand Up @@ -318,7 +355,7 @@ def evaluate(self, context: AssetAutomationConditionEvaluationContext) -> Condit
true_subset = context.candidate_subset - result.true_subset

return ConditionEvaluation(
condition=self,
condition_snapshot=self.snapshot,
true_subset=true_subset,
candidate_subset=context.candidate_subset,
child_evaluations=[result],
Expand Down

0 comments on commit 42b3182

Please sign in to comment.