From 60e1c31f2367601a9156c94c1bf6e84cc5373916 Mon Sep 17 00:00:00 2001 From: Owen Kephart Date: Thu, 4 Jan 2024 17:16:26 -0500 Subject: [PATCH] Rename AssetConditionCursor --- .../_core/definitions/asset_condition.py | 253 ++++++++++-------- .../asset_condition_evaluation_context.py | 83 +++--- .../_core/definitions/asset_daemon_context.py | 80 +++--- .../_core/definitions/asset_daemon_cursor.py | 142 +++++----- .../definitions/auto_materialize_rule.py | 76 +++--- .../auto_materialize_rule_evaluation.py | 3 +- .../dagster/_utils/test/schedule_storage.py | 11 + .../asset_daemon_scenario.py | 4 +- 8 files changed, 340 insertions(+), 312 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_condition.py b/python_modules/dagster/dagster/_core/definitions/asset_condition.py index 4a9e8f4d5da0e..82726fdb97b34 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_condition.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_condition.py @@ -4,7 +4,7 @@ from typing import ( TYPE_CHECKING, AbstractSet, - Any, + Dict, FrozenSet, List, Mapping, @@ -12,6 +12,8 @@ Optional, Sequence, Tuple, + Type, + TypeVar, Union, ) @@ -22,12 +24,7 @@ from dagster._core.definitions.metadata import MetadataMapping, MetadataValue from dagster._core.definitions.partition import AllPartitionsSubset from dagster._serdes.serdes import ( - FieldSerializer, PackableValue, - UnpackContext, - WhitelistMap, - pack_value, - unpack_value, whitelist_for_serdes, ) @@ -38,6 +35,9 @@ from .auto_materialize_rule import AutoMaterializeRule +T = TypeVar("T") + + @whitelist_for_serdes class HistoricalAllPartitionsSubsetSentinel(NamedTuple): """Serializable indicator that this value was an AllPartitionsSubset at serialization time, but @@ -77,58 +77,100 @@ def get_serializable_candidate_subset( return candidate_subset -class CandidateSubsetSerializer(FieldSerializer): - def pack( - self, - candidate_subset: AssetSubset, - whitelist_map: WhitelistMap, - 
descent_path: str, - ) -> Optional[Mapping[str, Any]]: - # On all ticks, the root condition starts with an AllPartitionsSubset as the candidate - # subset. This would be wasteful to calculate and serialize in its entirety, so we instead - # store this as `None` and reconstruct it as needed. - # This does mean that if new partitions are added between serialization time and read time, - # the candidate subset will contain those new partitions. - return pack_value( - get_serializable_candidate_subset(candidate_subset), whitelist_map, descent_path +class AssetConditionResult(NamedTuple): + condition: "AssetCondition" + start_timestamp: float + end_timestamp: float + + true_subset: AssetSubset + candidate_subset: AssetSubset + subsets_with_metadata: Sequence[AssetSubsetWithMetadata] + extra_state: PackableValue + + child_results: Sequence["AssetConditionResult"] + + @staticmethod + def create_from_children( + context: "AssetConditionEvaluationContext", + true_subset: AssetSubset, + child_results: Sequence["AssetConditionResult"], + ) -> "AssetConditionResult": + """Returns a new AssetConditionEvaluation from the given child results.""" + return AssetConditionResult( + condition=context.condition, + start_timestamp=context.start_timestamp, + end_timestamp=pendulum.now("UTC").timestamp(), + true_subset=true_subset, + candidate_subset=context.candidate_subset, + subsets_with_metadata=[], + child_results=child_results, + extra_state=None, ) - def unpack( - self, - serialized_candidate_subset: Optional[Mapping[str, Any]], - whitelist_map: WhitelistMap, - context: UnpackContext, - ) -> Union[AssetSubset, HistoricalAllPartitionsSubsetSentinel]: - return unpack_value( - serialized_candidate_subset, - (AssetSubset, HistoricalAllPartitionsSubsetSentinel), - whitelist_map, - context, + @staticmethod + def create( + context: "AssetConditionEvaluationContext", + true_subset: AssetSubset, + subsets_with_metadata: Sequence[AssetSubsetWithMetadata] = [], + extra_state: PackableValue = 
None, + ) -> "AssetConditionResult": + """Returns a new AssetConditionEvaluation from the given parameters.""" + return AssetConditionResult( + condition=context.condition, + start_timestamp=context.start_timestamp, + end_timestamp=pendulum.now("UTC").timestamp(), + true_subset=true_subset, + candidate_subset=context.candidate_subset, + subsets_with_metadata=subsets_with_metadata, + child_results=[], + extra_state=extra_state, ) -@whitelist_for_serdes(field_serializers={"candidate_subset": CandidateSubsetSerializer}) +@whitelist_for_serdes class AssetConditionEvaluation(NamedTuple): - """Internal representation of the results of evaluating a node in the evaluation tree.""" + """Serializable representation of the results of evaluating a node in the evaluation tree.""" condition_snapshot: AssetConditionSnapshot - true_subset: AssetSubset - candidate_subset: Union[AssetSubset, HistoricalAllPartitionsSubsetSentinel] start_timestamp: Optional[float] end_timestamp: Optional[float] - subsets_with_metadata: Sequence[AssetSubsetWithMetadata] = [] - child_evaluations: Sequence["AssetConditionEvaluation"] = [] + + true_subset: AssetSubset + candidate_subset: Union[AssetSubset, HistoricalAllPartitionsSubsetSentinel] + subsets_with_metadata: Sequence[AssetSubsetWithMetadata] + + child_evaluations: Sequence["AssetConditionEvaluation"] @property def asset_key(self) -> AssetKey: return self.true_subset.asset_key + @staticmethod + def from_result(result: AssetConditionResult) -> "AssetConditionEvaluation": + return AssetConditionEvaluation( + condition_snapshot=result.condition.snapshot, + start_timestamp=result.start_timestamp, + end_timestamp=result.end_timestamp, + true_subset=result.true_subset, + candidate_subset=get_serializable_candidate_subset(result.candidate_subset), + subsets_with_metadata=result.subsets_with_metadata, + child_evaluations=[ + AssetConditionEvaluation.from_result(child_result) + for child_result in result.child_results + ], + ) + def 
equivalent_to_stored_evaluation(self, other: Optional["AssetConditionEvaluation"]) -> bool: - """Returns if all fields other than `run_ids` are equal.""" + """Returns if this evaluation is functionally equivalent to the given stored evaluation. + This is used to determine if it is necessary to store this new evaluation in the database. + """ return ( other is not None and self.condition_snapshot == other.condition_snapshot - and self.true_subset == other.true_subset + # if any partitions are requested, then the state of the world must have meaningfully + # changed since the previous evaluation + and self.true_subset.size == 0 + and other.true_subset.size == 0 # the candidate subset gets modified during serialization and get_serializable_candidate_subset(self.candidate_subset) == get_serializable_candidate_subset(other.candidate_subset) @@ -175,65 +217,66 @@ def with_run_ids(self, run_ids: AbstractSet[str]) -> "AssetConditionEvaluationWi return AssetConditionEvaluationWithRunIds(evaluation=self, run_ids=frozenset(run_ids)) -class AssetConditionEvaluationResult(NamedTuple): - """Return value for the evaluate method of an AssetCondition.""" +@whitelist_for_serdes +class AssetConditionEvaluationState(NamedTuple): + """Incremental state calculated during the evaluation of an AssetCondition. This may be used + on the subsequent evaluation to make the computation more efficient. + + Attributes: + evaluation: The computed AssetConditionEvaluation. + evaluation_timestamp: The evaluation_timestamp at which the evaluation was performed. + max_storage_id: The maximum storage ID over all events used in this computation. + extra_state_by_unique_id: A mapping from the unique ID of each condition in the evaluation + tree to the extra state that was calculated for it, if any.
+ """ - condition: "AssetCondition" evaluation: AssetConditionEvaluation - extra_values_by_unique_id: Mapping[str, PackableValue] + evaluation_timestamp: Optional[float] + + max_storage_id: Optional[int] + extra_state_by_unique_id: Mapping[str, PackableValue] + + @property + def asset_key(self) -> AssetKey: + return self.evaluation.asset_key @property def true_subset(self) -> AssetSubset: return self.evaluation.true_subset - @staticmethod - def create_from_children( - context: "AssetConditionEvaluationContext", - true_subset: AssetSubset, - child_results: Sequence["AssetConditionEvaluationResult"], - ) -> "AssetConditionEvaluationResult": - """Returns a new AssetConditionEvaluationResult from the given child results.""" - return AssetConditionEvaluationResult( - condition=context.condition, - evaluation=AssetConditionEvaluation( - context.condition.snapshot, - true_subset=true_subset, - candidate_subset=context.candidate_subset, - start_timestamp=context.start_timestamp, - end_timestamp=pendulum.now("UTC").timestamp(), - subsets_with_metadata=[], - child_evaluations=[child_result.evaluation for child_result in child_results], - ), - extra_values_by_unique_id=dict( - item - for child_result in child_results - for item in child_result.extra_values_by_unique_id.items() - ), - ) - @staticmethod def create( - context: "AssetConditionEvaluationContext", - true_subset: AssetSubset, - subsets_with_metadata: Sequence[AssetSubsetWithMetadata] = [], - extra_value: PackableValue = None, - ) -> "AssetConditionEvaluationResult": - """Returns a new AssetConditionEvaluationResult from the given parameters.""" - return AssetConditionEvaluationResult( - condition=context.condition, - evaluation=AssetConditionEvaluation( - context.condition.snapshot, - true_subset=true_subset, - start_timestamp=context.start_timestamp, - end_timestamp=pendulum.now("UTC").timestamp(), - candidate_subset=context.candidate_subset, - subsets_with_metadata=subsets_with_metadata, - ), - 
extra_values_by_unique_id={context.condition.unique_id: extra_value} - if extra_value - else {}, + context: "AssetConditionEvaluationContext", root_result: AssetConditionResult + ) -> "AssetConditionEvaluationState": + """Convenience constructor to generate an AssetConditionEvaluationState from the root result + and the context in which it was evaluated. + """ + + # flatten the extra state into a single dict + def _flatten_extra_state(r: AssetConditionResult) -> Mapping[str, PackableValue]: + extra_state: Dict[str, PackableValue] = ( + {r.condition.unique_id: r.extra_state} if r.extra_state else {} + ) + for child in r.child_results: + extra_state.update(_flatten_extra_state(child)) + return extra_state + + return AssetConditionEvaluationState( + evaluation=AssetConditionEvaluation.from_result(root_result), + evaluation_timestamp=context.evaluation_time.timestamp(), + max_storage_id=context.new_max_storage_id, + extra_state_by_unique_id=_flatten_extra_state(root_result), ) + def get_extra_state(self, condition: "AssetCondition", as_type: Type[T]) -> Optional[T]: + """Returns the value from the extras dict for the given condition, if it exists and is of + the expected type. Otherwise, returns None. 
+ """ + extra_state = self.extra_state_by_unique_id.get(condition.unique_id) + if isinstance(extra_state, as_type): + return extra_state + return None + @whitelist_for_serdes class AssetConditionEvaluationWithRunIds(NamedTuple): @@ -268,9 +311,7 @@ def unique_id(self) -> str: return hashlib.md5("".join(parts).encode()).hexdigest() @abstractmethod - def evaluate( - self, context: "AssetConditionEvaluationContext" - ) -> AssetConditionEvaluationResult: + def evaluate(self, context: "AssetConditionEvaluationContext") -> AssetConditionResult: raise NotImplementedError() @abstractproperty @@ -341,9 +382,7 @@ def unique_id(self) -> str: def description(self) -> str: return self.rule.description - def evaluate( - self, context: "AssetConditionEvaluationContext" - ) -> AssetConditionEvaluationResult: + def evaluate(self, context: "AssetConditionEvaluationContext") -> AssetConditionResult: context.root_context.daemon_context._verbose_log_fn( # noqa f"Evaluating rule: {self.rule.to_snapshot()}" ) @@ -365,19 +404,15 @@ class AndAssetCondition( def description(self) -> str: return "All of" - def evaluate( - self, context: "AssetConditionEvaluationContext" - ) -> AssetConditionEvaluationResult: - child_results: List[AssetConditionEvaluationResult] = [] + def evaluate(self, context: "AssetConditionEvaluationContext") -> AssetConditionResult: + child_results: List[AssetConditionResult] = [] true_subset = context.candidate_subset for child in self.children: child_context = context.for_child(condition=child, candidate_subset=true_subset) child_result = child.evaluate(child_context) child_results.append(child_result) true_subset &= child_result.true_subset - return AssetConditionEvaluationResult.create_from_children( - context, true_subset, child_results - ) + return AssetConditionResult.create_from_children(context, true_subset, child_results) class OrAssetCondition( @@ -390,10 +425,8 @@ class OrAssetCondition( def description(self) -> str: return "Any of" - def evaluate( - self, 
context: "AssetConditionEvaluationContext" - ) -> AssetConditionEvaluationResult: - child_results: List[AssetConditionEvaluationResult] = [] + def evaluate(self, context: "AssetConditionEvaluationContext") -> AssetConditionResult: + child_results: List[AssetConditionResult] = [] true_subset = context.empty_subset() for child in self.children: child_context = context.for_child( @@ -402,9 +435,7 @@ def evaluate( child_result = child.evaluate(child_context) child_results.append(child_result) true_subset |= child_result.true_subset - return AssetConditionEvaluationResult.create_from_children( - context, true_subset, child_results - ) + return AssetConditionResult.create_from_children(context, true_subset, child_results) class NotAssetCondition( @@ -425,15 +456,11 @@ def description(self) -> str: def child(self) -> AssetCondition: return self.children[0] - def evaluate( - self, context: "AssetConditionEvaluationContext" - ) -> AssetConditionEvaluationResult: + def evaluate(self, context: "AssetConditionEvaluationContext") -> AssetConditionResult: child_context = context.for_child( condition=self.child, candidate_subset=context.candidate_subset ) child_result = self.child.evaluate(child_context) true_subset = context.candidate_subset - child_result.true_subset - return AssetConditionEvaluationResult.create_from_children( - context, true_subset, [child_result] - ) + return AssetConditionResult.create_from_children(context, true_subset, [child_result]) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_condition_evaluation_context.py b/python_modules/dagster/dagster/_core/definitions/asset_condition_evaluation_context.py index 5296f8693f15d..a5a1d78e5f928 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_condition_evaluation_context.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_condition_evaluation_context.py @@ -26,12 +26,16 @@ from dagster._core.definitions.time_window_partition_mapping import 
TimeWindowPartitionMapping from dagster._utils.caching_instance_queryer import CachingInstanceQueryer -from .asset_daemon_cursor import AssetConditionCursor from .asset_graph import AssetGraph from .asset_subset import AssetSubset if TYPE_CHECKING: - from .asset_condition import AssetCondition, AssetConditionEvaluation, AssetSubsetWithMetadata + from .asset_condition import ( + AssetCondition, + AssetConditionEvaluation, + AssetConditionEvaluationState, + AssetSubsetWithMetadata, + ) from .asset_daemon_context import AssetDaemonContext @@ -54,15 +58,15 @@ class AssetConditionEvaluationContext: asset_key: AssetKey condition: "AssetCondition" - cursor: AssetConditionCursor - previous_condition_evaluation: Optional["AssetConditionEvaluation"] + previous_evaluation_state: Optional["AssetConditionEvaluationState"] + previous_evaluation: Optional["AssetConditionEvaluation"] candidate_subset: AssetSubset instance_queryer: CachingInstanceQueryer data_time_resolver: CachingDataTimeResolver daemon_context: "AssetDaemonContext" - evaluation_results_by_key: Mapping[AssetKey, "AssetConditionEvaluation"] + evaluation_state_by_key: Mapping[AssetKey, "AssetConditionEvaluationState"] expected_data_time_mapping: Mapping[AssetKey, Optional[datetime.datetime]] start_timestamp: float @@ -72,11 +76,11 @@ class AssetConditionEvaluationContext: def create( asset_key: AssetKey, condition: "AssetCondition", - cursor: AssetConditionCursor, + previous_evaluation_state: Optional["AssetConditionEvaluationState"], instance_queryer: CachingInstanceQueryer, data_time_resolver: CachingDataTimeResolver, daemon_context: "AssetDaemonContext", - evaluation_results_by_key: Mapping[AssetKey, "AssetConditionEvaluation"], + evaluation_state_by_key: Mapping[AssetKey, "AssetConditionEvaluationState"], expected_data_time_mapping: Mapping[AssetKey, Optional[datetime.datetime]], ) -> "AssetConditionEvaluationContext": partitions_def = instance_queryer.asset_graph.get_partitions_def(asset_key) @@ -84,8 +88,10 @@ 
def create( return AssetConditionEvaluationContext( asset_key=asset_key, condition=condition, - cursor=cursor, - previous_condition_evaluation=cursor.previous_evaluation, + previous_evaluation_state=previous_evaluation_state, + previous_evaluation=previous_evaluation_state.evaluation + if previous_evaluation_state + else None, candidate_subset=AssetSubset.all( asset_key, partitions_def, @@ -95,7 +101,7 @@ def create( data_time_resolver=data_time_resolver, instance_queryer=instance_queryer, daemon_context=daemon_context, - evaluation_results_by_key=evaluation_results_by_key, + evaluation_state_by_key=evaluation_state_by_key, expected_data_time_mapping=expected_data_time_mapping, start_timestamp=pendulum.now("UTC").timestamp(), ) @@ -106,8 +112,8 @@ def for_child( return dataclasses.replace( self, condition=condition, - previous_condition_evaluation=self.previous_condition_evaluation.for_child(condition) - if self.previous_condition_evaluation + previous_evaluation=self.previous_evaluation.for_child(condition) + if self.previous_evaluation else None, candidate_subset=candidate_subset, root_ref=self.root_context, @@ -134,23 +140,31 @@ def evaluation_time(self) -> datetime.datetime: @property def previous_max_storage_id(self) -> Optional[int]: - return self.cursor.previous_max_storage_id + return ( + self.previous_evaluation_state.max_storage_id + if self.previous_evaluation_state + else None + ) @property def previous_evaluation_timestamp(self) -> Optional[float]: - return self.cursor.previous_evaluation_timestamp + return ( + self.previous_evaluation_state.evaluation_timestamp + if self.previous_evaluation_state + else None + ) @property def previous_true_subset(self) -> AssetSubset: - if self.previous_condition_evaluation is None: + if self.previous_evaluation is None: return self.empty_subset() - return self.previous_condition_evaluation.true_subset + return self.previous_evaluation.true_subset @property def previous_subsets_with_metadata(self) -> 
Sequence["AssetSubsetWithMetadata"]: - if self.previous_condition_evaluation is None: + if self.previous_evaluation is None: return [] - return self.previous_condition_evaluation.subsets_with_metadata + return self.previous_evaluation.subsets_with_metadata @functools.cached_property @root_property @@ -162,10 +176,10 @@ def parent_will_update_subset(self) -> AssetSubset: for parent_key in self.asset_graph.get_parents(self.asset_key): if not self.materializable_in_same_run(self.asset_key, parent_key): continue - parent_result = self.evaluation_results_by_key.get(parent_key) - if not parent_result: + parent_info = self.evaluation_state_by_key.get(parent_key) + if not parent_info: continue - parent_subset = parent_result.true_subset + parent_subset = parent_info.true_subset subset |= parent_subset._replace(asset_key=self.asset_key) return subset @@ -179,7 +193,7 @@ def materialized_since_previous_tick_subset(self) -> AssetSubset: self.instance_queryer.get_asset_partitions_updated_after_cursor( self.asset_key, asset_partitions=None, - after_cursor=self.cursor.previous_max_storage_id, + after_cursor=self.previous_max_storage_id, respect_materialization_data_versions=False, ), ) @@ -188,11 +202,15 @@ def materialized_since_previous_tick_subset(self) -> AssetSubset: @root_property def previous_tick_requested_subset(self) -> AssetSubset: """The set of asset partitions that were requested (or discarded) on the previous tick.""" - previous_evaluation = self.cursor.previous_evaluation - if previous_evaluation is None: + if ( + self.previous_evaluation_state is None + or self.previous_evaluation_state.evaluation is None + ): return self.empty_subset() - return previous_evaluation.get_requested_or_discarded_subset(self.condition) + return self.previous_evaluation_state.evaluation.get_requested_or_discarded_subset( + self.condition + ) @property def materialized_requested_or_discarded_since_previous_tick_subset(self) -> AssetSubset: @@ -211,7 +229,7 @@ def 
_parent_has_updated_subset_and_new_latest_storage_id( asset_partitions, cursor, ) = self.root_context.instance_queryer.asset_partitions_with_newly_updated_parents_and_new_cursor( - latest_storage_id=self.cursor.previous_max_storage_id, + latest_storage_id=self.previous_max_storage_id, child_asset_key=self.root_context.asset_key, map_old_time_partitions=False, ) @@ -247,17 +265,16 @@ def candidates_not_evaluated_on_previous_tick_subset(self) -> AssetSubset: """ from .asset_condition import HistoricalAllPartitionsSubsetSentinel - if not self.previous_condition_evaluation: + if not self.previous_evaluation: return self.candidate_subset # when the candidate_subset is HistoricalAllPartitionsSubsetSentinel, this indicates that the # entire asset was evaluated for this condition on the previous tick, and so no candidates # were *not* evaluated on the previous tick elif isinstance( - self.previous_condition_evaluation.candidate_subset, - HistoricalAllPartitionsSubsetSentinel, + self.previous_evaluation.candidate_subset, HistoricalAllPartitionsSubsetSentinel ): return self.empty_subset() - return self.candidate_subset - self.previous_condition_evaluation.candidate_subset + return self.candidate_subset - self.previous_evaluation.candidate_subset def materializable_in_same_run(self, child_key: AssetKey, parent_key: AssetKey) -> bool: """Returns whether a child asset can be materialized in the same run as a parent asset.""" @@ -304,10 +321,10 @@ def get_parents_that_will_not_be_materialized_on_current_tick( } def will_update_asset_partition(self, asset_partition: AssetKeyPartitionKey) -> bool: - parent_evaluation = self.evaluation_results_by_key.get(asset_partition.asset_key) - if not parent_evaluation: + parent_evaluation_state = self.evaluation_state_by_key.get(asset_partition.asset_key) + if not parent_evaluation_state: return False - return asset_partition in parent_evaluation.true_subset + return asset_partition in parent_evaluation_state.true_subset def 
add_evaluation_data_from_previous_tick( self, diff --git a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py index c95480ddc42ec..6ec62ecb6a880 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py @@ -34,11 +34,11 @@ from ... import PartitionKeyRange from ..storage.tags import ASSET_PARTITION_RANGE_END_TAG, ASSET_PARTITION_RANGE_START_TAG -from .asset_condition import AssetConditionEvaluation +from .asset_condition import AssetConditionEvaluation, AssetConditionEvaluationState from .asset_condition_evaluation_context import ( AssetConditionEvaluationContext, ) -from .asset_daemon_cursor import AssetConditionCursor, AssetDaemonCursor +from .asset_daemon_cursor import AssetDaemonCursor from .asset_graph import AssetGraph from .auto_materialize_rule import AutoMaterializeRule from .backfill_policy import BackfillPolicy, BackfillPolicyType @@ -176,9 +176,9 @@ def prefetch(self) -> None: def evaluate_asset( self, asset_key: AssetKey, - evaluation_results_by_key: Mapping[AssetKey, AssetConditionEvaluation], + evaluation_state_by_key: Mapping[AssetKey, AssetConditionEvaluationState], expected_data_time_mapping: Mapping[AssetKey, Optional[datetime.datetime]], - ) -> Tuple[AssetConditionEvaluation, AssetConditionCursor, Optional[datetime.datetime]]: + ) -> Tuple[AssetConditionEvaluationState, Optional[datetime.datetime]]: """Evaluates the auto materialize policy of a given asset key. 
Params: @@ -196,48 +196,34 @@ def evaluate_asset( self.asset_graph.auto_materialize_policies_by_key.get(asset_key) ).to_asset_condition() - asset_cursor = self.cursor.get_asset_cursor(asset_key) + asset_cursor = self.cursor.get_previous_evaluation_state(asset_key) context = AssetConditionEvaluationContext.create( asset_key=asset_key, - cursor=asset_cursor, + previous_evaluation_state=asset_cursor, condition=asset_condition, instance_queryer=self.instance_queryer, data_time_resolver=self.data_time_resolver, daemon_context=self, - evaluation_results_by_key=evaluation_results_by_key, + evaluation_state_by_key=evaluation_state_by_key, expected_data_time_mapping=expected_data_time_mapping, ) - evaluation_result = asset_condition.evaluate(context) - - new_asset_cursor = AssetConditionCursor( - asset_key=asset_key, - previous_max_storage_id=context.new_max_storage_id, - previous_evaluation_timestamp=context.evaluation_time.timestamp(), - previous_evaluation=evaluation_result.evaluation, - extra_values_by_unique_id=evaluation_result.extra_values_by_unique_id, - ) + result = asset_condition.evaluate(context) expected_data_time = get_expected_data_time_for_asset_key( - context, will_materialize=evaluation_result.true_subset.size > 0 + context, will_materialize=result.true_subset.size > 0 ) - return evaluation_result.evaluation, new_asset_cursor, expected_data_time + return AssetConditionEvaluationState.create(context, result), expected_data_time def get_asset_condition_evaluations( self, - ) -> Tuple[ - Sequence[AssetConditionEvaluation], - Sequence[AssetConditionCursor], - AbstractSet[AssetKeyPartitionKey], - ]: + ) -> Tuple[Sequence[AssetConditionEvaluationState], AbstractSet[AssetKeyPartitionKey]]: """Returns a mapping from asset key to the AutoMaterializeAssetEvaluation for that key, a sequence of new per-asset cursors, and the set of all asset partitions that should be materialized or discarded this tick. 
""" - asset_cursors: List[AssetConditionCursor] = [] - - evaluation_results_by_key: Dict[AssetKey, AssetConditionEvaluation] = {} + evaluation_state_by_key: Dict[AssetKey, AssetConditionEvaluationState] = {} expected_data_time_mapping: Dict[AssetKey, Optional[datetime.datetime]] = defaultdict() to_request: Set[AssetKeyPartitionKey] = set() @@ -256,14 +242,14 @@ def get_asset_condition_evaluations( f" {asset_key.to_user_string()} ({num_checked_assets}/{num_auto_materialize_asset_keys})" ) - (evaluation, asset_cursor_for_asset, expected_data_time) = self.evaluate_asset( - asset_key, evaluation_results_by_key, expected_data_time_mapping + (evaluation_state, expected_data_time) = self.evaluate_asset( + asset_key, evaluation_state_by_key, expected_data_time_mapping ) - num_requested = evaluation.true_subset.size + num_requested = evaluation_state.true_subset.size log_fn = self._logger.info if num_requested > 0 else self._logger.debug - to_request_asset_partitions = evaluation.true_subset.asset_partitions + to_request_asset_partitions = evaluation_state.true_subset.asset_partitions to_request_str = ",".join( [(ap.partition_key or "No partition") for ap in to_request_asset_partitions] ) @@ -274,9 +260,8 @@ def get_asset_condition_evaluations( f" requested ({to_request_str}) ({format(time.time()-start_time, '.3f')} seconds)" ) - evaluation_results_by_key[asset_key] = evaluation + evaluation_state_by_key[asset_key] = evaluation_state expected_data_time_mapping[asset_key] = expected_data_time - asset_cursors.append(asset_cursor_for_asset) # if we need to materialize any partitions of a non-subsettable multi-asset, we need to # materialize all of them @@ -285,18 +270,21 @@ def get_asset_condition_evaluations( expected_data_time_mapping[neighbor_key] = expected_data_time # make sure that the true_subset of the neighbor is accurate - if neighbor_key in evaluation_results_by_key: - neighbor_evaluation = evaluation_results_by_key[neighbor_key] - 
evaluation_results_by_key[neighbor_key] = neighbor_evaluation._replace( - true_subset=neighbor_evaluation.true_subset - | evaluation.true_subset._replace(asset_key=neighbor_key) + if neighbor_key in evaluation_state_by_key: + neighbor_evaluation = evaluation_state_by_key[neighbor_key] + evaluation_state_by_key[neighbor_key] = neighbor_evaluation._replace( + evaluation=neighbor_evaluation.evaluation._replace( + true_subset=neighbor_evaluation.true_subset._replace( + asset_key=neighbor_key + ) + ) ) to_request |= { ap._replace(asset_key=neighbor_key) - for ap in evaluation.true_subset.asset_partitions + for ap in evaluation_state.true_subset.asset_partitions } - return (list(evaluation_results_by_key.values()), asset_cursors, to_request) + return (list(evaluation_state_by_key.values()), to_request) def evaluate( self, @@ -314,7 +302,7 @@ def evaluate( else [] ) - evaluations, asset_cursors, to_request = self.get_asset_condition_evaluations() + evaluation_state, to_request = self.get_asset_condition_evaluations() run_requests = [ *build_run_requests( @@ -329,7 +317,7 @@ def evaluate( run_requests, self.cursor.with_updates( evaluation_id=self._evaluation_id, - asset_cursors=asset_cursors, + evaluation_state=evaluation_state, newly_observe_requested_asset_keys=[ asset_key for run_request in auto_observe_run_requests @@ -337,12 +325,12 @@ def evaluate( ], evaluation_timestamp=self.instance_queryer.evaluation_time.timestamp(), ), - # only record evaluations where something changed + # only record evaluation results where something changed [ - evaluation - for evaluation in evaluations - if not evaluation.equivalent_to_stored_evaluation( - self.cursor.get_previous_evaluation(evaluation.asset_key) + es.evaluation + for es in evaluation_state + if not es.evaluation.equivalent_to_stored_evaluation( + self.cursor.get_previous_evaluation(es.asset_key) ) ], ) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py 
b/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py index 29301525ee795..163f0714e0a45 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py @@ -6,14 +6,11 @@ NamedTuple, Optional, Sequence, - Type, - TypeVar, ) from dagster._core.definitions.asset_graph_subset import AssetGraphSubset from dagster._core.definitions.asset_subset import AssetSubset from dagster._core.definitions.events import AssetKey -from dagster._core.definitions.partition import PartitionsDefinition from dagster._serdes.serdes import ( FieldSerializer, JsonSerializableValue, @@ -29,9 +26,11 @@ from .asset_graph import AssetGraph if TYPE_CHECKING: - from .asset_condition import AssetCondition, AssetConditionEvaluation, AssetConditionSnapshot - -T = TypeVar("T") + from .asset_condition import ( + AssetConditionEvaluation, + AssetConditionEvaluationState, + AssetConditionSnapshot, + ) @whitelist_for_serdes @@ -44,46 +43,6 @@ class AssetConditionCursorExtras(NamedTuple): extras: Mapping[str, PackableValue] -@whitelist_for_serdes -class AssetConditionCursor(NamedTuple): - """Represents the evaluated state of an AssetConditionCursor at a certain point in time. This - information can be used to make future evaluations more efficient. 
- """ - - asset_key: AssetKey - previous_evaluation: Optional["AssetConditionEvaluation"] - previous_max_storage_id: Optional[int] - previous_evaluation_timestamp: Optional[float] - - extra_values_by_unique_id: Mapping[str, PackableValue] - - @staticmethod - def empty(asset_key: AssetKey) -> "AssetConditionCursor": - return AssetConditionCursor( - asset_key=asset_key, - previous_evaluation=None, - previous_max_storage_id=None, - previous_evaluation_timestamp=None, - extra_values_by_unique_id={}, - ) - - def get_extras_value(self, condition: "AssetCondition", as_type: Type[T]) -> Optional[T]: - """Returns the value from the extras dict for the given condition, if it exists and is of - the expected type. Otherwise, returns None. - """ - extras_value = self.extra_values_by_unique_id.get(condition.unique_id) - if isinstance(extras_value, as_type): - return extras_value - return None - - def get_previous_requested_or_discarded_subset( - self, condition: "AssetCondition", partitions_def: Optional[PartitionsDefinition] - ) -> AssetSubset: - if not self.previous_evaluation: - return AssetSubset.empty(self.asset_key, partitions_def) - return self.previous_evaluation.get_requested_or_discarded_subset(condition) - - class ObserveRequestTimestampSerializer(FieldSerializer): def pack( self, @@ -112,12 +71,12 @@ class AssetDaemonCursor(NamedTuple): Attributes: evaluation_id (int): The ID of the evaluation that produced this cursor. - asset_cursors (Sequence[AssetConditionCursor]): The state of each asset that the daemon - is responsible for handling. + previous_evaluation_state (Sequence[AssetConditionEvaluationState]): The evaluation state + recorded for each asset on the previous tick.
""" evaluation_id: int - asset_cursors: Sequence[AssetConditionCursor] + previous_evaluation_state: Sequence["AssetConditionEvaluationState"] last_observe_request_timestamp_by_asset_key: Mapping[AssetKey, float] @@ -125,37 +84,44 @@ class AssetDaemonCursor(NamedTuple): def empty(evaluation_id: int = 0) -> "AssetDaemonCursor": return AssetDaemonCursor( evaluation_id=evaluation_id, - asset_cursors=[], + previous_evaluation_state=[], last_observe_request_timestamp_by_asset_key={}, ) @property @functools.lru_cache(maxsize=1) - def asset_cursors_by_key(self) -> Mapping[AssetKey, AssetConditionCursor]: - """Efficient lookup of asset cursors by asset key.""" - return {cursor.asset_key: cursor for cursor in self.asset_cursors} + def previous_evaluation_state_by_key( + self, + ) -> Mapping[AssetKey, "AssetConditionEvaluationState"]: + """Efficient lookup of previous evaluation info by asset key.""" + return { + evaluation_state.asset_key: evaluation_state + for evaluation_state in self.previous_evaluation_state + } - def get_asset_cursor(self, asset_key: AssetKey) -> AssetConditionCursor: + def get_previous_evaluation_state( + self, asset_key: AssetKey + ) -> Optional["AssetConditionEvaluationState"]: """Returns the AssetConditionCursor associated with the given asset key. If no stored cursor exists, returns an empty cursor. 
""" - return self.asset_cursors_by_key.get(asset_key) or AssetConditionCursor.empty(asset_key) + return self.previous_evaluation_state_by_key.get(asset_key) def get_previous_evaluation(self, asset_key: AssetKey) -> Optional["AssetConditionEvaluation"]: """Returns the previous AssetConditionEvaluation for a given asset key, if it exists.""" - cursor = self.get_asset_cursor(asset_key) - return cursor.previous_evaluation if cursor else None + previous_evaluation_state = self.get_previous_evaluation_state(asset_key) + return previous_evaluation_state.evaluation if previous_evaluation_state else None def with_updates( self, evaluation_id: int, evaluation_timestamp: float, newly_observe_requested_asset_keys: Sequence[AssetKey], - asset_cursors: Sequence[AssetConditionCursor], + evaluation_state: Sequence["AssetConditionEvaluationState"], ) -> "AssetDaemonCursor": return self._replace( evaluation_id=evaluation_id, - asset_cursors=asset_cursors, + previous_evaluation_state=evaluation_state, last_observe_request_timestamp_by_asset_key={ **self.last_observe_request_timestamp_by_asset_key, **{ @@ -172,24 +138,25 @@ def __hash__(self) -> int: # BACKCOMPAT -def get_backcompat_asset_condition_cursor( - asset_key: AssetKey, +def get_backcompat_asset_condition_evaluation_state( + latest_evaluation: "AssetConditionEvaluation", latest_storage_id: Optional[int], latest_timestamp: Optional[float], - latest_evaluation: Optional["AssetConditionEvaluation"], handled_root_subset: Optional[AssetSubset], -) -> AssetConditionCursor: +) -> "AssetConditionEvaluationState": """Generates an AssetDaemonCursor from information available on the old cursor format.""" - from dagster._core.definitions.asset_condition import RuleCondition + from dagster._core.definitions.asset_condition import ( + AssetConditionEvaluationState, + RuleCondition, + ) from dagster._core.definitions.auto_materialize_rule import MaterializeOnMissingRule - return AssetConditionCursor( - asset_key=asset_key, - 
previous_evaluation=latest_evaluation, - previous_evaluation_timestamp=latest_timestamp, - previous_max_storage_id=latest_storage_id, + return AssetConditionEvaluationState( + evaluation=latest_evaluation, + evaluation_timestamp=latest_timestamp, + max_storage_id=latest_storage_id, # the only information we need to preserve from the previous cursor is the handled subset - extra_values_by_unique_id={ + extra_state_by_unique_id={ RuleCondition(MaterializeOnMissingRule()).unique_id: handled_root_subset, } if handled_root_subset and handled_root_subset.size > 0 @@ -203,6 +170,7 @@ def backcompat_deserialize_asset_daemon_cursor_str( """This serves as a backcompat layer for deserializing the old cursor format. Some information is impossible to fully recover, this will recover enough to continue operating as normal. """ + from .asset_condition import AssetConditionEvaluation, AssetConditionSnapshot from .auto_materialize_rule_evaluation import ( deserialize_auto_materialize_asset_evaluation_to_asset_condition_evaluation_with_run_ids, ) @@ -256,29 +224,39 @@ def backcompat_deserialize_asset_daemon_cursor_str( latest_evaluation_by_asset_key[key] = evaluation - asset_cursors = [] + previous_evaluation_state = [] cursor_keys = ( asset_graph.auto_materialize_policies_by_key.keys() if asset_graph else latest_evaluation_by_asset_key.keys() ) for asset_key in cursor_keys: - latest_evaluation = latest_evaluation_by_asset_key.get(asset_key) - asset_cursors.append( - get_backcompat_asset_condition_cursor( - asset_key, - data.get("latest_storage_id"), - data.get("latest_evaluation_timestamp"), - latest_evaluation, - handled_root_asset_graph_subset.get_asset_subset(asset_key, asset_graph) - if asset_graph - else None, + latest_evaluation_result = latest_evaluation_by_asset_key.get(asset_key) + # create a placeholder evaluation result if we don't have one + if not latest_evaluation_result: + partitions_def = asset_graph.get_partitions_def(asset_key) if asset_graph else None + 
latest_evaluation_result = AssetConditionEvaluation( + condition_snapshot=AssetConditionSnapshot("", "", ""), + true_subset=AssetSubset.empty(asset_key, partitions_def), + candidate_subset=AssetSubset.empty(asset_key, partitions_def), + start_timestamp=None, + end_timestamp=None, + subsets_with_metadata=[], + child_evaluations=[], ) + backcompat_evaluation_state = get_backcompat_asset_condition_evaluation_state( + latest_evaluation_result, + data.get("latest_storage_id"), + data.get("latest_evaluation_timestamp"), + handled_root_asset_graph_subset.get_asset_subset(asset_key, asset_graph) + if asset_graph + else None, ) + previous_evaluation_state.append(backcompat_evaluation_state) return AssetDaemonCursor( evaluation_id=default_evaluation_id, - asset_cursors=asset_cursors, + previous_evaluation_state=previous_evaluation_state, last_observe_request_timestamp_by_asset_key=last_observe_request_timestamp_by_asset_key, ) diff --git a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py index 1167fc69abdf8..50d62c3dc42b3 100644 --- a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py +++ b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py @@ -48,7 +48,7 @@ from .asset_graph import sort_key_for_asset_partition if TYPE_CHECKING: - from dagster._core.definitions.asset_condition import AssetConditionEvaluationResult + from dagster._core.definitions.asset_condition import AssetConditionResult class AutoMaterializeRule(ABC): @@ -78,7 +78,7 @@ def description(self) -> str: @abstractmethod def evaluate_for_asset( self, context: AssetConditionEvaluationContext - ) -> "AssetConditionEvaluationResult": + ) -> "AssetConditionResult": """The core evaluation function for the rule. This function takes in a context object and returns a mapping from evaluated rules to the set of asset partitions that the rule applies to. 
@@ -239,13 +239,13 @@ def description(self) -> str: def evaluate_for_asset( self, context: AssetConditionEvaluationContext - ) -> "AssetConditionEvaluationResult": - from .asset_condition import AssetConditionEvaluationResult + ) -> "AssetConditionResult": + from .asset_condition import AssetConditionResult true_subset, subsets_with_metadata = freshness_evaluation_results_for_asset_key( context.root_context ) - return AssetConditionEvaluationResult.create(context, true_subset, subsets_with_metadata) + return AssetConditionResult.create(context, true_subset, subsets_with_metadata) @whitelist_for_serdes @@ -268,7 +268,7 @@ def missed_cron_ticks( self, context: AssetConditionEvaluationContext ) -> Sequence[datetime.datetime]: """Returns the cron ticks which have been missed since the previous cursor was generated.""" - if not context.cursor.previous_evaluation_timestamp: + if not context.previous_evaluation_timestamp: previous_dt = next( reverse_cron_string_iterator( end_timestamp=context.evaluation_time.timestamp(), @@ -279,7 +279,7 @@ def missed_cron_ticks( return [previous_dt] missed_ticks = [] for dt in cron_string_iterator( - start_timestamp=context.cursor.previous_evaluation_timestamp, + start_timestamp=context.previous_evaluation_timestamp, cron_string=self.cron_schedule, execution_timezone=self.timezone, ): @@ -353,8 +353,8 @@ def get_new_asset_partitions_to_request( def evaluate_for_asset( self, context: AssetConditionEvaluationContext - ) -> "AssetConditionEvaluationResult": - from .asset_condition import AssetConditionEvaluationResult + ) -> "AssetConditionResult": + from .asset_condition import AssetConditionResult new_asset_partitions_to_request = self.get_new_asset_partitions_to_request(context) asset_subset_to_request = AssetSubset.from_asset_partitions_set( @@ -364,7 +364,7 @@ def evaluate_for_asset( - context.materialized_requested_or_discarded_since_previous_tick_subset ) - return AssetConditionEvaluationResult.create(context, 
true_subset=asset_subset_to_request) + return AssetConditionResult.create(context, true_subset=asset_subset_to_request) @whitelist_for_serdes @@ -471,11 +471,11 @@ def description(self) -> str: def evaluate_for_asset( self, context: AssetConditionEvaluationContext - ) -> "AssetConditionEvaluationResult": + ) -> "AssetConditionResult": """Evaluates the set of asset partitions of this asset whose parents have been updated, or will update on this tick. """ - from .asset_condition import AssetConditionEvaluationResult + from .asset_condition import AssetConditionResult asset_partitions_by_updated_parents: Mapping[ AssetKeyPartitionKey, Set[AssetKeyPartitionKey] @@ -561,7 +561,7 @@ def evaluate_for_asset( asset_partitions_by_evaluation_data, ignore_subset=context.materialized_requested_or_discarded_since_previous_tick_subset, ) - return AssetConditionEvaluationResult.create(context, true_subset, subsets_with_metadata) + return AssetConditionResult.create(context, true_subset, subsets_with_metadata) @whitelist_for_serdes @@ -578,7 +578,11 @@ def get_handled_subset(self, context: AssetConditionEvaluationContext) -> AssetS """Returns the AssetSubset which has been handled (materialized, requested, or discarded). Accounts for cases in which the partitions definition may have changed between ticks. 
""" - previous_handled_subset = context.cursor.get_extras_value(context.condition, AssetSubset) + previous_handled_subset = ( + context.previous_evaluation_state.get_extra_state(context.condition, AssetSubset) + if context.previous_evaluation_state + else None + ) if previous_handled_subset: # partitioned -> unpartitioned or vice versa if previous_handled_subset.is_partitioned != (context.partitions_def is not None): @@ -603,11 +607,11 @@ def get_handled_subset(self, context: AssetConditionEvaluationContext) -> AssetS def evaluate_for_asset( self, context: AssetConditionEvaluationContext - ) -> "AssetConditionEvaluationResult": + ) -> "AssetConditionResult": """Evaluates the set of asset partitions for this asset which are missing and were not previously discarded. """ - from .asset_condition import AssetConditionEvaluationResult + from .asset_condition import AssetConditionResult handled_subset = self.get_handled_subset(context) unhandled_candidates = ( @@ -619,12 +623,12 @@ def evaluate_for_asset( else context.candidate_subset ) - return AssetConditionEvaluationResult.create( + return AssetConditionResult.create( context, true_subset=unhandled_candidates, # we keep track of the handled subset instead of the unhandled subset because new # partitions may spontaneously jump into existence at any time - extra_value=handled_subset, + extra_state=handled_subset, ) @@ -640,8 +644,8 @@ def description(self) -> str: def evaluate_for_asset( self, context: AssetConditionEvaluationContext - ) -> "AssetConditionEvaluationResult": - from .asset_condition import AssetConditionEvaluationResult + ) -> "AssetConditionResult": + from .asset_condition import AssetConditionResult asset_partitions_by_evaluation_data = defaultdict(set) @@ -671,7 +675,7 @@ def evaluate_for_asset( true_subset, subsets_with_metadata = context.add_evaluation_data_from_previous_tick( asset_partitions_by_evaluation_data, ignore_subset=subset_to_evaluate ) - return 
AssetConditionEvaluationResult.create(context, true_subset, subsets_with_metadata) + return AssetConditionResult.create(context, true_subset, subsets_with_metadata) @whitelist_for_serdes @@ -687,8 +691,8 @@ def description(self) -> str: def evaluate_for_asset( self, context: AssetConditionEvaluationContext, - ) -> "AssetConditionEvaluationResult": - from .asset_condition import AssetConditionEvaluationResult + ) -> "AssetConditionResult": + from .asset_condition import AssetConditionResult asset_partitions_by_evaluation_data = defaultdict(set) @@ -721,7 +725,7 @@ def evaluate_for_asset( true_subset, subsets_with_metadata = context.add_evaluation_data_from_previous_tick( asset_partitions_by_evaluation_data, ignore_subset=subset_to_evaluate ) - return AssetConditionEvaluationResult.create(context, true_subset, subsets_with_metadata) + return AssetConditionResult.create(context, true_subset, subsets_with_metadata) @whitelist_for_serdes @@ -757,8 +761,8 @@ def description(self) -> str: def evaluate_for_asset( self, context: AssetConditionEvaluationContext, - ) -> "AssetConditionEvaluationResult": - from .asset_condition import AssetConditionEvaluationResult + ) -> "AssetConditionResult": + from .asset_condition import AssetConditionResult asset_partitions_by_evaluation_data = defaultdict(set) @@ -810,7 +814,7 @@ def evaluate_for_asset( true_subset, subsets_with_metadata = context.add_evaluation_data_from_previous_tick( asset_partitions_by_evaluation_data, ignore_subset=subset_to_evaluate ) - return AssetConditionEvaluationResult.create(context, true_subset, subsets_with_metadata) + return AssetConditionResult.create(context, true_subset, subsets_with_metadata) @whitelist_for_serdes @@ -827,8 +831,8 @@ def description(self) -> str: def evaluate_for_asset( self, context: AssetConditionEvaluationContext - ) -> "AssetConditionEvaluationResult": - from .asset_condition import AssetConditionEvaluationResult + ) -> "AssetConditionResult": + from .asset_condition import 
AssetConditionResult asset_partitions_by_evaluation_data = defaultdict(set) @@ -855,7 +859,7 @@ def evaluate_for_asset( true_subset, subsets_with_metadata = context.add_evaluation_data_from_previous_tick( asset_partitions_by_evaluation_data, ignore_subset=subset_to_evaluate ) - return AssetConditionEvaluationResult.create(context, true_subset, subsets_with_metadata) + return AssetConditionResult.create(context, true_subset, subsets_with_metadata) @whitelist_for_serdes @@ -876,8 +880,8 @@ def description(self) -> str: def evaluate_for_asset( self, context: AssetConditionEvaluationContext - ) -> "AssetConditionEvaluationResult": - from .asset_condition import AssetConditionEvaluationResult + ) -> "AssetConditionResult": + from .asset_condition import AssetConditionResult backfilling_subset = ( context.instance_queryer.get_active_backfill_target_asset_graph_subset() @@ -890,7 +894,7 @@ def evaluate_for_asset( else: true_subset = context.candidate_subset & backfilling_subset - return AssetConditionEvaluationResult.create(context, true_subset) + return AssetConditionResult.create(context, true_subset) @whitelist_for_serdes @@ -907,8 +911,8 @@ def description(self) -> str: def evaluate_for_asset( self, context: AssetConditionEvaluationContext - ) -> "AssetConditionEvaluationResult": - from .asset_condition import AssetConditionEvaluationResult + ) -> "AssetConditionResult": + from .asset_condition import AssetConditionResult # the set of asset partitions which exceed the limit rate_limited_asset_partitions = set( @@ -918,7 +922,7 @@ def evaluate_for_asset( )[self.limit :] ) - return AssetConditionEvaluationResult.create( + return AssetConditionResult.create( context, AssetSubset.from_asset_partitions_set( context.asset_key, context.partitions_def, rate_limited_asset_partitions diff --git a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule_evaluation.py b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule_evaluation.py index 
1088e27eab15f..9c18ef3793a74 100644 --- a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule_evaluation.py +++ b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule_evaluation.py @@ -132,7 +132,7 @@ def metadata(self) -> MetadataMapping: } -RuleEvaluationResults = Tuple[AssetSubset, Sequence["AssetSubsetWithMetadata"], PackableValue] +RuleEvaluations = Tuple[AssetSubset, Sequence["AssetSubsetWithMetadata"], PackableValue] @whitelist_for_serdes @@ -278,6 +278,7 @@ def _get_child_rule_evaluation( start_timestamp=None, end_timestamp=None, subsets_with_metadata=subsets_with_metadata, + child_evaluations=[], ) def _get_child_decision_type_evaluation( diff --git a/python_modules/dagster/dagster/_utils/test/schedule_storage.py b/python_modules/dagster/dagster/_utils/test/schedule_storage.py index c65f1da38b861..e1c00a1fb4df1 100644 --- a/python_modules/dagster/dagster/_utils/test/schedule_storage.py +++ b/python_modules/dagster/dagster/_utils/test/schedule_storage.py @@ -721,6 +721,8 @@ def test_auto_materialize_asset_evaluations(self, storage) -> None: candidate_subset=AssetSubset(asset_key=AssetKey("asset_one"), value=False), start_timestamp=0, end_timestamp=1, + subsets_with_metadata=[], + child_evaluations=[], ).with_run_ids(set()), AssetConditionEvaluation( condition_snapshot=condition_snapshot, @@ -734,6 +736,7 @@ def test_auto_materialize_asset_evaluations(self, storage) -> None: {"foo": MetadataValue.text("bar")}, ) ], + child_evaluations=[], ).with_run_ids(set()), ], ) @@ -782,6 +785,8 @@ def test_auto_materialize_asset_evaluations(self, storage) -> None: end_timestamp=1, true_subset=AssetSubset(asset_key=AssetKey("asset_one"), value=True), candidate_subset=AssetSubset(asset_key=AssetKey("asset_one"), value=True), + subsets_with_metadata=[], + child_evaluations=[], ).with_run_ids(set()), ], ) @@ -813,6 +818,8 @@ def test_auto_materialize_asset_evaluations(self, storage) -> None: end_timestamp=1, 
true_subset=AssetSubset(asset_key=AssetKey("asset_one"), value=True), candidate_subset=AssetSubset(asset_key=AssetKey("asset_one"), value=True), + subsets_with_metadata=[], + child_evaluations=[], ).with_run_ids(set()) eval_asset_three = AssetConditionEvaluation( @@ -821,6 +828,8 @@ def test_auto_materialize_asset_evaluations(self, storage) -> None: end_timestamp=1, true_subset=AssetSubset(asset_key=AssetKey("asset_three"), value=True), candidate_subset=AssetSubset(asset_key=AssetKey("asset_three"), value=True), + subsets_with_metadata=[], + child_evaluations=[], ).with_run_ids(set()) storage.add_auto_materialize_asset_evaluations( @@ -868,6 +877,7 @@ def test_auto_materialize_asset_evaluations_with_partitions(self, storage) -> No true_subset=asset_subset, candidate_subset=asset_subset, subsets_with_metadata=[asset_subset_with_metadata], + child_evaluations=[], ).with_run_ids(set()), ], ) @@ -901,6 +911,7 @@ def test_purge_asset_evaluations(self, storage) -> None: true_subset=AssetSubset(asset_key=AssetKey("asset_one"), value=True), candidate_subset=AssetSubset(asset_key=AssetKey("asset_one"), value=True), subsets_with_metadata=[], + child_evaluations=[], ).with_run_ids(set()), ], ) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_daemon_scenario.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_daemon_scenario.py index bdabe2924f487..e4ddc18aec894 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_daemon_scenario.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_daemon_scenario.py @@ -694,7 +694,9 @@ def assert_evaluation( if num_requested is not None: assert actual_evaluation.true_subset.size == num_requested - def get_leaf_evaluations(e: AssetConditionEvaluation) -> Sequence[AssetConditionEvaluation]: + def get_leaf_evaluations( + e: AssetConditionEvaluation, + ) -> 
Sequence[AssetConditionEvaluation]: if len(e.child_evaluations) == 0: return [e] leaf_evals = []