From ed2b2caee0b4a9b3c7f074726119aedcfdef3007 Mon Sep 17 00:00:00 2001 From: Owen Kephart Date: Tue, 12 Dec 2023 15:00:05 -0800 Subject: [PATCH] Simplify Context --- .../_core/definitions/asset_condition.py | 44 +-- .../asset_condition_evaluation_context.py | 277 +++++++++--------- .../_core/definitions/asset_daemon_context.py | 11 +- .../_core/definitions/asset_daemon_cursor.py | 12 +- .../definitions/auto_materialize_rule.py | 38 ++- .../auto_materialize_rule_evaluation.py | 43 +-- .../freshness_based_auto_materialize.py | 6 +- .../dagster/_utils/test/schedule_storage.py | 10 +- scripts/run-pyright.py | 4 +- 9 files changed, 225 insertions(+), 220 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_condition.py b/python_modules/dagster/dagster/_core/definitions/asset_condition.py index c23a368685b9d..95951555bfbc3 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_condition.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_condition.py @@ -32,14 +32,7 @@ class AssetConditionSnapshot(NamedTuple): class_name: str description: str - child_hashes: Sequence[str] - - @property - def hash(self) -> str: - """Returns a unique hash for this node in the tree.""" - return hashlib.md5( - "".join([self.class_name, self.description, *self.child_hashes]).encode("utf-8") - ).hexdigest() + unique_id: str @whitelist_for_serdes @@ -92,9 +85,9 @@ def for_child(self, child_condition: "AssetCondition") -> Optional["AssetConditi """Returns the evaluation of a given child condition by finding the child evaluation that has an identical hash to the given condition. """ - child_hash = child_condition.snapshot.hash + child_unique_id = child_condition.snapshot.unique_id for child_evaluation in self.child_evaluations: - if child_evaluation.condition_snapshot.hash == child_hash: + if child_evaluation.condition_snapshot.unique_id == child_unique_id: return child_evaluation return None @@ -127,6 +120,14 @@ class AssetCondition(ABC): new conditions using the `&` (and), `|` (or), and `~` (not) operators. 
""" + @property + def unique_id(self) -> str: + parts = [ + self.__class__.__name__, + *[child.unique_id for child in self.children], + ] + return hashlib.md5("".join(parts).encode()).hexdigest() + @abstractmethod def evaluate(self, context: AssetConditionEvaluationContext) -> AssetConditionEvaluation: raise NotImplementedError() @@ -164,10 +165,6 @@ def is_legacy(self) -> bool: def children(self) -> Sequence["AssetCondition"]: return [] - @property - def indexed_children(self) -> Sequence[Tuple[int, "AssetCondition"]]: - return list(enumerate(self.children)) - @property def not_discard_condition(self) -> Optional["AssetCondition"]: if not self.is_legacy or not len(self.children) == 3: @@ -180,7 +177,7 @@ def snapshot(self) -> AssetConditionSnapshot: return AssetConditionSnapshot( class_name=self.__class__.__name__, description=str(self), - child_hashes=[child.snapshot.hash for child in self.children], + unique_id=self.unique_id, ) @@ -190,6 +187,11 @@ class RuleCondition( ): """This class represents the condition that a particular AutoMaterializeRule is satisfied.""" + @property + def unique_id(self) -> str: + parts = [self.rule.__class__.__name__, self.rule.description] + return hashlib.md5("".join(parts).encode()).hexdigest() + def evaluate(self, context: AssetConditionEvaluationContext) -> AssetConditionEvaluation: context.root_context.daemon_context._verbose_log_fn( # noqa f"Evaluating rule: {self.rule.to_snapshot()}" @@ -215,10 +217,8 @@ class AndAssetCondition( def evaluate(self, context: AssetConditionEvaluationContext) -> AssetConditionEvaluation: child_evaluations: List[AssetConditionEvaluation] = [] true_subset = context.candidate_subset - for index, child in self.indexed_children: - child_context = context.for_child( - condition=child, candidate_subset=true_subset, child_index=index - ) + for child in self.children: + child_context = context.for_child(condition=child, candidate_subset=true_subset) result = child.evaluate(child_context) child_evaluations.append(result) true_subset &= result.true_subset @@ -239,9 +239,9 @@ class OrAssetCondition( def evaluate(self, context: AssetConditionEvaluationContext) -> AssetConditionEvaluation: child_evaluations: List[AssetConditionEvaluation] = [] true_subset = context.empty_subset() - for index, child in self.indexed_children: + for child in self.children: child_context = context.for_child( - condition=child, candidate_subset=context.candidate_subset, child_index=index + condition=child, candidate_subset=context.candidate_subset ) result = child.evaluate(child_context) child_evaluations.append(result) @@ -270,7 +270,7 @@ def child(self) -> AssetCondition: def evaluate(self, context: AssetConditionEvaluationContext) -> AssetConditionEvaluation: child_context = context.for_child( - condition=self.child, candidate_subset=context.candidate_subset, child_index=0 + condition=self.child, candidate_subset=context.candidate_subset ) result = self.child.evaluate(child_context) true_subset = context.candidate_subset - result.true_subset diff --git a/python_modules/dagster/dagster/_core/definitions/asset_condition_evaluation_context.py b/python_modules/dagster/dagster/_core/definitions/asset_condition_evaluation_context.py index 565fdbd579665..e8593ab139ec3 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_condition_evaluation_context.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_condition_evaluation_context.py @@ -1,7 +1,8 @@ +import dataclasses import datetime import functools from dataclasses import dataclass 
-from typing import TYPE_CHECKING, AbstractSet, Mapping, Optional, Sequence +from typing import TYPE_CHECKING, AbstractSet, Any, Callable, Mapping, Optional, Sequence from dagster._core.definitions.data_time import CachingDataTimeResolver from dagster._core.definitions.events import AssetKey, AssetKeyPartitionKey @@ -21,15 +22,28 @@ from .asset_daemon_context import AssetDaemonContext +def root_property(fn: Callable[[Any], Any]) -> Callable[[Any], Any]: + """Ensures that a given property is always accessed via the root context, ensuring that any + cached properties are accessed correctly. + """ + + def wrapped(self: Any) -> Any: + return fn(self.root_context) + + return wrapped + + @dataclass(frozen=True) -class RootAssetConditionEvaluationContext: +class AssetConditionEvaluationContext: """Context object containing methods and properties used for evaluating the entire state of an asset's automation rules. """ asset_key: AssetKey + condition: "AssetCondition" asset_cursor: Optional[AssetDaemonAssetCursor] - root_condition: "AssetCondition" + previous_evaluation: Optional["AssetConditionEvaluation"] + candidate_subset: AssetSubset instance_queryer: CachingInstanceQueryer data_time_resolver: CachingDataTimeResolver @@ -38,6 +52,55 @@ class RootAssetConditionEvaluationContext: evaluation_results_by_key: Mapping[AssetKey, "AssetConditionEvaluation"] expected_data_time_mapping: Mapping[AssetKey, Optional[datetime.datetime]] + root_ref: Optional["AssetConditionEvaluationContext"] = None + + @staticmethod + def create( + asset_key: AssetKey, + condition: "AssetCondition", + asset_cursor: Optional[AssetDaemonAssetCursor], + instance_queryer: CachingInstanceQueryer, + data_time_resolver: CachingDataTimeResolver, + daemon_context: "AssetDaemonContext", + evaluation_results_by_key: Mapping[AssetKey, "AssetConditionEvaluation"], + expected_data_time_mapping: Mapping[AssetKey, Optional[datetime.datetime]], + ) -> "AssetConditionEvaluationContext": + return AssetConditionEvaluationContext( + asset_key=asset_key, + condition=condition, + asset_cursor=asset_cursor, + previous_evaluation=asset_cursor.previous_evaluation if asset_cursor else None, + candidate_subset=AssetSubset.all( + asset_key, + instance_queryer.asset_graph.get_partitions_def(asset_key), + instance_queryer, + instance_queryer.evaluation_time, + ), + data_time_resolver=data_time_resolver, + instance_queryer=instance_queryer, + daemon_context=daemon_context, + evaluation_results_by_key=evaluation_results_by_key, + expected_data_time_mapping=expected_data_time_mapping, + ) + + def for_child( + self, condition: "AssetCondition", candidate_subset: AssetSubset + ) -> "AssetConditionEvaluationContext": + return dataclasses.replace( + self, + condition=condition, + candidate_subset=candidate_subset, + previous_evaluation=self.previous_evaluation.for_child(condition) + if self.previous_evaluation + else None, + root_ref=self.root_context, + ) + + @property + def root_context(self) -> "AssetConditionEvaluationContext": + """A reference to the context of the root condition for this evaluation.""" + return self.root_ref or self + @property def asset_graph(self) -> AssetGraph: return self.instance_queryer.asset_graph @@ -51,13 +114,20 @@ def evaluation_time(self) -> datetime.datetime: """Returns the time at which this rule is being evaluated.""" return self.instance_queryer.evaluation_time - @functools.cached_property - def latest_evaluation(self) -> Optional["AssetConditionEvaluation"]: + @property + def previous_max_storage_id(self) -> 
Optional[int]: + if not self.asset_cursor: + return None + return self.asset_cursor.previous_max_storage_id + + @property + def previous_evaluation_timestamp(self) -> Optional[float]: if not self.asset_cursor: return None - return self.asset_cursor.latest_evaluation + return self.asset_cursor.previous_evaluation_timestamp @functools.cached_property + @root_property def parent_will_update_subset(self) -> AssetSubset: """Returns the set of asset partitions whose parents will be updated on this tick, and which can be materialized in the same run as this asset. @@ -73,14 +143,16 @@ def parent_will_update_subset(self) -> AssetSubset: subset |= parent_subset._replace(asset_key=self.asset_key) return subset - @property + @functools.cached_property + @root_property def previous_tick_requested_subset(self) -> AssetSubset: """Returns the set of asset partitions that were requested on the previous tick.""" - if not self.latest_evaluation: + if not self.previous_evaluation: return self.empty_subset() - return self.latest_evaluation.true_subset + return self.previous_evaluation.true_subset @functools.cached_property + @root_property def materialized_since_previous_tick_subset(self) -> AssetSubset: """Returns the set of asset partitions that were materialized since the previous tick.""" return AssetSubset.from_asset_partitions_set( @@ -89,23 +161,27 @@ def materialized_since_previous_tick_subset(self) -> AssetSubset: self.instance_queryer.get_asset_partitions_updated_after_cursor( self.asset_key, asset_partitions=None, - after_cursor=self.asset_cursor.latest_storage_id if self.asset_cursor else None, + after_cursor=self.asset_cursor.previous_max_storage_id + if self.asset_cursor + else None, respect_materialization_data_versions=False, ), ) @functools.cached_property + @root_property def materialized_requested_or_discarded_since_previous_tick_subset(self) -> AssetSubset: """Returns the set of asset partitions that were materialized since the previous tick.""" - if not self.latest_evaluation: + if not self.previous_evaluation: return self.materialized_since_previous_tick_subset return ( self.materialized_since_previous_tick_subset - | self.latest_evaluation.true_subset - | (self.latest_evaluation.discard_subset(self.root_condition) or self.empty_subset()) + | self.previous_evaluation.true_subset + | (self.previous_evaluation.discard_subset(self.condition) or self.empty_subset()) ) @functools.cached_property + @root_property def never_materialized_requested_or_discarded_root_subset(self) -> AssetSubset: if self.asset_key not in self.asset_graph.root_materializable_or_observable_asset_keys: return self.empty_subset() @@ -122,6 +198,52 @@ def never_materialized_requested_or_discarded_root_subset(self) -> AssetSubset: ) return unhandled_subset - self.materialized_since_previous_tick_subset + @property + @root_property + def parent_has_updated_subset(self) -> AssetSubset: + """Returns the set of asset partitions whose parents have updated since the last time this + condition was evaluated. 
+ """ + return AssetSubset.from_asset_partitions_set( + self.asset_key, + self.partitions_def, + self.root_context.instance_queryer.asset_partitions_with_newly_updated_parents( + latest_storage_id=self.previous_max_storage_id, + child_asset_key=self.root_context.asset_key, + map_old_time_partitions=False, + ), + ) + + @property + def candidate_parent_has_or_will_update_subset(self) -> AssetSubset: + """Returns the set of candidates for this tick which have parents that have updated since + the previous tick, or will update on this tick. + """ + return self.candidate_subset & ( + self.parent_has_updated_subset | self.root_context.parent_will_update_subset + ) + + @property + def candidates_not_evaluated_on_previous_tick_subset(self) -> AssetSubset: + """Returns the set of candidates for this tick which were not candidates on the previous + tick. + """ + if not self.previous_evaluation or not self.previous_evaluation.candidate_subset: + return self.candidate_subset + return self.candidate_subset - self.previous_evaluation.candidate_subset + + @property + def previous_tick_subsets_with_metadata(self) -> Sequence["AssetSubsetWithMetadata"]: + """Returns the RuleEvaluationResults calculated on the previous tick for this condition.""" + return self.previous_evaluation.subsets_with_metadata if self.previous_evaluation else [] + + @property + def previous_tick_true_subset(self) -> AssetSubset: + """Returns the set of asset partitions that were true for this condition on the previous tick.""" + if not self.previous_evaluation: + return self.empty_subset() + return self.previous_evaluation.true_subset + def materializable_in_same_run(self, child_key: AssetKey, parent_key: AssetKey) -> bool: """Returns whether a child asset can be materialized in the same run as a parent asset.""" from dagster._core.definitions.external_asset_graph import ExternalAssetGraph @@ -175,19 +297,6 @@ def will_update_asset_partition(self, asset_partition: AssetKeyPartitionKey) -> def empty_subset(self) -> AssetSubset: return AssetSubset.empty(self.asset_key, self.partitions_def) - def get_root_condition_context(self) -> "AssetConditionEvaluationContext": - return AssetConditionEvaluationContext( - root_context=self, - condition=self.root_condition, - candidate_subset=AssetSubset.all( - asset_key=self.asset_key, - partitions_def=self.partitions_def, - dynamic_partitions_store=self.instance_queryer, - current_time=self.instance_queryer.evaluation_time, - ), - latest_evaluation=self.latest_evaluation, - ) - def get_new_asset_cursor( self, evaluation: "AssetConditionEvaluation" ) -> AssetDaemonAssetCursor: @@ -203,120 +312,12 @@ def get_new_asset_cursor( previous_handled_subset | self.materialized_requested_or_discarded_since_previous_tick_subset | evaluation.true_subset - | (evaluation.discard_subset(self.root_condition) or self.empty_subset()) + | (evaluation.discard_subset(self.condition) or self.empty_subset()) ) return AssetDaemonAssetCursor( asset_key=self.asset_key, - latest_storage_id=self.daemon_context.get_new_latest_storage_id(), - latest_evaluation=evaluation, - latest_evaluation_timestamp=self.evaluation_time.timestamp(), + previous_max_storage_id=self.daemon_context.get_new_latest_storage_id(), + previous_evaluation=evaluation, + previous_evaluation_timestamp=self.evaluation_time.timestamp(), materialized_requested_or_discarded_subset=new_handled_subset, ) - - -@dataclass(frozen=True) -class AssetConditionEvaluationContext: - """Context object containing methods and properties used for evaluating a particular 
AssetCondition.""" - - root_context: RootAssetConditionEvaluationContext - condition: "AssetCondition" - candidate_subset: AssetSubset - latest_evaluation: Optional["AssetConditionEvaluation"] - - @property - def asset_key(self) -> AssetKey: - return self.root_context.asset_key - - @property - def partitions_def(self) -> Optional[PartitionsDefinition]: - return self.root_context.partitions_def - - @property - def asset_cursor(self) -> Optional[AssetDaemonAssetCursor]: - return self.root_context.asset_cursor - - @property - def asset_graph(self) -> AssetGraph: - return self.root_context.asset_graph - - @property - def instance_queryer(self) -> CachingInstanceQueryer: - return self.root_context.instance_queryer - - @property - def max_storage_id(self) -> Optional[int]: - return self.asset_cursor.latest_storage_id if self.asset_cursor else None - - @property - def latest_evaluation_timestamp(self) -> Optional[float]: - return self.asset_cursor.latest_evaluation_timestamp if self.asset_cursor else None - - @property - def previous_tick_true_subset(self) -> AssetSubset: - """Returns the set of asset partitions that were true on the previous tick.""" - if not self.latest_evaluation: - return self.empty_subset() - return self.latest_evaluation.true_subset - - @property - def parent_has_updated_subset(self) -> AssetSubset: - """Returns the set of asset partitions whose parents have updated since the last time this - condition was evaluated. - """ - return AssetSubset.from_asset_partitions_set( - self.asset_key, - self.partitions_def, - self.root_context.instance_queryer.asset_partitions_with_newly_updated_parents( - latest_storage_id=self.max_storage_id, - child_asset_key=self.root_context.asset_key, - map_old_time_partitions=False, - ), - ) - - @property - def candidate_parent_has_or_will_update_subset(self) -> AssetSubset: - """Returns the set of candidates for this tick which have parents that have updated since - the previous tick, or will update on this tick. - """ - return self.candidate_subset & ( - self.parent_has_updated_subset | self.root_context.parent_will_update_subset - ) - - @property - def candidates_not_evaluated_on_previous_tick_subset(self) -> AssetSubset: - """Returns the set of candidates for this tick which were not candidates on the previous - tick. 
- """ - if not self.latest_evaluation or not self.latest_evaluation.candidate_subset: - return self.candidate_subset - return self.candidate_subset - self.latest_evaluation.candidate_subset - - @property - def materialized_since_previous_tick_subset(self) -> AssetSubset: - """Returns the set of asset partitions that were materialized since the previous tick.""" - return self.root_context.materialized_since_previous_tick_subset - - @property - def materialized_requested_or_discarded_since_previous_tick_subset(self) -> AssetSubset: - """Returns the set of asset partitions that were materialized since the previous tick.""" - return self.root_context.materialized_requested_or_discarded_since_previous_tick_subset - - @property - def previous_tick_subsets_with_metadata(self) -> Sequence["AssetSubsetWithMetadata"]: - """Returns the RuleEvaluationResults calculated on the previous tick for this condition.""" - return self.latest_evaluation.subsets_with_metadata if self.latest_evaluation else [] - - def empty_subset(self) -> AssetSubset: - return self.root_context.empty_subset() - - def for_child( - self, condition: "AssetCondition", candidate_subset: AssetSubset, child_index: int - ) -> "AssetConditionEvaluationContext": - return AssetConditionEvaluationContext( - root_context=self.root_context, - condition=condition, - candidate_subset=candidate_subset, - latest_evaluation=self.latest_evaluation.for_child(condition) - if self.latest_evaluation - else None, - ) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py index 651fd99ad5d7b..36feb348b25ee 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py @@ -36,7 +36,9 @@ from ... 
import PartitionKeyRange from ..storage.tags import ASSET_PARTITION_RANGE_END_TAG, ASSET_PARTITION_RANGE_START_TAG from .asset_condition import AssetConditionEvaluation -from .asset_condition_evaluation_context import RootAssetConditionEvaluationContext +from .asset_condition_evaluation_context import ( + AssetConditionEvaluationContext, +) from .asset_daemon_cursor import AssetDaemonAssetCursor, AssetDaemonCursor from .asset_graph import AssetGraph from .auto_materialize_rule import AutoMaterializeRule @@ -239,19 +241,18 @@ def evaluate_asset( self.asset_graph.auto_materialize_policies_by_key.get(asset_key) ).to_asset_condition() - context = RootAssetConditionEvaluationContext( + context = AssetConditionEvaluationContext.create( asset_key=asset_key, asset_cursor=self.cursor.asset_cursor_for_key(asset_key, self.asset_graph), - root_condition=asset_condition, + condition=asset_condition, instance_queryer=self.instance_queryer, data_time_resolver=self.data_time_resolver, daemon_context=self, evaluation_results_by_key=evaluation_results_by_key, expected_data_time_mapping=expected_data_time_mapping, ) - condition_context = context.get_root_condition_context() - evaluation = asset_condition.evaluate(condition_context) + evaluation = asset_condition.evaluate(context) asset_cursor = context.get_new_asset_cursor(evaluation=evaluation) expected_data_time = get_expected_data_time_for_asset_key( diff --git a/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py b/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py index 9a3765eb4147d..ee7fae4163a00 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py @@ -31,9 +31,9 @@ class AssetDaemonAssetCursor(NamedTuple): """ asset_key: AssetKey - latest_storage_id: Optional[int] - latest_evaluation_timestamp: Optional[float] - latest_evaluation: Optional["AssetConditionEvaluation"] + previous_max_storage_id: Optional[int] + previous_evaluation_timestamp: Optional[float] + previous_evaluation: Optional["AssetConditionEvaluation"] materialized_requested_or_discarded_subset: AssetSubset @@ -80,9 +80,9 @@ def asset_cursor_for_key( handled_subset = AssetSubset.empty(asset_key, partitions_def) return AssetDaemonAssetCursor( asset_key=asset_key, - latest_storage_id=self.latest_storage_id, - latest_evaluation_timestamp=self.latest_evaluation_timestamp, - latest_evaluation=self.latest_evaluation_by_asset_key.get(asset_key), + previous_max_storage_id=self.latest_storage_id, + previous_evaluation_timestamp=self.latest_evaluation_timestamp, + previous_evaluation=self.latest_evaluation_by_asset_key.get(asset_key), materialized_requested_or_discarded_subset=handled_subset, ) diff --git a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py index 65134b80c3fe3..7fa3da5bb6b06 100644 --- a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py +++ b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py @@ -308,10 +308,10 @@ def missed_cron_ticks( self, context: AssetConditionEvaluationContext ) -> Sequence[datetime.datetime]: """Returns the cron ticks which have been missed since the previous cursor was generated.""" - if not context.latest_evaluation_timestamp: + if not context.previous_evaluation_timestamp: previous_dt = next( reverse_cron_string_iterator( - 
end_timestamp=context.root_context.evaluation_time.timestamp(), + end_timestamp=context.evaluation_time.timestamp(), cron_string=self.cron_schedule, execution_timezone=self.timezone, ) @@ -319,11 +319,11 @@ def missed_cron_ticks( return [previous_dt] missed_ticks = [] for dt in cron_string_iterator( - start_timestamp=context.latest_evaluation_timestamp, + start_timestamp=context.previous_evaluation_timestamp, cron_string=self.cron_schedule, execution_timezone=self.timezone, ): - if dt > context.root_context.evaluation_time: + if dt > context.evaluation_time: break missed_ticks.append(dt) return missed_ticks @@ -345,7 +345,7 @@ def get_new_asset_partitions_to_request( return { AssetKeyPartitionKey(context.asset_key, partition_key) for partition_key in partitions_def.get_partition_keys( - current_time=context.root_context.evaluation_time, + current_time=context.evaluation_time, dynamic_partitions_store=context.instance_queryer, ) } @@ -436,7 +436,7 @@ def passes( asset_partitions_by_latest_run_id: Dict[str, Set[AssetKeyPartitionKey]] = defaultdict(set) for asset_partition in asset_partitions: - if context.root_context.will_update_asset_partition(asset_partition): + if context.will_update_asset_partition(asset_partition): will_update_asset_partitions.add(asset_partition) else: record = context.instance_queryer.get_latest_materialization_or_observation_record( @@ -471,7 +471,7 @@ def passes( self.latest_run_required_tags.items() <= { AUTO_MATERIALIZE_TAG: "true", - **context.root_context.daemon_context.auto_materialize_run_tags, + **context.daemon_context.auto_materialize_run_tags, }.items() ): return will_update_asset_partitions | updated_partitions_with_required_tags @@ -530,7 +530,7 @@ def evaluate_for_asset(self, context: AssetConditionEvaluationContext) -> RuleEv parent_asset_partitions, # do a precise check for updated parents, factoring in data versions, as long as # we're within reasonable limits on the number of partitions to check - respect_materialization_data_versions=context.root_context.daemon_context.respect_materialization_data_versions + respect_materialization_data_versions=context.daemon_context.respect_materialization_data_versions and len(parent_asset_partitions) + subset_to_evaluate.size < 100, # ignore self-dependencies when checking for updated parents, to avoid historical # rematerializations from causing a chain of materializations to be kicked off @@ -540,7 +540,7 @@ def evaluate_for_asset(self, context: AssetConditionEvaluationContext) -> RuleEv asset_partitions_by_updated_parents[parent].add(asset_partition) for parent in parent_asset_partitions: - if context.root_context.will_update_asset_partition(parent): + if context.will_update_asset_partition(parent): asset_partitions_by_will_update_parents[parent].add(asset_partition) updated_and_will_update_parents = ( @@ -612,7 +612,7 @@ def evaluate_for_asset(self, context: AssetConditionEvaluationContext) -> RuleEv with updated parents. 
""" missing_asset_partitions = set( - context.root_context.never_materialized_requested_or_discarded_root_subset.asset_partitions + context.never_materialized_requested_or_discarded_root_subset.asset_partitions ) # in addition to missing root asset partitions, check any asset partitions with updated # parents to see if they're missing @@ -653,9 +653,7 @@ def evaluate_for_asset(self, context: AssetConditionEvaluationContext) -> RuleEv for candidate in subset_to_evaluate.asset_partitions: outdated_ancestors = set() # find the root cause of why this asset partition's parents are outdated (if any) - for ( - parent - ) in context.root_context.get_parents_that_will_not_be_materialized_on_current_tick( + for parent in context.get_parents_that_will_not_be_materialized_on_current_tick( asset_partition=candidate ): if context.instance_queryer.have_ignorable_partition_mapping_for_outdated( @@ -700,15 +698,13 @@ def evaluate_for_asset( ) for candidate in subset_to_evaluate.asset_partitions: missing_parent_asset_keys = set() - for ( - parent - ) in context.root_context.get_parents_that_will_not_be_materialized_on_current_tick( + for parent in context.get_parents_that_will_not_be_materialized_on_current_tick( asset_partition=candidate ): # ignore non-observable sources, which will never have a materialization or observation - if context.root_context.asset_graph.is_source( + if context.asset_graph.is_source( parent.asset_key - ) and not context.root_context.asset_graph.is_observable(parent.asset_key): + ) and not context.asset_graph.is_observable(parent.asset_key): continue if not context.instance_queryer.asset_partition_has_materialization_or_observation( parent @@ -779,10 +775,10 @@ def evaluate_for_asset( context.instance_queryer.get_parent_asset_partitions_updated_after_child( candidate, parent_partitions, - context.root_context.daemon_context.respect_materialization_data_versions, + context.daemon_context.respect_materialization_data_versions, ignored_parent_keys=set(), ) - | context.root_context.parent_will_update_subset.asset_partitions + | context.parent_will_update_subset.asset_partitions ) if self.require_update_for_all_parent_partitions: @@ -871,7 +867,7 @@ def description(self) -> str: def evaluate_for_asset(self, context: AssetConditionEvaluationContext) -> RuleEvaluationResults: backfilling_subset = ( context.instance_queryer.get_active_backfill_target_asset_graph_subset() - ).get_asset_subset(context.asset_key, context.root_context.asset_graph) + ).get_asset_subset(context.asset_key, context.asset_graph) if backfilling_subset.size == 0: return context.empty_subset(), [] diff --git a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule_evaluation.py b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule_evaluation.py index 0bf9e7ae5231c..8d79a567fe617 100644 --- a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule_evaluation.py +++ b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule_evaluation.py @@ -1,3 +1,4 @@ +import hashlib import operator from abc import ABC, abstractproperty from collections import defaultdict @@ -145,10 +146,13 @@ def _asset_condition_snapshot_from_rule_snapshot( ) -> "AssetConditionSnapshot": from .asset_condition import AssetConditionSnapshot, RuleCondition + unique_id_parts = [rule_snapshot.class_name, rule_snapshot.description] + unique_id = hashlib.md5("".join(unique_id_parts).encode()).hexdigest() + return AssetConditionSnapshot( class_name=RuleCondition.__name__, 
description=rule_snapshot.description, - child_hashes=[], + unique_id=unique_id, ) def _get_child_rule_evaluation( @@ -162,16 +166,10 @@ def _get_child_rule_evaluation( ) -> "AssetConditionEvaluation": from .asset_condition import ( AssetConditionEvaluation, - AssetConditionSnapshot, AssetSubsetWithMetadata, - RuleCondition, ) - condition_snapshot = AssetConditionSnapshot( - class_name=RuleCondition.__name__, - description=rule_snapshot.description, - child_hashes=[], - ) + condition_snapshot = self._asset_condition_snapshot_from_rule_snapshot(rule_snapshot) if is_partitioned: # for partitioned assets, we can't deserialize SerializedPartitionsSubset into an @@ -235,12 +233,13 @@ def _get_child_decision_type_evaluation( return None evaluation = child_evaluations[0] else: + unique_id_parts = [ + OrAssetCondition.__name__, + *[e.condition_snapshot.unique_id for e in child_evaluations], + ] + unique_id = hashlib.md5("".join(unique_id_parts).encode()).hexdigest() decision_type_snapshot = AssetConditionSnapshot( - class_name=OrAssetCondition.__name__, - description="", - child_hashes=[ - child_eval.condition_snapshot.hash for child_eval in child_evaluations - ], + class_name=OrAssetCondition.__name__, description="", unique_id=unique_id ) initial = ( AssetSubset(asset_key, DefaultPartitionsSubset(set())) @@ -261,11 +260,14 @@ def _get_child_decision_type_evaluation( return evaluation # non-materialize conditions are inverted + unique_id_parts = [ + NotAssetCondition.__name__, + evaluation.condition_snapshot.unique_id, + ] + unique_id = hashlib.md5("".join(unique_id_parts).encode()).hexdigest() return AssetConditionEvaluation( condition_snapshot=AssetConditionSnapshot( - class_name=NotAssetCondition.__name__, - description="", - child_hashes=[evaluation.condition_snapshot.hash], + class_name=NotAssetCondition.__name__, description="", unique_id=unique_id ), # for partitioned assets, we don't bother calculating the true subset, as we can't # properly deserialize the inner results @@ -329,10 +331,13 @@ def unpack( ) # the top level condition is the AND of all the sub-conditions + unique_id_parts = [ + AndAssetCondition.__name__, + *[e.condition_snapshot.unique_id for e in child_evaluations], + ] + unique_id = hashlib.md5("".join(unique_id_parts).encode()).hexdigest() condition_snapshot = AssetConditionSnapshot( - class_name=AndAssetCondition.__name__, - description="", - child_hashes=[evaluation.condition_snapshot.hash for evaluation in child_evaluations], + class_name=AndAssetCondition.__name__, description="", unique_id=unique_id ) return AssetConditionEvaluation( diff --git a/python_modules/dagster/dagster/_core/definitions/freshness_based_auto_materialize.py b/python_modules/dagster/dagster/_core/definitions/freshness_based_auto_materialize.py index b394dd208feab..97c9a6fa86808 100644 --- a/python_modules/dagster/dagster/_core/definitions/freshness_based_auto_materialize.py +++ b/python_modules/dagster/dagster/_core/definitions/freshness_based_auto_materialize.py @@ -18,7 +18,7 @@ from dagster._utils.schedules import cron_string_iterator if TYPE_CHECKING: - from .asset_condition_evaluation_context import RootAssetConditionEvaluationContext + from .asset_condition_evaluation_context import AssetConditionEvaluationContext from .auto_materialize_rule_evaluation import RuleEvaluationResults, TextRuleEvaluationData @@ -110,7 +110,7 @@ def get_execution_period_and_evaluation_data_for_policies( def get_expected_data_time_for_asset_key( - context: "RootAssetConditionEvaluationContext", 
will_materialize: bool + context: "AssetConditionEvaluationContext", will_materialize: bool ) -> Optional[datetime.datetime]: """Returns the data time that you would expect this asset to have if you were to execute it on this tick. @@ -153,7 +153,7 @@ def get_expected_data_time_for_asset_key( def freshness_evaluation_results_for_asset_key( - context: "RootAssetConditionEvaluationContext", + context: "AssetConditionEvaluationContext", ) -> "RuleEvaluationResults": """Returns a set of AssetKeyPartitionKeys to materialize in order to abide by the given FreshnessPolicies. diff --git a/python_modules/dagster/dagster/_utils/test/schedule_storage.py b/python_modules/dagster/dagster/_utils/test/schedule_storage.py index b7aae23e9caf7..504d4455949ac 100644 --- a/python_modules/dagster/dagster/_utils/test/schedule_storage.py +++ b/python_modules/dagster/dagster/_utils/test/schedule_storage.py @@ -709,7 +709,7 @@ def test_auto_materialize_asset_evaluations(self, storage) -> None: if not self.can_store_auto_materialize_asset_evaluations(): pytest.skip("Storage cannot store auto materialize asset evaluations") - condition_snapshot = AssetConditionSnapshot("foo", "bar", []) + condition_snapshot = AssetConditionSnapshot("foo", "bar", "") for _ in range(2): # test idempotency storage.add_auto_materialize_asset_evaluations( @@ -794,13 +794,13 @@ def test_auto_materialize_asset_evaluations(self, storage) -> None: # add a mix of keys - one that already is using the unique index and one that is not eval_one = AssetConditionEvaluation( - condition_snapshot=AssetConditionSnapshot("foo", "bar", []), + condition_snapshot=AssetConditionSnapshot("foo", "bar", ""), true_subset=AssetSubset(asset_key=AssetKey("asset_one"), value=True), candidate_subset=None, ).with_run_ids(set()) eval_asset_three = AssetConditionEvaluation( - condition_snapshot=AssetConditionSnapshot("foo", "bar", []), + condition_snapshot=AssetConditionSnapshot("foo", "bar", ""), true_subset=AssetSubset(asset_key=AssetKey("asset_three"), value=True), candidate_subset=None, ).with_run_ids(set()) @@ -844,7 +844,7 @@ def test_auto_materialize_asset_evaluations_with_partitions(self, storage) -> No evaluation_id=10, asset_evaluations=[ AssetConditionEvaluation( - condition_snapshot=AssetConditionSnapshot("foo", "bar", []), + condition_snapshot=AssetConditionSnapshot("foo", "bar", ""), true_subset=asset_subset, candidate_subset=None, subsets_with_metadata=[asset_subset_with_metadata], @@ -870,7 +870,7 @@ def test_purge_asset_evaluations(self, storage) -> None: evaluation_id=11, asset_evaluations=[ AssetConditionEvaluation( - condition_snapshot=AssetConditionSnapshot("foo", "bar", []), + condition_snapshot=AssetConditionSnapshot("foo", "bar", ""), true_subset=AssetSubset(asset_key=AssetKey("asset_one"), value=True), candidate_subset=None, subsets_with_metadata=[], diff --git a/scripts/run-pyright.py b/scripts/run-pyright.py index 2cbd6bb7c9bf3..4d705fe2ed503 100755 --- a/scripts/run-pyright.py +++ b/scripts/run-pyright.py @@ -203,7 +203,9 @@ def get_params(args: argparse.Namespace) -> Params: elif args.diff: mode = "path" targets = ( - subprocess.check_output(["git", "diff", "--name-only", "origin/master"]) + subprocess.check_output( + ["git", "diff", "--name-only", "origin/master", "--diff-filter=d"] + ) .decode("utf-8") .splitlines() )
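
---

Reviewer note (not part of the patch): a minimal sketch of how the new `unique_id` scheme composes, assuming MD5 hashing as in the diff above. Each condition hashes its class name together with its children's ids, while a rule-backed condition hashes the rule's class name and description, so structurally identical condition trees yield identical ids. The classes below are illustrative toy stand-ins, not the real Dagster types.

```python
# Illustrative sketch only -- toy stand-ins for the AssetCondition classes,
# showing how unique_id values compose up a condition tree.
import hashlib
from typing import Sequence


class ToyCondition:
    @property
    def children(self) -> Sequence["ToyCondition"]:
        return []

    @property
    def unique_id(self) -> str:
        # Hash this node's class name together with its children's ids,
        # mirroring AssetCondition.unique_id in the patch.
        parts = [self.__class__.__name__, *[c.unique_id for c in self.children]]
        return hashlib.md5("".join(parts).encode()).hexdigest()


class ToyRuleCondition(ToyCondition):
    def __init__(self, rule_class_name: str, rule_description: str):
        self._parts = [rule_class_name, rule_description]

    @property
    def unique_id(self) -> str:
        # A rule-backed condition keys off the rule's class name and description,
        # so the same rule always produces the same id across ticks.
        return hashlib.md5("".join(self._parts).encode()).hexdigest()


class ToyAndCondition(ToyCondition):
    def __init__(self, children: Sequence[ToyCondition]):
        self._children = list(children)

    @property
    def children(self) -> Sequence[ToyCondition]:
        return self._children


if __name__ == "__main__":
    rule = ToyRuleCondition("SomeRule", "some description")
    tree = ToyAndCondition([rule])
    # The parent id derives from its class name plus each child's id, which is
    # what lets AssetConditionEvaluation.for_child match a child evaluation to
    # its condition by comparing condition_snapshot.unique_id values.
    print(rule.unique_id)
    print(tree.unique_id)
```

The same composition is what `_get_child_decision_type_evaluation` and `unpack` reproduce when rebuilding And/Or/Not snapshots from legacy rule evaluations, so ids stay stable across the old and new serialization paths.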