Skip to content

Commit

Permalink
Simplify Context
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed Jan 8, 2024
1 parent bc7d7a7 commit 163e1b3
Show file tree
Hide file tree
Showing 9 changed files with 225 additions and 220 deletions.
44 changes: 22 additions & 22 deletions python_modules/dagster/dagster/_core/definitions/asset_condition.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,7 @@ class AssetConditionSnapshot(NamedTuple):

class_name: str
description: str
child_hashes: Sequence[str]

@property
def hash(self) -> str:
"""Returns a unique hash for this node in the tree."""
return hashlib.md5(
"".join([self.class_name, self.description, *self.child_hashes]).encode("utf-8")
).hexdigest()
unique_id: str


@whitelist_for_serdes
Expand Down Expand Up @@ -92,9 +85,9 @@ def for_child(self, child_condition: "AssetCondition") -> Optional["AssetConditi
"""Returns the evaluation of a given child condition by finding the child evaluation that
has an identical hash to the given condition.
"""
child_hash = child_condition.snapshot.hash
child_unique_id = child_condition.snapshot.unique_id
for child_evaluation in self.child_evaluations:
if child_evaluation.condition_snapshot.hash == child_hash:
if child_evaluation.condition_snapshot.unique_id == child_unique_id:
return child_evaluation

return None
Expand Down Expand Up @@ -127,6 +120,14 @@ class AssetCondition(ABC):
new conditions using the `&` (and), `|` (or), and `~` (not) operators.
"""

@property
def unique_id(self) -> str:
parts = [
self.__class__.__name__,
*[child.unique_id for child in self.children],
]
return hashlib.md5("".join(parts).encode()).hexdigest()

@abstractmethod
def evaluate(self, context: AssetConditionEvaluationContext) -> AssetConditionEvaluation:
raise NotImplementedError()
Expand Down Expand Up @@ -164,10 +165,6 @@ def is_legacy(self) -> bool:
def children(self) -> Sequence["AssetCondition"]:
return []

@property
def indexed_children(self) -> Sequence[Tuple[int, "AssetCondition"]]:
return list(enumerate(self.children))

@property
def not_discard_condition(self) -> Optional["AssetCondition"]:
if not self.is_legacy or not len(self.children) == 3:
Expand All @@ -180,7 +177,7 @@ def snapshot(self) -> AssetConditionSnapshot:
return AssetConditionSnapshot(
class_name=self.__class__.__name__,
description=str(self),
child_hashes=[child.snapshot.hash for child in self.children],
unique_id=self.unique_id,
)


Expand All @@ -190,6 +187,11 @@ class RuleCondition(
):
"""This class represents the condition that a particular AutoMaterializeRule is satisfied."""

@property
def unique_id(self) -> str:
parts = [self.rule.__class__.__name__, self.rule.description]
return hashlib.md5("".join(parts).encode()).hexdigest()

def evaluate(self, context: AssetConditionEvaluationContext) -> AssetConditionEvaluation:
context.root_context.daemon_context._verbose_log_fn( # noqa
f"Evaluating rule: {self.rule.to_snapshot()}"
Expand All @@ -215,10 +217,8 @@ class AndAssetCondition(
def evaluate(self, context: AssetConditionEvaluationContext) -> AssetConditionEvaluation:
child_evaluations: List[AssetConditionEvaluation] = []
true_subset = context.candidate_subset
for index, child in self.indexed_children:
child_context = context.for_child(
condition=child, candidate_subset=true_subset, child_index=index
)
for child in self.children:
child_context = context.for_child(condition=child, candidate_subset=true_subset)
result = child.evaluate(child_context)
child_evaluations.append(result)
true_subset &= result.true_subset
Expand All @@ -239,9 +239,9 @@ class OrAssetCondition(
def evaluate(self, context: AssetConditionEvaluationContext) -> AssetConditionEvaluation:
child_evaluations: List[AssetConditionEvaluation] = []
true_subset = context.empty_subset()
for index, child in self.indexed_children:
for child in self.children:
child_context = context.for_child(
condition=child, candidate_subset=context.candidate_subset, child_index=index
condition=child, candidate_subset=context.candidate_subset
)
result = child.evaluate(child_context)
child_evaluations.append(result)
Expand Down Expand Up @@ -270,7 +270,7 @@ def child(self) -> AssetCondition:

def evaluate(self, context: AssetConditionEvaluationContext) -> AssetConditionEvaluation:
child_context = context.for_child(
condition=self.child, candidate_subset=context.candidate_subset, child_index=0
condition=self.child, candidate_subset=context.candidate_subset
)
result = self.child.evaluate(child_context)
true_subset = context.candidate_subset - result.true_subset
Expand Down
Loading

0 comments on commit 163e1b3

Please sign in to comment.