From 922ec6aab21e2675323d8992591f97600008a6bf Mon Sep 17 00:00:00 2001 From: Owen Kephart Date: Fri, 8 Dec 2023 14:06:51 -0800 Subject: [PATCH] AssetAutomationCondition -> AssetCondition --- ...mation_evaluator.py => asset_condition.py} | 183 ++++++++---------- ... => asset_condition_evaluation_context.py} | 62 +++--- .../_core/definitions/asset_daemon_context.py | 24 ++- .../_core/definitions/asset_daemon_cursor.py | 16 +- .../definitions/auto_materialize_policy.py | 23 ++- .../definitions/auto_materialize_rule.py | 84 ++++---- .../auto_materialize_rule_evaluation.py | 4 +- .../freshness_based_auto_materialize.py | 8 +- 8 files changed, 181 insertions(+), 223 deletions(-) rename python_modules/dagster/dagster/_core/definitions/{asset_automation_evaluator.py => asset_condition.py} (69%) rename python_modules/dagster/dagster/_core/definitions/{asset_automation_condition_context.py => asset_condition_evaluation_context.py} (86%) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_automation_evaluator.py b/python_modules/dagster/dagster/_core/definitions/asset_condition.py similarity index 69% rename from python_modules/dagster/dagster/_core/definitions/asset_automation_evaluator.py rename to python_modules/dagster/dagster/_core/definitions/asset_condition.py index 393ff582408be..d40ffa4709328 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_automation_evaluator.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_condition.py @@ -13,7 +13,6 @@ ) import dagster._check as check -from dagster._core.definitions.asset_daemon_cursor import AssetDaemonAssetCursor from dagster._core.definitions.asset_graph import AssetGraph from dagster._core.definitions.auto_materialize_rule_evaluation import ( AutoMaterializeAssetEvaluation, @@ -24,9 +23,8 @@ from dagster._core.definitions.events import AssetKey, AssetKeyPartitionKey from dagster._core.definitions.metadata import MetadataMapping, MetadataValue -from .asset_automation_condition_context import ( - AssetAutomationConditionEvaluationContext, - AssetAutomationEvaluationContext, +from .asset_condition_evaluation_context import ( + AssetConditionEvaluationContext, ) from .asset_subset import AssetSubset @@ -36,7 +34,7 @@ from .auto_materialize_rule import AutoMaterializeRule -class AutomationConditionNodeSnapshot(NamedTuple): +class AssetConditionSnapshot(NamedTuple): """A serializable snapshot of a node in the AutomationCondition tree.""" class_name: str @@ -62,17 +60,17 @@ def frozen_metadata(self) -> FrozenSet[Tuple[str, MetadataValue]]: return frozenset(self.metadata.items()) -class ConditionEvaluation(NamedTuple): +class AssetConditionEvaluation(NamedTuple): """Internal representation of the results of evaluating a node in the evaluation tree.""" - condition_snapshot: AutomationConditionNodeSnapshot + condition_snapshot: AssetConditionSnapshot true_subset: AssetSubset candidate_subset: AssetSubset subsets_with_metadata: Sequence[AssetSubsetWithMetdata] = [] - child_evaluations: Sequence["ConditionEvaluation"] = [] + child_evaluations: Sequence["AssetConditionEvaluation"] = [] def all_results( - self, condition: "AutomationCondition" + self, condition: "AssetCondition" ) -> Sequence[Tuple[AutoMaterializeRuleEvaluation, AbstractSet[AssetKeyPartitionKey]]]: """This method is a placeholder to allow us to convert this into a shape that other parts of the system understand. @@ -112,7 +110,7 @@ def all_results( results = [*results, *child.all_results(condition.children[i])] return results - def skip_subset_size(self, condition: "AutomationCondition") -> int: + def skip_subset_size(self, condition: "AssetCondition") -> int: # backcompat way to calculate the set of skipped partitions for legacy policies if not condition.is_legacy: return 0 @@ -121,7 +119,7 @@ def skip_subset_size(self, condition: "AutomationCondition") -> int: skip_evaluation = not_skip_evaluation.child_evaluations[0] return skip_evaluation.true_subset.size - def discard_subset(self, condition: "AutomationCondition") -> Optional[AssetSubset]: + def discard_subset(self, condition: "AssetCondition") -> Optional[AssetSubset]: not_discard_condition = condition.not_discard_condition if not not_discard_condition or len(self.child_evaluations) != 3: return None @@ -130,11 +128,11 @@ def discard_subset(self, condition: "AutomationCondition") -> Optional[AssetSubs discard_evaluation = not_discard_evaluation.child_evaluations[0] return discard_evaluation.true_subset - def discard_subset_size(self, condition: "AutomationCondition") -> int: + def discard_subset_size(self, condition: "AssetCondition") -> int: discard_subset = self.discard_subset(condition) return discard_subset.size if discard_subset else 0 - def for_child(self, child_condition: "AutomationCondition") -> Optional["ConditionEvaluation"]: + def for_child(self, child_condition: "AssetCondition") -> Optional["AssetConditionEvaluation"]: """Returns the evaluation of a given child condition by finding the child evaluation that has an identical hash to the given condition. """ @@ -154,11 +152,9 @@ def to_evaluation( """This method is a placeholder to allow us to convert this into a shape that other parts of the system understand. """ - condition = ( - check.not_none(asset_graph.get_auto_materialize_policy(asset_key)) - .to_auto_materialize_policy_evaluator() - .condition - ) + condition = check.not_none( + asset_graph.get_auto_materialize_policy(asset_key) + ).to_asset_condition() return AutoMaterializeAssetEvaluation.from_rule_evaluation_results( asset_key=asset_key, @@ -175,14 +171,14 @@ def from_evaluation_and_rule( evaluation: AutoMaterializeAssetEvaluation, asset_graph: AssetGraph, rule: "AutoMaterializeRule", - ) -> "ConditionEvaluation": + ) -> "AssetConditionEvaluation": asset_key = evaluation.asset_key partitions_def = asset_graph.get_partitions_def(asset_key) true_subset, subsets_with_metadata = evaluation.get_rule_evaluation_results( rule.to_snapshot(), asset_graph ) - return ConditionEvaluation( + return AssetConditionEvaluation( condition_snapshot=RuleCondition(rule=rule).snapshot, true_subset=true_subset, candidate_subset=AssetSubset.empty(asset_key, partitions_def) @@ -193,10 +189,10 @@ def from_evaluation_and_rule( @staticmethod def from_evaluation( - condition: "AutomationCondition", + condition: "AssetCondition", evaluation: Optional[AutoMaterializeAssetEvaluation], asset_graph: AssetGraph, - ) -> Optional["ConditionEvaluation"]: + ) -> Optional["AssetConditionEvaluation"]: """This method is a placeholder to allow us to convert the serialized objects the system uses into a more-convenient internal representation. """ @@ -222,26 +218,26 @@ def from_evaluation( and skip_condition.rule.to_snapshot() in (evaluation.rule_snapshots or set()) ] children = [ - ConditionEvaluation( + AssetConditionEvaluation( condition_snapshot=materialize_condition.snapshot, true_subset=empty_subset, candidate_subset=empty_subset, child_evaluations=[ - ConditionEvaluation.from_evaluation_and_rule(evaluation, asset_graph, rule) + AssetConditionEvaluation.from_evaluation_and_rule(evaluation, asset_graph, rule) for rule in materialize_rules ], ), - ConditionEvaluation( + AssetConditionEvaluation( condition_snapshot=not_skip_condition.snapshot, true_subset=empty_subset, candidate_subset=empty_subset, child_evaluations=[ - ConditionEvaluation( + AssetConditionEvaluation( condition_snapshot=skip_condition.snapshot, true_subset=empty_subset, candidate_subset=empty_subset, child_evaluations=[ - ConditionEvaluation.from_evaluation_and_rule( + AssetConditionEvaluation.from_evaluation_and_rule( evaluation, asset_graph, rule ) for rule in skip_rules @@ -254,19 +250,19 @@ def from_evaluation( discard_condition = condition.not_discard_condition.children[0] if isinstance(discard_condition, RuleCondition): children.append( - ConditionEvaluation( + AssetConditionEvaluation( condition_snapshot=condition.not_discard_condition.snapshot, true_subset=empty_subset, candidate_subset=empty_subset, child_evaluations=[ - ConditionEvaluation.from_evaluation_and_rule( + AssetConditionEvaluation.from_evaluation_and_rule( evaluation, asset_graph, discard_condition.rule ) ], ) ) - return ConditionEvaluation( + return AssetConditionEvaluation( condition_snapshot=condition.snapshot, true_subset=evaluation.get_requested_subset(asset_graph), candidate_subset=empty_subset, @@ -274,30 +270,30 @@ def from_evaluation( ) -class AutomationCondition(ABC): +class AssetCondition(ABC): """An AutomationCondition represents some state of the world that can influence if an asset partition should be materialized or not. AutomationConditions can be combined together to create new conditions using the `&` (and), `|` (or), and `~` (not) operators. """ @abstractmethod - def evaluate(self, context: AssetAutomationConditionEvaluationContext) -> ConditionEvaluation: + def evaluate(self, context: AssetConditionEvaluationContext) -> AssetConditionEvaluation: raise NotImplementedError() - def __and__(self, other: "AutomationCondition") -> "AutomationCondition": + def __and__(self, other: "AssetCondition") -> "AssetCondition": # group AndAutomationConditions together - if isinstance(self, AndAutomationCondition): - return AndAutomationCondition(children=[*self.children, other]) - return AndAutomationCondition(children=[self, other]) + if isinstance(self, AndAssetCondition): + return AndAssetCondition(children=[*self.children, other]) + return AndAssetCondition(children=[self, other]) - def __or__(self, other: "AutomationCondition") -> "AutomationCondition": + def __or__(self, other: "AssetCondition") -> "AssetCondition": # group OrAutomationConditions together - if isinstance(self, OrAutomationCondition): - return OrAutomationCondition(children=[*self.children, other]) - return OrAutomationCondition(children=[self, other]) + if isinstance(self, OrAssetCondition): + return OrAssetCondition(children=[*self.children, other]) + return OrAssetCondition(children=[self, other]) - def __invert__(self) -> "AutomationCondition": - return NotAutomationCondition(children=[self]) + def __invert__(self) -> "AssetCondition": + return NotAssetCondition(children=[self]) @property def is_legacy(self) -> bool: @@ -305,32 +301,32 @@ def is_legacy(self) -> bool: do certain types of backwards-compatible operations on it. """ return ( - isinstance(self, AndAutomationCondition) + isinstance(self, AndAssetCondition) and len(self.children) in {2, 3} - and isinstance(self.children[0], OrAutomationCondition) - and isinstance(self.children[1], NotAutomationCondition) + and isinstance(self.children[0], OrAssetCondition) + and isinstance(self.children[1], NotAssetCondition) # the third child is the discard condition, which is optional - and (len(self.children) == 2 or isinstance(self.children[2], NotAutomationCondition)) + and (len(self.children) == 2 or isinstance(self.children[2], NotAssetCondition)) ) @property - def children(self) -> Sequence["AutomationCondition"]: + def children(self) -> Sequence["AssetCondition"]: return [] @property - def indexed_children(self) -> Sequence[Tuple[int, "AutomationCondition"]]: + def indexed_children(self) -> Sequence[Tuple[int, "AssetCondition"]]: return list(enumerate(self.children)) @property - def not_discard_condition(self) -> Optional["AutomationCondition"]: + def not_discard_condition(self) -> Optional["AssetCondition"]: if not self.is_legacy or not len(self.children) == 3: return None return self.children[-1] @functools.cached_property - def snapshot(self) -> AutomationConditionNodeSnapshot: + def snapshot(self) -> AssetConditionSnapshot: """Returns a snapshot of this condition that can be used for serialization.""" - return AutomationConditionNodeSnapshot( + return AssetConditionSnapshot( class_name=self.__class__.__name__, description=str(self), child_hashes=[child.snapshot.hash for child in self.children], @@ -339,19 +335,19 @@ def snapshot(self) -> AutomationConditionNodeSnapshot: class RuleCondition( NamedTuple("_RuleCondition", [("rule", "AutoMaterializeRule")]), - AutomationCondition, + AssetCondition, ): """This class represents the condition that a particular AutoMaterializeRule is satisfied.""" - def evaluate(self, context: AssetAutomationConditionEvaluationContext) -> ConditionEvaluation: - context.asset_context.daemon_context._verbose_log_fn( # noqa + def evaluate(self, context: AssetConditionEvaluationContext) -> AssetConditionEvaluation: + context.root_context.daemon_context._verbose_log_fn( # noqa f"Evaluating rule: {self.rule.to_snapshot()}" ) true_subset, subsets_with_metadata = self.rule.evaluate_for_asset(context) - context.asset_context.daemon_context._verbose_log_fn( # noqa + context.root_context.daemon_context._verbose_log_fn( # noqa f"Rule returned {true_subset.size} partitions" ) - return ConditionEvaluation( + return AssetConditionEvaluation( condition_snapshot=self.snapshot, true_subset=true_subset, candidate_subset=context.candidate_subset, @@ -359,21 +355,23 @@ def evaluate(self, context: AssetAutomationConditionEvaluationContext) -> Condit ) -class AndAutomationCondition( - NamedTuple("_AndAutomationCondition", [("children", Sequence[AutomationCondition])]), - AutomationCondition, +class AndAssetCondition( + NamedTuple("_AndAssetCondition", [("children", Sequence[AssetCondition])]), + AssetCondition, ): """This class represents the condition that all of its children evaluate to true.""" - def evaluate(self, context: AssetAutomationConditionEvaluationContext) -> ConditionEvaluation: - child_evaluations: List[ConditionEvaluation] = [] + def evaluate(self, context: AssetConditionEvaluationContext) -> AssetConditionEvaluation: + child_evaluations: List[AssetConditionEvaluation] = [] true_subset = context.candidate_subset - for child in self.children: - child_context = context.for_child(condition=child, candidate_subset=true_subset) + for index, child in self.indexed_children: + child_context = context.for_child( + condition=child, candidate_subset=true_subset, child_index=index + ) result = child.evaluate(child_context) child_evaluations.append(result) true_subset &= result.true_subset - return ConditionEvaluation( + return AssetConditionEvaluation( condition_snapshot=self.snapshot, true_subset=true_subset, candidate_subset=context.candidate_subset, @@ -381,23 +379,23 @@ def evaluate(self, context: AssetAutomationConditionEvaluationContext) -> Condit ) -class OrAutomationCondition( - NamedTuple("_OrAutomationCondition", [("children", Sequence[AutomationCondition])]), - AutomationCondition, +class OrAssetCondition( + NamedTuple("_OrAssetCondition", [("children", Sequence[AssetCondition])]), + AssetCondition, ): """This class represents the condition that any of its children evaluate to true.""" - def evaluate(self, context: AssetAutomationConditionEvaluationContext) -> ConditionEvaluation: - child_evaluations: List[ConditionEvaluation] = [] + def evaluate(self, context: AssetConditionEvaluationContext) -> AssetConditionEvaluation: + child_evaluations: List[AssetConditionEvaluation] = [] true_subset = context.empty_subset() - for child in self.children: + for index, child in self.indexed_children: child_context = context.for_child( - condition=child, candidate_subset=context.candidate_subset + condition=child, candidate_subset=context.candidate_subset, child_index=index ) result = child.evaluate(child_context) child_evaluations.append(result) true_subset |= result.true_subset - return ConditionEvaluation( + return AssetConditionEvaluation( condition_snapshot=self.snapshot, true_subset=true_subset, candidate_subset=context.candidate_subset, @@ -405,55 +403,30 @@ def evaluate(self, context: AssetAutomationConditionEvaluationContext) -> Condit ) -class NotAutomationCondition( - NamedTuple("_NotAutomationCondition", [("children", Sequence[AutomationCondition])]), - AutomationCondition, +class NotAssetCondition( + NamedTuple("_NotAssetCondition", [("children", Sequence[AssetCondition])]), + AssetCondition, ): """This class represents the condition that none of its children evaluate to true.""" - def __new__(cls, children: Sequence[AutomationCondition]): + def __new__(cls, children: Sequence[AssetCondition]): check.invariant(len(children) == 1) return super().__new__(cls, children) @property - def child(self) -> AutomationCondition: + def child(self) -> AssetCondition: return self.children[0] - def evaluate(self, context: AssetAutomationConditionEvaluationContext) -> ConditionEvaluation: + def evaluate(self, context: AssetConditionEvaluationContext) -> AssetConditionEvaluation: child_context = context.for_child( - condition=self.child, candidate_subset=context.candidate_subset + condition=self.child, candidate_subset=context.candidate_subset, child_index=0 ) result = self.child.evaluate(child_context) true_subset = context.candidate_subset - result.true_subset - return ConditionEvaluation( + return AssetConditionEvaluation( condition_snapshot=self.snapshot, true_subset=true_subset, candidate_subset=context.candidate_subset, child_evaluations=[result], ) - - -class AssetAutomationEvaluator(NamedTuple): - """For now, this is an internal class that is used to help transition from the old format to the - new. Upstack, the original AutoMaterializePolicy class will be replaced with this. - """ - - condition: AutomationCondition - - def evaluate( - self, context: AssetAutomationEvaluationContext - ) -> Tuple[ConditionEvaluation, AssetDaemonAssetCursor]: - """Evaluates the auto materialize policy of a given asset. - - Returns: - - A ConditionEvaluation object representing information about this evaluation. If - `report_num_skipped` is set to `True`, then this will attempt to calculate the number of - skipped partitions in a backwards-compatible way. This can only be done for policies that - are in the format `(a | b | ...) & ~(c | d | ...). - - A new AssetDaemonAssetCursor that represents the state of the world after this evaluation. - """ - condition_context = context.get_root_condition_context() - condition_evaluation = self.condition.evaluate(condition_context) - - return condition_evaluation, context.get_new_asset_cursor(evaluation=condition_evaluation) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_automation_condition_context.py b/python_modules/dagster/dagster/_core/definitions/asset_condition_evaluation_context.py similarity index 86% rename from python_modules/dagster/dagster/_core/definitions/asset_automation_condition_context.py rename to python_modules/dagster/dagster/_core/definitions/asset_condition_evaluation_context.py index 67d83eaac8582..c8a77a7e01d53 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_automation_condition_context.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_condition_evaluation_context.py @@ -15,27 +15,27 @@ from .asset_subset import AssetSubset if TYPE_CHECKING: - from dagster._core.definitions.asset_automation_evaluator import AssetSubsetWithMetdata + from dagster._core.definitions.asset_condition import AssetSubsetWithMetdata - from .asset_automation_evaluator import AutomationCondition, ConditionEvaluation + from .asset_condition import AssetCondition, AssetConditionEvaluation from .asset_daemon_context import AssetDaemonContext @dataclass(frozen=True) -class AssetAutomationEvaluationContext: +class RootAssetConditionEvaluationContext: """Context object containing methods and properties used for evaluating the entire state of an asset's automation rules. """ asset_key: AssetKey asset_cursor: Optional[AssetDaemonAssetCursor] - root_condition: "AutomationCondition" + root_condition: "AssetCondition" instance_queryer: CachingInstanceQueryer data_time_resolver: CachingDataTimeResolver daemon_context: "AssetDaemonContext" - evaluation_results_by_key: Mapping[AssetKey, "ConditionEvaluation"] + evaluation_results_by_key: Mapping[AssetKey, "AssetConditionEvaluation"] expected_data_time_mapping: Mapping[AssetKey, Optional[datetime.datetime]] @property @@ -52,7 +52,7 @@ def evaluation_time(self) -> datetime.datetime: return self.instance_queryer.evaluation_time @functools.cached_property - def latest_evaluation(self) -> Optional["ConditionEvaluation"]: + def latest_evaluation(self) -> Optional["AssetConditionEvaluation"]: if not self.asset_cursor: return None return self.asset_cursor.latest_evaluation @@ -175,9 +175,9 @@ def will_update_asset_partition(self, asset_partition: AssetKeyPartitionKey) -> def empty_subset(self) -> AssetSubset: return AssetSubset.empty(self.asset_key, self.partitions_def) - def get_root_condition_context(self) -> "AssetAutomationConditionEvaluationContext": - return AssetAutomationConditionEvaluationContext( - asset_context=self, + def get_root_condition_context(self) -> "AssetConditionEvaluationContext": + return AssetConditionEvaluationContext( + root_context=self, condition=self.root_condition, candidate_subset=AssetSubset.all( asset_key=self.asset_key, @@ -188,7 +188,9 @@ def get_root_condition_context(self) -> "AssetAutomationConditionEvaluationConte latest_evaluation=self.latest_evaluation, ) - def get_new_asset_cursor(self, evaluation: "ConditionEvaluation") -> AssetDaemonAssetCursor: + def get_new_asset_cursor( + self, evaluation: "AssetConditionEvaluation" + ) -> AssetDaemonAssetCursor: """Returns a new AssetDaemonAssetCursor based on the current cursor and the results of this tick's evaluation. """ @@ -213,33 +215,33 @@ def get_new_asset_cursor(self, evaluation: "ConditionEvaluation") -> AssetDaemon @dataclass(frozen=True) -class AssetAutomationConditionEvaluationContext: - """Context object containing methods and properties used for evaluating a particular AutomationCondition.""" +class AssetConditionEvaluationContext: + """Context object containing methods and properties used for evaluating a particular AssetCondition.""" - asset_context: AssetAutomationEvaluationContext - condition: "AutomationCondition" + root_context: RootAssetConditionEvaluationContext + condition: "AssetCondition" candidate_subset: AssetSubset - latest_evaluation: Optional["ConditionEvaluation"] + latest_evaluation: Optional["AssetConditionEvaluation"] @property def asset_key(self) -> AssetKey: - return self.asset_context.asset_key + return self.root_context.asset_key @property def partitions_def(self) -> Optional[PartitionsDefinition]: - return self.asset_context.partitions_def + return self.root_context.partitions_def @property def asset_cursor(self) -> Optional[AssetDaemonAssetCursor]: - return self.asset_context.asset_cursor + return self.root_context.asset_cursor @property def asset_graph(self) -> AssetGraph: - return self.asset_context.asset_graph + return self.root_context.asset_graph @property def instance_queryer(self) -> CachingInstanceQueryer: - return self.asset_context.instance_queryer + return self.root_context.instance_queryer @property def max_storage_id(self) -> Optional[int]: @@ -264,9 +266,9 @@ def parent_has_updated_subset(self) -> AssetSubset: return AssetSubset.from_asset_partitions_set( self.asset_key, self.partitions_def, - self.asset_context.instance_queryer.asset_partitions_with_newly_updated_parents( + self.root_context.instance_queryer.asset_partitions_with_newly_updated_parents( latest_storage_id=self.max_storage_id, - child_asset_key=self.asset_context.asset_key, + child_asset_key=self.root_context.asset_key, map_old_time_partitions=False, ), ) @@ -277,7 +279,7 @@ def candidate_parent_has_or_will_update_subset(self) -> AssetSubset: the previous tick, or will update on this tick. """ return self.candidate_subset & ( - self.parent_has_updated_subset | self.asset_context.parent_will_update_subset + self.parent_has_updated_subset | self.root_context.parent_will_update_subset ) @property @@ -292,12 +294,12 @@ def candidates_not_evaluated_on_previous_tick_subset(self) -> AssetSubset: @property def materialized_since_previous_tick_subset(self) -> AssetSubset: """Returns the set of asset partitions that were materialized since the previous tick.""" - return self.asset_context.materialized_since_previous_tick_subset + return self.root_context.materialized_since_previous_tick_subset @property def materialized_requested_or_discarded_since_previous_tick_subset(self) -> AssetSubset: """Returns the set of asset partitions that were materialized since the previous tick.""" - return self.asset_context.materialized_requested_or_discarded_since_previous_tick_subset + return self.root_context.materialized_requested_or_discarded_since_previous_tick_subset @property def previous_tick_subsets_with_metadata(self) -> Sequence["AssetSubsetWithMetdata"]: @@ -305,13 +307,13 @@ def previous_tick_subsets_with_metadata(self) -> Sequence["AssetSubsetWithMetdat return self.latest_evaluation.subsets_with_metadata if self.latest_evaluation else [] def empty_subset(self) -> AssetSubset: - return self.asset_context.empty_subset() + return self.root_context.empty_subset() def for_child( - self, condition: "AutomationCondition", candidate_subset: AssetSubset - ) -> "AssetAutomationConditionEvaluationContext": - return AssetAutomationConditionEvaluationContext( - asset_context=self.asset_context, + self, condition: "AssetCondition", candidate_subset: AssetSubset, child_index: int + ) -> "AssetConditionEvaluationContext": + return AssetConditionEvaluationContext( + root_context=self.root_context, condition=condition, candidate_subset=candidate_subset, latest_evaluation=self.latest_evaluation.for_child(condition) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py index 323e4bd6657b3..a152fbcf92e7f 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py @@ -35,8 +35,8 @@ from ... import PartitionKeyRange from ..storage.tags import ASSET_PARTITION_RANGE_END_TAG, ASSET_PARTITION_RANGE_START_TAG -from .asset_automation_condition_context import AssetAutomationEvaluationContext -from .asset_automation_evaluator import ConditionEvaluation +from .asset_condition import AssetConditionEvaluation +from .asset_condition_evaluation_context import RootAssetConditionEvaluationContext from .asset_daemon_cursor import AssetDaemonAssetCursor, AssetDaemonCursor from .asset_graph import AssetGraph from .auto_materialize_rule import AutoMaterializeRule @@ -220,9 +220,9 @@ def get_new_latest_storage_id(self) -> Optional[int]: def evaluate_asset( self, asset_key: AssetKey, - evaluation_results_by_key: Mapping[AssetKey, ConditionEvaluation], + evaluation_results_by_key: Mapping[AssetKey, AssetConditionEvaluation], expected_data_time_mapping: Mapping[AssetKey, Optional[datetime.datetime]], - ) -> Tuple[ConditionEvaluation, AssetDaemonAssetCursor, Optional[datetime.datetime]]: + ) -> Tuple[AssetConditionEvaluation, AssetDaemonAssetCursor, Optional[datetime.datetime]]: """Evaluates the auto materialize policy of a given asset key. Params: @@ -236,21 +236,25 @@ def evaluate_asset( """ # convert the legacy AutoMaterializePolicy to an Evaluator - auto_materialize_policy_evaluator = check.not_none( + asset_condition = check.not_none( self.asset_graph.auto_materialize_policies_by_key.get(asset_key) - ).to_auto_materialize_policy_evaluator() + ).to_asset_condition() - context = AssetAutomationEvaluationContext( + context = RootAssetConditionEvaluationContext( asset_key=asset_key, asset_cursor=self.cursor.asset_cursor_for_key(asset_key, self.asset_graph), - root_condition=auto_materialize_policy_evaluator.condition, + root_condition=asset_condition, instance_queryer=self.instance_queryer, data_time_resolver=self.data_time_resolver, daemon_context=self, evaluation_results_by_key=evaluation_results_by_key, expected_data_time_mapping=expected_data_time_mapping, ) - evaluation, asset_cursor = auto_materialize_policy_evaluator.evaluate(context) + condition_context = context.get_root_condition_context() + + evaluation = asset_condition.evaluate(condition_context) + asset_cursor = context.get_new_asset_cursor(evaluation=evaluation) + expected_data_time = get_expected_data_time_for_asset_key( context, will_materialize=evaluation.true_subset.size > 0 ) @@ -269,7 +273,7 @@ def get_auto_materialize_asset_evaluations( """ asset_cursors: List[AssetDaemonAssetCursor] = [] - evaluation_results_by_key: Dict[AssetKey, ConditionEvaluation] = {} + evaluation_results_by_key: Dict[AssetKey, AssetConditionEvaluation] = {} legacy_evaluation_results_by_key: Dict[AssetKey, AutoMaterializeAssetEvaluation] = {} expected_data_time_mapping: Dict[AssetKey, Optional[datetime.datetime]] = defaultdict() to_request: Set[AssetKeyPartitionKey] = set() diff --git a/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py b/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py index 64918c2a94be8..2cc609ab7099a 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py @@ -25,7 +25,7 @@ from .partition import PartitionsSubset if TYPE_CHECKING: - from .asset_automation_evaluator import ConditionEvaluation + from .asset_condition import AssetConditionEvaluation class AssetDaemonAssetCursor(NamedTuple): @@ -36,7 +36,7 @@ class AssetDaemonAssetCursor(NamedTuple): asset_key: AssetKey latest_storage_id: Optional[int] latest_evaluation_timestamp: Optional[float] - latest_evaluation: Optional["ConditionEvaluation"] + latest_evaluation: Optional["AssetConditionEvaluation"] materialized_requested_or_discarded_subset: AssetSubset @@ -73,7 +73,7 @@ def was_previously_handled(self, asset_key: AssetKey) -> bool: def asset_cursor_for_key( self, asset_key: AssetKey, asset_graph: AssetGraph ) -> AssetDaemonAssetCursor: - from .asset_automation_evaluator import ConditionEvaluation + from .asset_condition import AssetConditionEvaluation partitions_def = asset_graph.get_partitions_def(asset_key) handled_partitions_subset = self.handled_root_partitions_by_asset_key.get(asset_key) @@ -83,16 +83,14 @@ def asset_cursor_for_key( handled_subset = AssetSubset(asset_key=asset_key, value=True) else: handled_subset = AssetSubset.empty(asset_key, partitions_def) - condition = ( - check.not_none(asset_graph.get_auto_materialize_policy(asset_key)) - .to_auto_materialize_policy_evaluator() - .condition - ) + condition = check.not_none( + asset_graph.get_auto_materialize_policy(asset_key) + ).to_asset_condition() return AssetDaemonAssetCursor( asset_key=asset_key, latest_storage_id=self.latest_storage_id, latest_evaluation_timestamp=self.latest_evaluation_timestamp, - latest_evaluation=ConditionEvaluation.from_evaluation( + latest_evaluation=AssetConditionEvaluation.from_evaluation( condition=condition, evaluation=self.latest_evaluation_by_asset_key.get(asset_key), asset_graph=asset_graph, diff --git a/python_modules/dagster/dagster/_core/definitions/auto_materialize_policy.py b/python_modules/dagster/dagster/_core/definitions/auto_materialize_policy.py index b5aac93e1cc22..1e2375258baf0 100644 --- a/python_modules/dagster/dagster/_core/definitions/auto_materialize_policy.py +++ b/python_modules/dagster/dagster/_core/definitions/auto_materialize_policy.py @@ -11,7 +11,7 @@ ) if TYPE_CHECKING: - from dagster._core.definitions.asset_automation_evaluator import AssetAutomationEvaluator + from dagster._core.definitions.asset_condition import AssetCondition from dagster._core.definitions.auto_materialize_rule import ( AutoMaterializeRule, AutoMaterializeRuleSnapshot, @@ -253,24 +253,23 @@ def policy_type(self) -> AutoMaterializePolicyType: def rule_snapshots(self) -> Sequence["AutoMaterializeRuleSnapshot"]: return [rule.to_snapshot() for rule in self.rules] - def to_auto_materialize_policy_evaluator(self) -> "AssetAutomationEvaluator": + def to_asset_condition(self) -> "AssetCondition": """Converts a set of materialize / skip rules into a single binary expression.""" - from .asset_automation_evaluator import ( - AndAutomationCondition, - AssetAutomationEvaluator, - NotAutomationCondition, - OrAutomationCondition, + from .asset_condition import ( + AndAssetCondition, + NotAssetCondition, + OrAssetCondition, RuleCondition, ) from .auto_materialize_rule import DiscardOnMaxMaterializationsExceededRule - materialize_condition = OrAutomationCondition( + materialize_condition = OrAssetCondition( children=[ RuleCondition(rule) for rule in sorted(self.materialize_rules, key=lambda rule: rule.description) ] ) - skip_condition = OrAutomationCondition( + skip_condition = OrAssetCondition( children=[ RuleCondition(rule) for rule in sorted(self.skip_rules, key=lambda rule: rule.description) @@ -278,13 +277,13 @@ def to_auto_materialize_policy_evaluator(self) -> "AssetAutomationEvaluator": ) children = [ materialize_condition, - NotAutomationCondition([skip_condition]), + NotAssetCondition([skip_condition]), ] if self.max_materializations_per_minute: discard_condition = RuleCondition( DiscardOnMaxMaterializationsExceededRule(self.max_materializations_per_minute) ) - children.append(NotAutomationCondition([discard_condition])) + children.append(NotAssetCondition([discard_condition])) # results in an expression of the form (m1 | m2 | ... | mn) & ~(s1 | s2 | ... | sn) & ~d - return AssetAutomationEvaluator(condition=AndAutomationCondition(children)) + return AndAssetCondition(children) diff --git a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py index 6a2debaab8239..dfd0d16ce75ea 100644 --- a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py +++ b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py @@ -44,7 +44,7 @@ reverse_cron_string_iterator, ) -from .asset_automation_condition_context import AssetAutomationConditionEvaluationContext +from .asset_condition_evaluation_context import AssetConditionEvaluationContext from .asset_graph import sort_key_for_asset_partition @@ -74,7 +74,7 @@ def description(self) -> str: def add_evaluation_data_from_previous_tick( self, - context: AssetAutomationConditionEvaluationContext, + context: AssetConditionEvaluationContext, asset_partitions_by_evaluation_data: Mapping[ AutoMaterializeRuleEvaluationData, Set[AssetKeyPartitionKey] ], @@ -90,7 +90,7 @@ def add_evaluation_data_from_previous_tick( ignore_subset: An AssetSubset which represents information that we should *not* carry forward from the previous tick. """ - from .asset_automation_evaluator import AssetSubsetWithMetdata + from .asset_condition import AssetSubsetWithMetdata mapping = defaultdict(lambda: context.empty_subset()) for evaluation_data, asset_partitions in asset_partitions_by_evaluation_data.items(): @@ -125,9 +125,7 @@ def add_evaluation_data_from_previous_tick( ) @abstractmethod - def evaluate_for_asset( - self, context: AssetAutomationConditionEvaluationContext - ) -> RuleEvaluationResults: + def evaluate_for_asset(self, context: AssetConditionEvaluationContext) -> RuleEvaluationResults: """The core evaluation function for the rule. This function takes in a context object and returns a mapping from evaluated rules to the set of asset partitions that the rule applies to. @@ -286,10 +284,8 @@ def decision_type(self) -> AutoMaterializeDecisionType: def description(self) -> str: return "required to meet this or downstream asset's freshness policy" - def evaluate_for_asset( - self, context: AssetAutomationConditionEvaluationContext - ) -> RuleEvaluationResults: - return freshness_evaluation_results_for_asset_key(context.asset_context) + def evaluate_for_asset(self, context: AssetConditionEvaluationContext) -> RuleEvaluationResults: + return freshness_evaluation_results_for_asset_key(context.root_context) @whitelist_for_serdes @@ -309,13 +305,13 @@ def description(self) -> str: return f"not materialized since last cron schedule tick of '{self.cron_schedule}' (timezone: {self.timezone})" def missed_cron_ticks( - self, context: AssetAutomationConditionEvaluationContext + self, context: AssetConditionEvaluationContext ) -> Sequence[datetime.datetime]: """Returns the cron ticks which have been missed since the previous cursor was generated.""" if not context.latest_evaluation_timestamp: previous_dt = next( reverse_cron_string_iterator( - end_timestamp=context.asset_context.evaluation_time.timestamp(), + end_timestamp=context.root_context.evaluation_time.timestamp(), cron_string=self.cron_schedule, execution_timezone=self.timezone, ) @@ -327,13 +323,13 @@ def missed_cron_ticks( cron_string=self.cron_schedule, execution_timezone=self.timezone, ): - if dt > context.asset_context.evaluation_time: + if dt > context.root_context.evaluation_time: break missed_ticks.append(dt) return missed_ticks def get_new_asset_partitions_to_request( - self, context: AssetAutomationConditionEvaluationContext + self, context: AssetConditionEvaluationContext ) -> AbstractSet[AssetKeyPartitionKey]: missed_ticks = self.missed_cron_ticks(context) @@ -349,7 +345,7 @@ def get_new_asset_partitions_to_request( return { AssetKeyPartitionKey(context.asset_key, partition_key) for partition_key in partitions_def.get_partition_keys( - current_time=context.asset_context.evaluation_time, + current_time=context.root_context.evaluation_time, dynamic_partitions_store=context.instance_queryer, ) } @@ -395,9 +391,7 @@ def get_new_asset_partitions_to_request( for time_partition_key in missed_time_partition_keys } - def evaluate_for_asset( - self, context: AssetAutomationConditionEvaluationContext - ) -> RuleEvaluationResults: + def evaluate_for_asset(self, context: AssetConditionEvaluationContext) -> RuleEvaluationResults: new_asset_partitions_to_request = self.get_new_asset_partitions_to_request(context) asset_subset_to_request = AssetSubset.from_asset_partitions_set( context.asset_key, context.partitions_def, new_asset_partitions_to_request @@ -432,7 +426,7 @@ def description(self) -> str: def passes( self, - context: AssetAutomationConditionEvaluationContext, + context: AssetConditionEvaluationContext, asset_partitions: Iterable[AssetKeyPartitionKey], ) -> Iterable[AssetKeyPartitionKey]: if self.latest_run_required_tags is None: @@ -442,7 +436,7 @@ def passes( asset_partitions_by_latest_run_id: Dict[str, Set[AssetKeyPartitionKey]] = defaultdict(set) for asset_partition in asset_partitions: - if context.asset_context.will_update_asset_partition(asset_partition): + if context.root_context.will_update_asset_partition(asset_partition): will_update_asset_partitions.add(asset_partition) else: record = context.instance_queryer.get_latest_materialization_or_observation_record( @@ -477,7 +471,7 @@ def passes( self.latest_run_required_tags.items() <= { AUTO_MATERIALIZE_TAG: "true", - **context.asset_context.daemon_context.auto_materialize_run_tags, + **context.root_context.daemon_context.auto_materialize_run_tags, }.items() ): return will_update_asset_partitions | updated_partitions_with_required_tags @@ -511,9 +505,7 @@ def description(self) -> str: else: return base - def evaluate_for_asset( - self, context: AssetAutomationConditionEvaluationContext - ) -> RuleEvaluationResults: + def evaluate_for_asset(self, context: AssetConditionEvaluationContext) -> RuleEvaluationResults: """Evaluates the set of asset partitions of this asset whose parents have been updated, or will update on this tick. """ @@ -538,7 +530,7 @@ def evaluate_for_asset( parent_asset_partitions, # do a precise check for updated parents, factoring in data versions, as long as # we're within reasonable limits on the number of partitions to check - respect_materialization_data_versions=context.asset_context.daemon_context.respect_materialization_data_versions + respect_materialization_data_versions=context.root_context.daemon_context.respect_materialization_data_versions and len(parent_asset_partitions) + subset_to_evaluate.size < 100, # ignore self-dependencies when checking for updated parents, to avoid historical # rematerializations from causing a chain of materializations to be kicked off @@ -548,7 +540,7 @@ def evaluate_for_asset( asset_partitions_by_updated_parents[parent].add(asset_partition) for parent in parent_asset_partitions: - if context.asset_context.will_update_asset_partition(parent): + if context.root_context.will_update_asset_partition(parent): asset_partitions_by_will_update_parents[parent].add(asset_partition) updated_and_will_update_parents = ( @@ -614,15 +606,13 @@ def decision_type(self) -> AutoMaterializeDecisionType: def description(self) -> str: return "materialization is missing" - def evaluate_for_asset( - self, context: AssetAutomationConditionEvaluationContext - ) -> RuleEvaluationResults: + def evaluate_for_asset(self, context: AssetConditionEvaluationContext) -> RuleEvaluationResults: """Evaluates the set of asset partitions for this asset which are missing and were not previously discarded. Currently only applies to root asset partitions and asset partitions with updated parents. """ missing_asset_partitions = set( - context.asset_context.never_materialized_requested_or_discarded_root_subset.asset_partitions + context.root_context.never_materialized_requested_or_discarded_root_subset.asset_partitions ) # in addition to missing root asset partitions, check any asset partitions with updated # parents to see if they're missing @@ -652,9 +642,7 @@ def decision_type(self) -> AutoMaterializeDecisionType: def description(self) -> str: return "waiting on upstream data to be up to date" - def evaluate_for_asset( - self, context: AssetAutomationConditionEvaluationContext - ) -> RuleEvaluationResults: + def evaluate_for_asset(self, context: AssetConditionEvaluationContext) -> RuleEvaluationResults: asset_partitions_by_evaluation_data = defaultdict(set) # only need to evaluate net-new candidates and candidates whose parents have changed @@ -667,7 +655,7 @@ def evaluate_for_asset( # find the root cause of why this asset partition's parents are outdated (if any) for ( parent - ) in context.asset_context.get_parents_that_will_not_be_materialized_on_current_tick( + ) in context.root_context.get_parents_that_will_not_be_materialized_on_current_tick( asset_partition=candidate ): if context.instance_queryer.have_ignorable_partition_mapping_for_outdated( @@ -701,7 +689,7 @@ def description(self) -> str: def evaluate_for_asset( self, - context: AssetAutomationConditionEvaluationContext, + context: AssetConditionEvaluationContext, ) -> RuleEvaluationResults: asset_partitions_by_evaluation_data = defaultdict(set) @@ -714,13 +702,13 @@ def evaluate_for_asset( missing_parent_asset_keys = set() for ( parent - ) in context.asset_context.get_parents_that_will_not_be_materialized_on_current_tick( + ) in context.root_context.get_parents_that_will_not_be_materialized_on_current_tick( asset_partition=candidate ): # ignore non-observable sources, which will never have a materialization or observation - if context.asset_context.asset_graph.is_source( + if context.root_context.asset_graph.is_source( parent.asset_key - ) and not context.asset_context.asset_graph.is_observable(parent.asset_key): + ) and not context.root_context.asset_graph.is_observable(parent.asset_key): continue if not context.instance_queryer.asset_partition_has_materialization_or_observation( parent @@ -770,7 +758,7 @@ def description(self) -> str: def evaluate_for_asset( self, - context: AssetAutomationConditionEvaluationContext, + context: AssetConditionEvaluationContext, ) -> RuleEvaluationResults: asset_partitions_by_evaluation_data = defaultdict(set) @@ -791,10 +779,10 @@ def evaluate_for_asset( context.instance_queryer.get_parent_asset_partitions_updated_after_child( candidate, parent_partitions, - context.asset_context.daemon_context.respect_materialization_data_versions, + context.root_context.daemon_context.respect_materialization_data_versions, ignored_parent_keys=set(), ) - | context.asset_context.parent_will_update_subset.asset_partitions + | context.root_context.parent_will_update_subset.asset_partitions ) if self.require_update_for_all_parent_partitions: @@ -836,9 +824,7 @@ def decision_type(self) -> AutoMaterializeDecisionType: def description(self) -> str: return "required parent partitions do not exist" - def evaluate_for_asset( - self, context: AssetAutomationConditionEvaluationContext - ) -> RuleEvaluationResults: + def evaluate_for_asset(self, context: AssetConditionEvaluationContext) -> RuleEvaluationResults: asset_partitions_by_evaluation_data = defaultdict(set) subset_to_evaluate = ( @@ -882,12 +868,10 @@ def description(self) -> str: else: return "targeted by an in-progress backfill" - def evaluate_for_asset( - self, context: AssetAutomationConditionEvaluationContext - ) -> RuleEvaluationResults: + def evaluate_for_asset(self, context: AssetConditionEvaluationContext) -> RuleEvaluationResults: backfilling_subset = ( context.instance_queryer.get_active_backfill_target_asset_graph_subset() - ).get_asset_subset(context.asset_key, context.asset_context.asset_graph) + ).get_asset_subset(context.asset_key, context.root_context.asset_graph) if backfilling_subset.size == 0: return context.empty_subset(), [] @@ -912,9 +896,7 @@ def decision_type(self) -> AutoMaterializeDecisionType: def description(self) -> str: return f"exceeds {self.limit} materialization(s) per minute" - def evaluate_for_asset( - self, context: AssetAutomationConditionEvaluationContext - ) -> RuleEvaluationResults: + def evaluate_for_asset(self, context: AssetConditionEvaluationContext) -> RuleEvaluationResults: # the set of asset partitions which exceed the limit rate_limited_asset_partitions = set( sorted( diff --git a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule_evaluation.py b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule_evaluation.py index 701d6b988c2fd..726d35dd25d8e 100644 --- a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule_evaluation.py +++ b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule_evaluation.py @@ -29,7 +29,7 @@ from .partition import SerializedPartitionsSubset if TYPE_CHECKING: - from dagster._core.definitions.asset_automation_evaluator import AssetSubsetWithMetdata + from dagster._core.definitions.asset_condition import AssetSubsetWithMetdata from dagster._core.instance import DynamicPartitionsStore @@ -272,7 +272,7 @@ def get_rule_evaluation_results( self, rule_snapshot: AutoMaterializeRuleSnapshot, asset_graph: AssetGraph ) -> RuleEvaluationResults: """For a given rule snapshot, returns the calculated evaluations for that rule.""" - from dagster._core.definitions.asset_automation_evaluator import AssetSubsetWithMetdata + from dagster._core.definitions.asset_condition import AssetSubsetWithMetdata true_subset = AssetSubset.empty( self.asset_key, asset_graph.get_partitions_def(self.asset_key) diff --git a/python_modules/dagster/dagster/_core/definitions/freshness_based_auto_materialize.py b/python_modules/dagster/dagster/_core/definitions/freshness_based_auto_materialize.py index fb632b631b9d3..e3468e1c4e4a2 100644 --- a/python_modules/dagster/dagster/_core/definitions/freshness_based_auto_materialize.py +++ b/python_modules/dagster/dagster/_core/definitions/freshness_based_auto_materialize.py @@ -12,14 +12,14 @@ import pendulum -from dagster._core.definitions.asset_automation_evaluator import AssetSubsetWithMetdata +from dagster._core.definitions.asset_condition import AssetSubsetWithMetdata from dagster._core.definitions.asset_subset import AssetSubset from dagster._core.definitions.events import AssetKeyPartitionKey from dagster._core.definitions.freshness_policy import FreshnessPolicy from dagster._utils.schedules import cron_string_iterator if TYPE_CHECKING: - from .asset_automation_condition_context import AssetAutomationEvaluationContext + from .asset_condition_evaluation_context import RootAssetConditionEvaluationContext from .auto_materialize_rule_evaluation import RuleEvaluationResults, TextRuleEvaluationData @@ -111,7 +111,7 @@ def get_execution_period_and_evaluation_data_for_policies( def get_expected_data_time_for_asset_key( - context: "AssetAutomationEvaluationContext", will_materialize: bool + context: "RootAssetConditionEvaluationContext", will_materialize: bool ) -> Optional[datetime.datetime]: """Returns the data time that you would expect this asset to have if you were to execute it on this tick. @@ -154,7 +154,7 @@ def get_expected_data_time_for_asset_key( def freshness_evaluation_results_for_asset_key( - context: "AssetAutomationEvaluationContext", + context: "RootAssetConditionEvaluationContext", ) -> "RuleEvaluationResults": """Returns a set of AssetKeyPartitionKeys to materialize in order to abide by the given FreshnessPolicies.