diff --git a/python_modules/dagster/dagster/_core/definitions/asset_condition.py b/python_modules/dagster/dagster/_core/definitions/asset_condition.py index 56c859b01a718..974586cf0fdfc 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_condition.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_condition.py @@ -12,6 +12,8 @@ Optional, Sequence, Tuple, + Type, + TypeVar, Union, ) @@ -41,6 +43,9 @@ from .auto_materialize_rule import AutoMaterializeRule +T = TypeVar("T") + + @whitelist_for_serdes class HistoricalAllPartitionsSubset(NamedTuple): """Serializable indicator that this value was an AllPartitionsSubset at serialization time, but @@ -111,7 +116,7 @@ def unpack( @whitelist_for_serdes(field_serializers={"candidate_subset": CandidateSubsetSerializer}) -class AssetConditionEvaluation(NamedTuple): +class AssetConditionEvaluationResult(NamedTuple): """Internal representation of the results of evaluating a node in the evaluation tree.""" condition_snapshot: AssetConditionSnapshot @@ -120,7 +125,7 @@ class AssetConditionEvaluation(NamedTuple): start_timestamp: Optional[float] end_timestamp: Optional[float] subsets_with_metadata: Sequence[AssetSubsetWithMetadata] = [] - child_evaluations: Sequence["AssetConditionEvaluation"] = [] + child_evaluations: Sequence["AssetConditionEvaluationResult"] = [] @property def asset_key(self) -> AssetKey: @@ -141,7 +146,9 @@ def get_candidate_subset( ) return self.candidate_subset - def equivalent_to_stored_evaluation(self, other: Optional["AssetConditionEvaluation"]) -> bool: + def equivalent_to_stored_evaluation( + self, other: Optional["AssetConditionEvaluationResult"] + ) -> bool: """Returns if all fields other than `run_ids` are equal.""" return ( other is not None @@ -178,7 +185,9 @@ def get_requested_or_discarded_subset(self, condition: "AssetCondition") -> Asse else: return self.true_subset | discarded_subset - def for_child(self, child_condition: "AssetCondition") -> 
Optional["AssetConditionEvaluation"]: + def for_child( + self, child_condition: "AssetCondition" + ) -> Optional["AssetConditionEvaluationResult"]: """Returns the evaluation of a given child condition by finding the child evaluation that has an identical hash to the given condition. """ @@ -193,40 +202,48 @@ def with_run_ids(self, run_ids: AbstractSet[str]) -> "AssetConditionEvaluationWi return AssetConditionEvaluationWithRunIds(evaluation=self, run_ids=frozenset(run_ids)) -class AssetConditionEvaluationResult(NamedTuple): +@whitelist_for_serdes +class AssetConditionEvaluationInfo(NamedTuple): """Return value for the evaluate method of an AssetCondition.""" - condition: "AssetCondition" - evaluation: AssetConditionEvaluation - extra_values_by_unique_id: Mapping[str, PackableValue] + asset_key: AssetKey + evaluation_result: AssetConditionEvaluationResult + + max_storage_id: Optional[int] + timestamp: Optional[float] + extra_state_by_unique_id: Mapping[str, PackableValue] @property def true_subset(self) -> AssetSubset: - return self.evaluation.true_subset + return self.evaluation_result.true_subset @staticmethod def create_from_children( context: "AssetConditionEvaluationContext", true_subset: AssetSubset, - child_results: Sequence["AssetConditionEvaluationResult"], - ) -> "AssetConditionEvaluationResult": + child_results: Sequence["AssetConditionEvaluationInfo"], + ) -> "AssetConditionEvaluationInfo": """Returns a new AssetConditionEvaluationResult from the given child results.""" - return AssetConditionEvaluationResult( - condition=context.condition, - evaluation=AssetConditionEvaluation( - context.condition.snapshot, + return AssetConditionEvaluationInfo( + asset_key=context.asset_key, + evaluation_result=AssetConditionEvaluationResult( + condition_snapshot=context.condition.snapshot, true_subset=true_subset, candidate_subset=context.candidate_subset, start_timestamp=context.start_timestamp, end_timestamp=pendulum.now("UTC").timestamp(), subsets_with_metadata=[], 
- child_evaluations=[child_result.evaluation for child_result in child_results], + child_evaluations=[ + child_result.evaluation_result for child_result in child_results + ], ), - extra_values_by_unique_id=dict( + extra_state_by_unique_id=dict( item for child_result in child_results - for item in child_result.extra_values_by_unique_id.items() + for item in child_result.extra_state_by_unique_id.items() ), + max_storage_id=context.new_max_storage_id, + timestamp=context.evaluation_time.timestamp(), ) @staticmethod @@ -234,12 +251,12 @@ def create( context: "AssetConditionEvaluationContext", true_subset: AssetSubset, subsets_with_metadata: Sequence[AssetSubsetWithMetadata] = [], - extra_value: PackableValue = None, - ) -> "AssetConditionEvaluationResult": + extra_state: PackableValue = None, + ) -> "AssetConditionEvaluationInfo": """Returns a new AssetConditionEvaluationResult from the given parameters.""" - return AssetConditionEvaluationResult( - condition=context.condition, - evaluation=AssetConditionEvaluation( + return AssetConditionEvaluationInfo( + asset_key=context.asset_key, + evaluation_result=AssetConditionEvaluationResult( context.condition.snapshot, true_subset=true_subset, start_timestamp=context.start_timestamp, @@ -247,11 +264,22 @@ def create( candidate_subset=context.candidate_subset, subsets_with_metadata=subsets_with_metadata, ), - extra_values_by_unique_id={context.condition.unique_id: extra_value} - if extra_value + extra_state_by_unique_id={context.condition.unique_id: extra_state} + if extra_state else {}, + max_storage_id=context.new_max_storage_id, + timestamp=context.evaluation_time.timestamp(), ) + def get_extra_state(self, condition: "AssetCondition", as_type: Type[T]) -> Optional[T]: + """Returns the value from the extras dict for the given condition, if it exists and is of + the expected type. Otherwise, returns None. 
+ """ + extra_state = self.extra_state_by_unique_id.get(condition.unique_id) + if isinstance(extra_state, as_type): + return extra_state + return None + @whitelist_for_serdes class AssetConditionEvaluationWithRunIds(NamedTuple): @@ -259,7 +287,7 @@ class AssetConditionEvaluationWithRunIds(NamedTuple): response to it. """ - evaluation: AssetConditionEvaluation + evaluation: AssetConditionEvaluationResult run_ids: FrozenSet[str] @property @@ -286,9 +314,7 @@ def unique_id(self) -> str: return hashlib.md5("".join(parts).encode()).hexdigest() @abstractmethod - def evaluate( - self, context: "AssetConditionEvaluationContext" - ) -> AssetConditionEvaluationResult: + def evaluate(self, context: "AssetConditionEvaluationContext") -> AssetConditionEvaluationInfo: raise NotImplementedError() @abstractproperty @@ -359,9 +385,7 @@ def unique_id(self) -> str: def description(self) -> str: return self.rule.description - def evaluate( - self, context: "AssetConditionEvaluationContext" - ) -> AssetConditionEvaluationResult: + def evaluate(self, context: "AssetConditionEvaluationContext") -> AssetConditionEvaluationInfo: context.root_context.daemon_context._verbose_log_fn( # noqa f"Evaluating rule: {self.rule.to_snapshot()}" ) @@ -383,17 +407,15 @@ class AndAssetCondition( def description(self) -> str: return "All of" - def evaluate( - self, context: "AssetConditionEvaluationContext" - ) -> AssetConditionEvaluationResult: - child_results: List[AssetConditionEvaluationResult] = [] + def evaluate(self, context: "AssetConditionEvaluationContext") -> AssetConditionEvaluationInfo: + child_results: List[AssetConditionEvaluationInfo] = [] true_subset = context.candidate_subset for child in self.children: child_context = context.for_child(condition=child, candidate_subset=true_subset) child_result = child.evaluate(child_context) child_results.append(child_result) true_subset &= child_result.true_subset - return AssetConditionEvaluationResult.create_from_children( + return 
AssetConditionEvaluationInfo.create_from_children( context, true_subset, child_results ) @@ -408,10 +430,8 @@ class OrAssetCondition( def description(self) -> str: return "Any of" - def evaluate( - self, context: "AssetConditionEvaluationContext" - ) -> AssetConditionEvaluationResult: - child_results: List[AssetConditionEvaluationResult] = [] + def evaluate(self, context: "AssetConditionEvaluationContext") -> AssetConditionEvaluationInfo: + child_results: List[AssetConditionEvaluationInfo] = [] true_subset = context.empty_subset() for child in self.children: child_context = context.for_child( @@ -420,7 +440,7 @@ def evaluate( child_result = child.evaluate(child_context) child_results.append(child_result) true_subset |= child_result.true_subset - return AssetConditionEvaluationResult.create_from_children( + return AssetConditionEvaluationInfo.create_from_children( context, true_subset, child_results ) @@ -443,15 +463,13 @@ def description(self) -> str: def child(self) -> AssetCondition: return self.children[0] - def evaluate( - self, context: "AssetConditionEvaluationContext" - ) -> AssetConditionEvaluationResult: + def evaluate(self, context: "AssetConditionEvaluationContext") -> AssetConditionEvaluationInfo: child_context = context.for_child( condition=self.child, candidate_subset=context.candidate_subset ) child_result = self.child.evaluate(child_context) true_subset = context.candidate_subset - child_result.true_subset - return AssetConditionEvaluationResult.create_from_children( + return AssetConditionEvaluationInfo.create_from_children( context, true_subset, [child_result] ) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_condition_evaluation_context.py b/python_modules/dagster/dagster/_core/definitions/asset_condition_evaluation_context.py index df79ae469eed6..f6eb77fa1e8c0 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_condition_evaluation_context.py +++ 
b/python_modules/dagster/dagster/_core/definitions/asset_condition_evaluation_context.py @@ -26,12 +26,16 @@ from dagster._core.definitions.time_window_partition_mapping import TimeWindowPartitionMapping from dagster._utils.caching_instance_queryer import CachingInstanceQueryer -from .asset_daemon_cursor import AssetConditionCursor from .asset_graph import AssetGraph from .asset_subset import AssetSubset if TYPE_CHECKING: - from .asset_condition import AssetCondition, AssetConditionEvaluation, AssetSubsetWithMetadata + from .asset_condition import ( + AssetCondition, + AssetConditionEvaluationInfo, + AssetConditionEvaluationResult, + AssetSubsetWithMetadata, + ) from .asset_daemon_context import AssetDaemonContext @@ -54,15 +58,15 @@ class AssetConditionEvaluationContext: asset_key: AssetKey condition: "AssetCondition" - cursor: AssetConditionCursor - previous_condition_evaluation: Optional["AssetConditionEvaluation"] + previous_evaluation_info: Optional["AssetConditionEvaluationInfo"] + previous_evaluation_result: Optional["AssetConditionEvaluationResult"] candidate_subset: AssetSubset instance_queryer: CachingInstanceQueryer data_time_resolver: CachingDataTimeResolver daemon_context: "AssetDaemonContext" - evaluation_results_by_key: Mapping[AssetKey, "AssetConditionEvaluation"] + evaluation_info_by_key: Mapping[AssetKey, "AssetConditionEvaluationInfo"] expected_data_time_mapping: Mapping[AssetKey, Optional[datetime.datetime]] start_timestamp: float @@ -72,11 +76,11 @@ class AssetConditionEvaluationContext: def create( asset_key: AssetKey, condition: "AssetCondition", - cursor: AssetConditionCursor, + previous_evaluation_info: Optional["AssetConditionEvaluationInfo"], instance_queryer: CachingInstanceQueryer, data_time_resolver: CachingDataTimeResolver, daemon_context: "AssetDaemonContext", - evaluation_results_by_key: Mapping[AssetKey, "AssetConditionEvaluation"], + evaluation_info_by_key: Mapping[AssetKey, "AssetConditionEvaluationInfo"], 
expected_data_time_mapping: Mapping[AssetKey, Optional[datetime.datetime]], ) -> "AssetConditionEvaluationContext": partitions_def = instance_queryer.asset_graph.get_partitions_def(asset_key) @@ -84,8 +88,10 @@ def create( return AssetConditionEvaluationContext( asset_key=asset_key, condition=condition, - cursor=cursor, - previous_condition_evaluation=cursor.previous_evaluation, + previous_evaluation_info=previous_evaluation_info, + previous_evaluation_result=previous_evaluation_info.evaluation_result + if previous_evaluation_info + else None, candidate_subset=AssetSubset.all( asset_key, partitions_def, @@ -95,7 +101,7 @@ def create( data_time_resolver=data_time_resolver, instance_queryer=instance_queryer, daemon_context=daemon_context, - evaluation_results_by_key=evaluation_results_by_key, + evaluation_info_by_key=evaluation_info_by_key, expected_data_time_mapping=expected_data_time_mapping, start_timestamp=pendulum.now("UTC").timestamp(), ) @@ -106,8 +112,8 @@ def for_child( return dataclasses.replace( self, condition=condition, - previous_condition_evaluation=self.previous_condition_evaluation.for_child(condition) - if self.previous_condition_evaluation + previous_evaluation_result=self.previous_evaluation_result.for_child(condition) + if self.previous_evaluation_result else None, candidate_subset=candidate_subset, root_ref=self.root_context, @@ -134,23 +140,25 @@ def evaluation_time(self) -> datetime.datetime: @property def previous_max_storage_id(self) -> Optional[int]: - return self.cursor.previous_max_storage_id + return ( + self.previous_evaluation_info.max_storage_id if self.previous_evaluation_info else None + ) @property def previous_evaluation_timestamp(self) -> Optional[float]: - return self.cursor.previous_evaluation_timestamp + return self.previous_evaluation_info.timestamp if self.previous_evaluation_info else None @property def previous_true_subset(self) -> AssetSubset: - if self.previous_condition_evaluation is None: + if 
self.previous_evaluation_result is None: return self.empty_subset() - return self.previous_condition_evaluation.true_subset + return self.previous_evaluation_result.true_subset @property def previous_subsets_with_metadata(self) -> Sequence["AssetSubsetWithMetadata"]: - if self.previous_condition_evaluation is None: + if self.previous_evaluation_result is None: return [] - return self.previous_condition_evaluation.subsets_with_metadata + return self.previous_evaluation_result.subsets_with_metadata @functools.cached_property @root_property @@ -162,10 +170,10 @@ def parent_will_update_subset(self) -> AssetSubset: for parent_key in self.asset_graph.get_parents(self.asset_key): if not self.materializable_in_same_run(self.asset_key, parent_key): continue - parent_result = self.evaluation_results_by_key.get(parent_key) - if not parent_result: + parent_info = self.evaluation_info_by_key.get(parent_key) + if not parent_info: continue - parent_subset = parent_result.true_subset + parent_subset = parent_info.true_subset subset |= parent_subset._replace(asset_key=self.asset_key) return subset @@ -179,7 +187,7 @@ def materialized_since_previous_tick_subset(self) -> AssetSubset: self.instance_queryer.get_asset_partitions_updated_after_cursor( self.asset_key, asset_partitions=None, - after_cursor=self.cursor.previous_max_storage_id, + after_cursor=self.previous_max_storage_id, respect_materialization_data_versions=False, ), ) @@ -188,11 +196,15 @@ def materialized_since_previous_tick_subset(self) -> AssetSubset: @root_property def previous_tick_requested_subset(self) -> AssetSubset: """The set of asset partitions that were requested (or discarded) on the previous tick.""" - previous_evaluation = self.cursor.previous_evaluation - if previous_evaluation is None: + if ( + self.previous_evaluation_info is None + or self.previous_evaluation_info.evaluation_result is None + ): return self.empty_subset() - return previous_evaluation.get_requested_or_discarded_subset(self.condition) + 
return self.previous_evaluation_info.evaluation_result.get_requested_or_discarded_subset( + self.condition + ) @property def materialized_requested_or_discarded_since_previous_tick_subset(self) -> AssetSubset: @@ -211,7 +223,7 @@ def _parent_has_updated_subset_and_new_latest_storage_id( asset_partitions, cursor, ) = self.root_context.instance_queryer.asset_partitions_with_newly_updated_parents_and_new_cursor( - latest_storage_id=self.cursor.previous_max_storage_id, + latest_storage_id=self.previous_max_storage_id, child_asset_key=self.root_context.asset_key, map_old_time_partitions=False, ) @@ -247,16 +259,16 @@ def candidates_not_evaluated_on_previous_tick_subset(self) -> AssetSubset: """ from .asset_condition import HistoricalAllPartitionsSubset - if not self.previous_condition_evaluation: + if not self.previous_evaluation_result: return self.candidate_subset # when the candidate_subset is HistoricalAllPartitionsSubset, this indicates that the # entire asset was evaluated for this condition on the previous tick, and so no candidates # were *not* evaluated on the previous tick elif isinstance( - self.previous_condition_evaluation.candidate_subset, HistoricalAllPartitionsSubset + self.previous_evaluation_result.candidate_subset, HistoricalAllPartitionsSubset ): return self.empty_subset() - return self.candidate_subset - self.previous_condition_evaluation.candidate_subset + return self.candidate_subset - self.previous_evaluation_result.candidate_subset def materializable_in_same_run(self, child_key: AssetKey, parent_key: AssetKey) -> bool: """Returns whether a child asset can be materialized in the same run as a parent asset.""" @@ -303,10 +315,10 @@ def get_parents_that_will_not_be_materialized_on_current_tick( } def will_update_asset_partition(self, asset_partition: AssetKeyPartitionKey) -> bool: - parent_evaluation = self.evaluation_results_by_key.get(asset_partition.asset_key) - if not parent_evaluation: + parent_evaluation_info = 
self.evaluation_info_by_key.get(asset_partition.asset_key) + if not parent_evaluation_info: return False - return asset_partition in parent_evaluation.true_subset + return asset_partition in parent_evaluation_info.true_subset def add_evaluation_data_from_previous_tick( self, diff --git a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py index c95480ddc42ec..8e256819f4589 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py @@ -34,11 +34,11 @@ from ... import PartitionKeyRange from ..storage.tags import ASSET_PARTITION_RANGE_END_TAG, ASSET_PARTITION_RANGE_START_TAG -from .asset_condition import AssetConditionEvaluation +from .asset_condition import AssetConditionEvaluationInfo, AssetConditionEvaluationResult from .asset_condition_evaluation_context import ( AssetConditionEvaluationContext, ) -from .asset_daemon_cursor import AssetConditionCursor, AssetDaemonCursor +from .asset_daemon_cursor import AssetDaemonCursor from .asset_graph import AssetGraph from .auto_materialize_rule import AutoMaterializeRule from .backfill_policy import BackfillPolicy, BackfillPolicyType @@ -176,9 +176,9 @@ def prefetch(self) -> None: def evaluate_asset( self, asset_key: AssetKey, - evaluation_results_by_key: Mapping[AssetKey, AssetConditionEvaluation], + evaluation_info_by_key: Mapping[AssetKey, AssetConditionEvaluationInfo], expected_data_time_mapping: Mapping[AssetKey, Optional[datetime.datetime]], - ) -> Tuple[AssetConditionEvaluation, AssetConditionCursor, Optional[datetime.datetime]]: + ) -> Tuple[AssetConditionEvaluationInfo, Optional[datetime.datetime]]: """Evaluates the auto materialize policy of a given asset key. 
Params: @@ -196,48 +196,34 @@ def evaluate_asset( self.asset_graph.auto_materialize_policies_by_key.get(asset_key) ).to_asset_condition() - asset_cursor = self.cursor.get_asset_cursor(asset_key) + asset_cursor = self.cursor.get_previous_evaluation_info(asset_key) context = AssetConditionEvaluationContext.create( asset_key=asset_key, - cursor=asset_cursor, + previous_evaluation_info=asset_cursor, condition=asset_condition, instance_queryer=self.instance_queryer, data_time_resolver=self.data_time_resolver, daemon_context=self, - evaluation_results_by_key=evaluation_results_by_key, + evaluation_info_by_key=evaluation_info_by_key, expected_data_time_mapping=expected_data_time_mapping, ) - evaluation_result = asset_condition.evaluate(context) - - new_asset_cursor = AssetConditionCursor( - asset_key=asset_key, - previous_max_storage_id=context.new_max_storage_id, - previous_evaluation_timestamp=context.evaluation_time.timestamp(), - previous_evaluation=evaluation_result.evaluation, - extra_values_by_unique_id=evaluation_result.extra_values_by_unique_id, - ) + evaluation_info = asset_condition.evaluate(context) expected_data_time = get_expected_data_time_for_asset_key( - context, will_materialize=evaluation_result.true_subset.size > 0 + context, will_materialize=evaluation_info.true_subset.size > 0 ) - return evaluation_result.evaluation, new_asset_cursor, expected_data_time + return evaluation_info, expected_data_time def get_asset_condition_evaluations( self, - ) -> Tuple[ - Sequence[AssetConditionEvaluation], - Sequence[AssetConditionCursor], - AbstractSet[AssetKeyPartitionKey], - ]: + ) -> Tuple[Sequence[AssetConditionEvaluationInfo], AbstractSet[AssetKeyPartitionKey]]: """Returns a mapping from asset key to the AutoMaterializeAssetEvaluation for that key, a sequence of new per-asset cursors, and the set of all asset partitions that should be materialized or discarded this tick. 
""" - asset_cursors: List[AssetConditionCursor] = [] - - evaluation_results_by_key: Dict[AssetKey, AssetConditionEvaluation] = {} + evaluation_info_by_key: Dict[AssetKey, AssetConditionEvaluationInfo] = {} expected_data_time_mapping: Dict[AssetKey, Optional[datetime.datetime]] = defaultdict() to_request: Set[AssetKeyPartitionKey] = set() @@ -256,14 +242,14 @@ def get_asset_condition_evaluations( f" {asset_key.to_user_string()} ({num_checked_assets}/{num_auto_materialize_asset_keys})" ) - (evaluation, asset_cursor_for_asset, expected_data_time) = self.evaluate_asset( - asset_key, evaluation_results_by_key, expected_data_time_mapping + (evaluation_info, expected_data_time) = self.evaluate_asset( + asset_key, evaluation_info_by_key, expected_data_time_mapping ) - num_requested = evaluation.true_subset.size + num_requested = evaluation_info.true_subset.size log_fn = self._logger.info if num_requested > 0 else self._logger.debug - to_request_asset_partitions = evaluation.true_subset.asset_partitions + to_request_asset_partitions = evaluation_info.true_subset.asset_partitions to_request_str = ",".join( [(ap.partition_key or "No partition") for ap in to_request_asset_partitions] ) @@ -274,9 +260,8 @@ def get_asset_condition_evaluations( f" requested ({to_request_str}) ({format(time.time()-start_time, '.3f')} seconds)" ) - evaluation_results_by_key[asset_key] = evaluation + evaluation_info_by_key[asset_key] = evaluation_info expected_data_time_mapping[asset_key] = expected_data_time - asset_cursors.append(asset_cursor_for_asset) # if we need to materialize any partitions of a non-subsettable multi-asset, we need to # materialize all of them @@ -285,22 +270,25 @@ def get_asset_condition_evaluations( expected_data_time_mapping[neighbor_key] = expected_data_time # make sure that the true_subset of the neighbor is accurate - if neighbor_key in evaluation_results_by_key: - neighbor_evaluation = evaluation_results_by_key[neighbor_key] - evaluation_results_by_key[neighbor_key] = 
neighbor_evaluation._replace( - true_subset=neighbor_evaluation.true_subset - | evaluation.true_subset._replace(asset_key=neighbor_key) + if neighbor_key in evaluation_info_by_key: + neighbor_evaluation = evaluation_info_by_key[neighbor_key] + # add this asset's requested partitions, re-keyed to the neighbor + evaluation_info_by_key[neighbor_key] = neighbor_evaluation._replace( + evaluation_result=neighbor_evaluation.evaluation_result._replace( + true_subset=neighbor_evaluation.true_subset + | evaluation_info.true_subset._replace(asset_key=neighbor_key) + ) ) to_request |= { ap._replace(asset_key=neighbor_key) - for ap in evaluation.true_subset.asset_partitions + for ap in evaluation_info.true_subset.asset_partitions } - return (list(evaluation_results_by_key.values()), asset_cursors, to_request) + return (list(evaluation_info_by_key.values()), to_request) def evaluate( self, - ) -> Tuple[Sequence[RunRequest], AssetDaemonCursor, Sequence[AssetConditionEvaluation]]: + ) -> Tuple[Sequence[RunRequest], AssetDaemonCursor, Sequence[AssetConditionEvaluationResult]]: observe_request_timestamp = pendulum.now().timestamp() auto_observe_run_requests = ( get_auto_observe_run_requests( @@ -314,7 +302,7 @@ def evaluate( else [] ) - evaluations, asset_cursors, to_request = self.get_asset_condition_evaluations() + evaluation_info, to_request = self.get_asset_condition_evaluations() run_requests = [ *build_run_requests( @@ -329,7 +317,7 @@ def evaluate( run_requests, self.cursor.with_updates( evaluation_id=self._evaluation_id, - asset_cursors=asset_cursors, + evaluation_info=evaluation_info, newly_observe_requested_asset_keys=[ asset_key for run_request in auto_observe_run_requests @@ -337,12 +325,12 @@ def evaluate( ], evaluation_timestamp=self.instance_queryer.evaluation_time.timestamp(), ), - # only record evaluations where something changed + # only record evaluation results where something changed [ - evaluation - for evaluation in evaluations - if not evaluation.equivalent_to_stored_evaluation( - self.cursor.get_previous_evaluation(evaluation.asset_key) + 
ei.evaluation_result + for ei in evaluation_info + if not ei.evaluation_result.equivalent_to_stored_evaluation( + self.cursor.get_previous_evaluation_result(ei.asset_key) ) ], ) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py b/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py index 9107f31435ecc..c5309083c2136 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py @@ -9,14 +9,11 @@ NamedTuple, Optional, Sequence, - Type, - TypeVar, ) from dagster._core.definitions.asset_graph_subset import AssetGraphSubset from dagster._core.definitions.asset_subset import AssetSubset from dagster._core.definitions.events import AssetKey -from dagster._core.definitions.partition import PartitionsDefinition from dagster._serdes.serdes import ( DeserializationError, FieldSerializer, @@ -35,9 +32,11 @@ from .asset_graph import AssetGraph if TYPE_CHECKING: - from .asset_condition import AssetCondition, AssetConditionEvaluation, AssetConditionSnapshot - -T = TypeVar("T") + from .asset_condition import ( + AssetConditionEvaluationInfo, + AssetConditionEvaluationResult, + AssetConditionSnapshot, + ) @whitelist_for_serdes @@ -50,46 +49,6 @@ class AssetConditionCursorExtras(NamedTuple): extras: Mapping[str, PackableValue] -@whitelist_for_serdes -class AssetConditionCursor(NamedTuple): - """Represents the evaluated state of an AssetConditionCursor at a certain point in time. This - information can be used to make future evaluations more efficient. 
- """ - - asset_key: AssetKey - previous_evaluation: Optional["AssetConditionEvaluation"] - previous_max_storage_id: Optional[int] - previous_evaluation_timestamp: Optional[float] - - extra_values_by_unique_id: Mapping[str, PackableValue] - - @staticmethod - def empty(asset_key: AssetKey) -> "AssetConditionCursor": - return AssetConditionCursor( - asset_key=asset_key, - previous_evaluation=None, - previous_max_storage_id=None, - previous_evaluation_timestamp=None, - extra_values_by_unique_id={}, - ) - - def get_extras_value(self, condition: "AssetCondition", as_type: Type[T]) -> Optional[T]: - """Returns the value from the extras dict for the given condition, if it exists and is of - the expected type. Otherwise, returns None. - """ - extras_value = self.extra_values_by_unique_id.get(condition.unique_id) - if isinstance(extras_value, as_type): - return extras_value - return None - - def get_previous_requested_or_discarded_subset( - self, condition: "AssetCondition", partitions_def: Optional[PartitionsDefinition] - ) -> AssetSubset: - if not self.previous_evaluation: - return AssetSubset.empty(self.asset_key, partitions_def) - return self.previous_evaluation.get_requested_or_discarded_subset(condition) - - class ObserveRequestTimestampSerializer(FieldSerializer): def pack( self, @@ -118,12 +77,12 @@ class AssetDaemonCursor(NamedTuple): Attributes: evaluation_id (int): The ID of the evaluation that produced this cursor. - asset_cursors (Sequence[AssetConditionCursor]): The state of each asset that the daemon - is responsible for handling. + previous_evaluation_info (Sequence[AssetConditionEvaluationInfo]): The evaluation info + recorded for each asset on the previous tick. 
""" evaluation_id: int - asset_cursors: Sequence[AssetConditionCursor] + previous_evaluation_info: Sequence["AssetConditionEvaluationInfo"] last_observe_request_timestamp_by_asset_key: Mapping[AssetKey, float] @@ -131,7 +90,7 @@ class AssetDaemonCursor(NamedTuple): def empty(evaluation_id: int = 0) -> "AssetDaemonCursor": return AssetDaemonCursor( evaluation_id=evaluation_id, - asset_cursors=[], + previous_evaluation_info=[], last_observe_request_timestamp_by_asset_key={}, ) @@ -165,31 +124,38 @@ def serialize(self) -> str: @property @functools.lru_cache(maxsize=1) - def asset_cursors_by_key(self) -> Mapping[AssetKey, AssetConditionCursor]: - """Efficient lookup of asset cursors by asset key.""" - return {cursor.asset_key: cursor for cursor in self.asset_cursors} + def previous_evaluation_info_by_key(self) -> Mapping[AssetKey, "AssetConditionEvaluationInfo"]: + """Efficient lookup of previous evaluation info by asset key.""" + return { + evaluation_info.asset_key: evaluation_info + for evaluation_info in self.previous_evaluation_info + } - def get_asset_cursor(self, asset_key: AssetKey) -> AssetConditionCursor: + def get_previous_evaluation_info( + self, asset_key: AssetKey + ) -> Optional["AssetConditionEvaluationInfo"]: """Returns the AssetConditionCursor associated with the given asset key. If no stored cursor exists, returns an empty cursor. 
""" - return self.asset_cursors_by_key.get(asset_key) or AssetConditionCursor.empty(asset_key) + return self.previous_evaluation_info_by_key.get(asset_key) - def get_previous_evaluation(self, asset_key: AssetKey) -> Optional["AssetConditionEvaluation"]: + def get_previous_evaluation_result( + self, asset_key: AssetKey + ) -> Optional["AssetConditionEvaluationResult"]: """Returns the previous AssetConditionEvaluation for a given asset key, if it exists.""" - cursor = self.get_asset_cursor(asset_key) - return cursor.previous_evaluation if cursor else None + previous_evaluation_info = self.get_previous_evaluation_info(asset_key) + return previous_evaluation_info.evaluation_result if previous_evaluation_info else None def with_updates( self, evaluation_id: int, evaluation_timestamp: float, newly_observe_requested_asset_keys: Sequence[AssetKey], - asset_cursors: Sequence[AssetConditionCursor], + evaluation_info: Sequence["AssetConditionEvaluationInfo"], ) -> "AssetDaemonCursor": return self._replace( evaluation_id=evaluation_id, - asset_cursors=asset_cursors, + previous_evaluation_info=evaluation_info, last_observe_request_timestamp_by_asset_key={ **self.last_observe_request_timestamp_by_asset_key, **{ @@ -206,24 +172,27 @@ def __hash__(self) -> int: # BACKCOMPAT -def get_backcompat_asset_condition_cursor( +def get_backcompat_asset_condition_evaluation_info( asset_key: AssetKey, + latest_evaluation: "AssetConditionEvaluationResult", latest_storage_id: Optional[int], latest_timestamp: Optional[float], - latest_evaluation: Optional["AssetConditionEvaluation"], handled_root_subset: Optional[AssetSubset], -) -> AssetConditionCursor: +) -> "AssetConditionEvaluationInfo": """Generates an AssetDaemonCursor from information available on the old cursor format.""" - from dagster._core.definitions.asset_condition import RuleCondition + from dagster._core.definitions.asset_condition import ( + AssetConditionEvaluationInfo, + RuleCondition, + ) from 
dagster._core.definitions.auto_materialize_rule import MaterializeOnMissingRule - return AssetConditionCursor( + return AssetConditionEvaluationInfo( asset_key=asset_key, - previous_evaluation=latest_evaluation, - previous_evaluation_timestamp=latest_timestamp, - previous_max_storage_id=latest_storage_id, + evaluation_result=latest_evaluation, + timestamp=latest_timestamp, + max_storage_id=latest_storage_id, # the only information we need to preserve from the previous cursor is the handled subset - extra_values_by_unique_id={ + extra_state_by_unique_id={ RuleCondition(MaterializeOnMissingRule()).unique_id: handled_root_subset, } if handled_root_subset and handled_root_subset.size > 0 @@ -237,6 +206,7 @@ def backcompat_deserialize_asset_daemon_cursor_str( """This serves as a backcompat layer for deserializing the old cursor format. Some information is impossible to fully recover, this will recover enough to continue operating as normal. """ + from .asset_condition import AssetConditionEvaluationResult, AssetConditionSnapshot from .auto_materialize_rule_evaluation import ( deserialize_auto_materialize_asset_evaluation_to_asset_condition_evaluation_with_run_ids, ) @@ -290,28 +260,39 @@ def backcompat_deserialize_asset_daemon_cursor_str( latest_evaluation_by_asset_key[key] = evaluation - asset_cursors = [] + previous_evaluation_info = [] cursor_keys = ( asset_graph.auto_materialize_policies_by_key.keys() if asset_graph else latest_evaluation_by_asset_key.keys() ) for asset_key in cursor_keys: - latest_evaluation = latest_evaluation_by_asset_key.get(asset_key) - asset_cursors.append( - get_backcompat_asset_condition_cursor( - asset_key, - data.get("latest_storage_id"), - data.get("latest_evaluation_timestamp"), - latest_evaluation, - handled_root_asset_graph_subset.get_asset_subset(asset_key, asset_graph) - if asset_graph - else None, + latest_evaluation_result = latest_evaluation_by_asset_key.get(asset_key) + # create a placeholder evaluation result if we don't have 
one + if not latest_evaluation_result: + partitions_def = asset_graph.get_partitions_def(asset_key) if asset_graph else None + latest_evaluation_result = AssetConditionEvaluationResult( + condition_snapshot=AssetConditionSnapshot("", "", ""), + true_subset=AssetSubset.empty(asset_key, partitions_def), + candidate_subset=AssetSubset.empty(asset_key, partitions_def), + start_timestamp=None, + end_timestamp=None, + subsets_with_metadata=[], + child_evaluations=[], ) + backcompat_evaluation_info = get_backcompat_asset_condition_evaluation_info( + asset_key, + latest_evaluation_result, + data.get("latest_storage_id"), + data.get("latest_evaluation_timestamp"), + handled_root_asset_graph_subset.get_asset_subset(asset_key, asset_graph) + if asset_graph + else None, ) + previous_evaluation_info.append(backcompat_evaluation_info) return AssetDaemonCursor( evaluation_id=default_evaluation_id, - asset_cursors=asset_cursors, + previous_evaluation_info=previous_evaluation_info, last_observe_request_timestamp_by_asset_key=last_observe_request_timestamp_by_asset_key, ) diff --git a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py index 1167fc69abdf8..a5da710acc0e3 100644 --- a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py +++ b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py @@ -48,7 +48,7 @@ from .asset_graph import sort_key_for_asset_partition if TYPE_CHECKING: - from dagster._core.definitions.asset_condition import AssetConditionEvaluationResult + from dagster._core.definitions.asset_condition import AssetConditionEvaluationInfo class AutoMaterializeRule(ABC): @@ -78,7 +78,7 @@ def description(self) -> str: @abstractmethod def evaluate_for_asset( self, context: AssetConditionEvaluationContext - ) -> "AssetConditionEvaluationResult": + ) -> "AssetConditionEvaluationInfo": """The core evaluation function for the rule. 
This function takes in a context object and returns an AssetConditionEvaluationInfo whose true subset is the set of asset partitions that the rule applies to. @@ -239,13 +239,13 @@ def description(self) -> str: def evaluate_for_asset( self, context: AssetConditionEvaluationContext - ) -> "AssetConditionEvaluationResult": - from .asset_condition import AssetConditionEvaluationResult + ) -> "AssetConditionEvaluationInfo": + from .asset_condition import AssetConditionEvaluationInfo true_subset, subsets_with_metadata = freshness_evaluation_results_for_asset_key( context.root_context ) - return AssetConditionEvaluationResult.create(context, true_subset, subsets_with_metadata) + return AssetConditionEvaluationInfo.create(context, true_subset, subsets_with_metadata) @whitelist_for_serdes @@ -268,7 +268,7 @@ def missed_cron_ticks( self, context: AssetConditionEvaluationContext ) -> Sequence[datetime.datetime]: """Returns the cron ticks which have been missed since the previous evaluation.""" - if not context.cursor.previous_evaluation_timestamp: + if not context.previous_evaluation_timestamp: previous_dt = next( reverse_cron_string_iterator( end_timestamp=context.evaluation_time.timestamp(), @@ -279,7 +279,7 @@ def missed_cron_ticks( return [previous_dt] missed_ticks = [] for dt in cron_string_iterator( - start_timestamp=context.cursor.previous_evaluation_timestamp, + start_timestamp=context.previous_evaluation_timestamp, cron_string=self.cron_schedule, execution_timezone=self.timezone, ): @@ -353,8 +353,8 @@ def get_new_asset_partitions_to_request( def evaluate_for_asset( self, context: AssetConditionEvaluationContext - ) -> "AssetConditionEvaluationResult": - from .asset_condition import AssetConditionEvaluationResult + ) -> "AssetConditionEvaluationInfo": + from .asset_condition import AssetConditionEvaluationInfo new_asset_partitions_to_request = self.get_new_asset_partitions_to_request(context) asset_subset_to_request = AssetSubset.from_asset_partitions_set( @@ -364,7 +364,7 
@@ def evaluate_for_asset( - context.materialized_requested_or_discarded_since_previous_tick_subset ) - return AssetConditionEvaluationResult.create(context, true_subset=asset_subset_to_request) + return AssetConditionEvaluationInfo.create(context, true_subset=asset_subset_to_request) @whitelist_for_serdes @@ -471,11 +471,11 @@ def description(self) -> str: def evaluate_for_asset( self, context: AssetConditionEvaluationContext - ) -> "AssetConditionEvaluationResult": + ) -> "AssetConditionEvaluationInfo": """Evaluates the set of asset partitions of this asset whose parents have been updated, or will update on this tick. """ - from .asset_condition import AssetConditionEvaluationResult + from .asset_condition import AssetConditionEvaluationInfo asset_partitions_by_updated_parents: Mapping[ AssetKeyPartitionKey, Set[AssetKeyPartitionKey] @@ -561,7 +561,7 @@ def evaluate_for_asset( asset_partitions_by_evaluation_data, ignore_subset=context.materialized_requested_or_discarded_since_previous_tick_subset, ) - return AssetConditionEvaluationResult.create(context, true_subset, subsets_with_metadata) + return AssetConditionEvaluationInfo.create(context, true_subset, subsets_with_metadata) @whitelist_for_serdes @@ -578,7 +578,11 @@ def get_handled_subset(self, context: AssetConditionEvaluationContext) -> AssetS """Returns the AssetSubset which has been handled (materialized, requested, or discarded). Accounts for cases in which the partitions definition may have changed between ticks. 
""" - previous_handled_subset = context.cursor.get_extras_value(context.condition, AssetSubset) + previous_handled_subset = ( + context.previous_evaluation_info.get_extra_state(context.condition, AssetSubset) + if context.previous_evaluation_info + else None + ) if previous_handled_subset: # partitioned -> unpartitioned or vice versa if previous_handled_subset.is_partitioned != (context.partitions_def is not None): @@ -603,11 +607,11 @@ def get_handled_subset(self, context: AssetConditionEvaluationContext) -> AssetS def evaluate_for_asset( self, context: AssetConditionEvaluationContext - ) -> "AssetConditionEvaluationResult": + ) -> "AssetConditionEvaluationInfo": """Evaluates the set of asset partitions for this asset which are missing and were not previously discarded. """ - from .asset_condition import AssetConditionEvaluationResult + from .asset_condition import AssetConditionEvaluationInfo handled_subset = self.get_handled_subset(context) unhandled_candidates = ( @@ -619,12 +623,12 @@ def evaluate_for_asset( else context.candidate_subset ) - return AssetConditionEvaluationResult.create( + return AssetConditionEvaluationInfo.create( context, true_subset=unhandled_candidates, # we keep track of the handled subset instead of the unhandled subset because new # partitions may spontaneously jump into existence at any time - extra_value=handled_subset, + extra_state=handled_subset, ) @@ -640,8 +644,8 @@ def description(self) -> str: def evaluate_for_asset( self, context: AssetConditionEvaluationContext - ) -> "AssetConditionEvaluationResult": - from .asset_condition import AssetConditionEvaluationResult + ) -> "AssetConditionEvaluationInfo": + from .asset_condition import AssetConditionEvaluationInfo asset_partitions_by_evaluation_data = defaultdict(set) @@ -671,7 +675,7 @@ def evaluate_for_asset( true_subset, subsets_with_metadata = context.add_evaluation_data_from_previous_tick( asset_partitions_by_evaluation_data, ignore_subset=subset_to_evaluate ) - return 
AssetConditionEvaluationResult.create(context, true_subset, subsets_with_metadata) + return AssetConditionEvaluationInfo.create(context, true_subset, subsets_with_metadata) @whitelist_for_serdes @@ -687,8 +691,8 @@ def description(self) -> str: def evaluate_for_asset( self, context: AssetConditionEvaluationContext, - ) -> "AssetConditionEvaluationResult": - from .asset_condition import AssetConditionEvaluationResult + ) -> "AssetConditionEvaluationInfo": + from .asset_condition import AssetConditionEvaluationInfo asset_partitions_by_evaluation_data = defaultdict(set) @@ -721,7 +725,7 @@ def evaluate_for_asset( true_subset, subsets_with_metadata = context.add_evaluation_data_from_previous_tick( asset_partitions_by_evaluation_data, ignore_subset=subset_to_evaluate ) - return AssetConditionEvaluationResult.create(context, true_subset, subsets_with_metadata) + return AssetConditionEvaluationInfo.create(context, true_subset, subsets_with_metadata) @whitelist_for_serdes @@ -757,8 +761,8 @@ def description(self) -> str: def evaluate_for_asset( self, context: AssetConditionEvaluationContext, - ) -> "AssetConditionEvaluationResult": - from .asset_condition import AssetConditionEvaluationResult + ) -> "AssetConditionEvaluationInfo": + from .asset_condition import AssetConditionEvaluationInfo asset_partitions_by_evaluation_data = defaultdict(set) @@ -810,7 +814,7 @@ def evaluate_for_asset( true_subset, subsets_with_metadata = context.add_evaluation_data_from_previous_tick( asset_partitions_by_evaluation_data, ignore_subset=subset_to_evaluate ) - return AssetConditionEvaluationResult.create(context, true_subset, subsets_with_metadata) + return AssetConditionEvaluationInfo.create(context, true_subset, subsets_with_metadata) @whitelist_for_serdes @@ -827,8 +831,8 @@ def description(self) -> str: def evaluate_for_asset( self, context: AssetConditionEvaluationContext - ) -> "AssetConditionEvaluationResult": - from .asset_condition import AssetConditionEvaluationResult + ) -> 
"AssetConditionEvaluationInfo": + from .asset_condition import AssetConditionEvaluationInfo asset_partitions_by_evaluation_data = defaultdict(set) @@ -855,7 +859,7 @@ def evaluate_for_asset( true_subset, subsets_with_metadata = context.add_evaluation_data_from_previous_tick( asset_partitions_by_evaluation_data, ignore_subset=subset_to_evaluate ) - return AssetConditionEvaluationResult.create(context, true_subset, subsets_with_metadata) + return AssetConditionEvaluationInfo.create(context, true_subset, subsets_with_metadata) @whitelist_for_serdes @@ -876,8 +880,8 @@ def description(self) -> str: def evaluate_for_asset( self, context: AssetConditionEvaluationContext - ) -> "AssetConditionEvaluationResult": - from .asset_condition import AssetConditionEvaluationResult + ) -> "AssetConditionEvaluationInfo": + from .asset_condition import AssetConditionEvaluationInfo backfilling_subset = ( context.instance_queryer.get_active_backfill_target_asset_graph_subset() @@ -890,7 +894,7 @@ def evaluate_for_asset( else: true_subset = context.candidate_subset & backfilling_subset - return AssetConditionEvaluationResult.create(context, true_subset) + return AssetConditionEvaluationInfo.create(context, true_subset) @whitelist_for_serdes @@ -907,8 +911,8 @@ def description(self) -> str: def evaluate_for_asset( self, context: AssetConditionEvaluationContext - ) -> "AssetConditionEvaluationResult": - from .asset_condition import AssetConditionEvaluationResult + ) -> "AssetConditionEvaluationInfo": + from .asset_condition import AssetConditionEvaluationInfo # the set of asset partitions which exceed the limit rate_limited_asset_partitions = set( @@ -918,7 +922,7 @@ def evaluate_for_asset( )[self.limit :] ) - return AssetConditionEvaluationResult.create( + return AssetConditionEvaluationInfo.create( context, AssetSubset.from_asset_partitions_set( context.asset_key, context.partitions_def, rate_limited_asset_partitions diff --git 
a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule_evaluation.py b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule_evaluation.py index 5514245d5861a..cf473fefc1060 100644 --- a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule_evaluation.py +++ b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule_evaluation.py @@ -38,7 +38,7 @@ from dagster._core.definitions.asset_condition import AssetSubsetWithMetadata from .asset_condition import ( - AssetConditionEvaluation, + AssetConditionEvaluationResult, AssetConditionEvaluationWithRunIds, AssetConditionSnapshot, ) @@ -243,9 +243,9 @@ def _get_child_rule_evaluation( ], is_partitioned: bool, rule_snapshot: AutoMaterializeRuleSnapshot, - ) -> "AssetConditionEvaluation": + ) -> "AssetConditionEvaluationResult": from .asset_condition import ( - AssetConditionEvaluation, + AssetConditionEvaluationResult, AssetSubsetWithMetadata, HistoricalAllPartitionsSubset, ) @@ -269,7 +269,7 @@ def _get_child_rule_evaluation( asset_key, is_partitioned, serialized ) - return AssetConditionEvaluation( + return AssetConditionEvaluationResult( condition_snapshot=condition_snapshot, true_subset=true_subset, candidate_subset=HistoricalAllPartitionsSubset() @@ -289,9 +289,9 @@ def _get_child_decision_type_evaluation( rule_snapshots: Sequence[AutoMaterializeRuleSnapshot], is_partitioned: bool, decision_type: AutoMaterializeDecisionType, - ) -> Optional["AssetConditionEvaluation"]: + ) -> Optional["AssetConditionEvaluationResult"]: from .asset_condition import ( - AssetConditionEvaluation, + AssetConditionEvaluationResult, AssetConditionSnapshot, HistoricalAllPartitionsSubset, NotAssetCondition, @@ -332,7 +332,7 @@ def _get_child_decision_type_evaluation( if is_partitioned else AssetSubset.empty(asset_key, None) ) - evaluation = AssetConditionEvaluation( + evaluation = AssetConditionEvaluationResult( condition_snapshot=decision_type_snapshot, true_subset=reduce( 
operator.or_, (e.true_subset for e in child_evaluations), initial @@ -355,7 +355,7 @@ def _get_child_decision_type_evaluation( evaluation.condition_snapshot.unique_id, ] unique_id = hashlib.md5("".join(unique_id_parts).encode()).hexdigest() - return AssetConditionEvaluation( + return AssetConditionEvaluationResult( condition_snapshot=AssetConditionSnapshot( class_name=NotAssetCondition.__name__, description="Not", unique_id=unique_id ), @@ -381,7 +381,7 @@ def unpack( ) -> "AssetConditionEvaluationWithRunIds": from .asset_condition import ( AndAssetCondition, - AssetConditionEvaluation, + AssetConditionEvaluationResult, AssetConditionSnapshot, HistoricalAllPartitionsSubset, ) @@ -435,7 +435,7 @@ def unpack( class_name=AndAssetCondition.__name__, description="All of", unique_id=unique_id ) - return AssetConditionEvaluation( + return AssetConditionEvaluationResult( condition_snapshot=condition_snapshot, true_subset=reduce(operator.and_, (e.true_subset for e in child_evaluations)), candidate_subset=HistoricalAllPartitionsSubset() diff --git a/python_modules/dagster/dagster/_utils/test/schedule_storage.py b/python_modules/dagster/dagster/_utils/test/schedule_storage.py index 55428a668d28f..00fce9a9e9291 100644 --- a/python_modules/dagster/dagster/_utils/test/schedule_storage.py +++ b/python_modules/dagster/dagster/_utils/test/schedule_storage.py @@ -6,7 +6,7 @@ from dagster import StaticPartitionsDefinition from dagster._core.definitions.asset_condition import ( - AssetConditionEvaluation, + AssetConditionEvaluationResult, AssetConditionSnapshot, AssetSubsetWithMetadata, ) @@ -715,14 +715,14 @@ def test_auto_materialize_asset_evaluations(self, storage) -> None: storage.add_auto_materialize_asset_evaluations( evaluation_id=10, asset_evaluations=[ - AssetConditionEvaluation( + AssetConditionEvaluationResult( condition_snapshot=condition_snapshot, true_subset=AssetSubset(asset_key=AssetKey("asset_one"), value=False), 
candidate_subset=AssetSubset(asset_key=AssetKey("asset_one"), value=False), start_timestamp=0, end_timestamp=1, ).with_run_ids(set()), - AssetConditionEvaluation( + AssetConditionEvaluationResult( condition_snapshot=condition_snapshot, true_subset=AssetSubset(asset_key=AssetKey("asset_two"), value=True), candidate_subset=AssetSubset(asset_key=AssetKey("asset_two"), value=True), @@ -768,7 +768,7 @@ def test_auto_materialize_asset_evaluations(self, storage) -> None: storage.add_auto_materialize_asset_evaluations( evaluation_id=11, asset_evaluations=[ - AssetConditionEvaluation( + AssetConditionEvaluationResult( condition_snapshot=condition_snapshot, start_timestamp=0, end_timestamp=1, @@ -799,7 +799,7 @@ def test_auto_materialize_asset_evaluations(self, storage) -> None: # add a mix of keys - one that already is using the unique index and one that is not - eval_one = AssetConditionEvaluation( + eval_one = AssetConditionEvaluationResult( condition_snapshot=AssetConditionSnapshot("foo", "bar", ""), start_timestamp=0, end_timestamp=1, @@ -807,7 +807,7 @@ def test_auto_materialize_asset_evaluations(self, storage) -> None: candidate_subset=AssetSubset(asset_key=AssetKey("asset_one"), value=True), ).with_run_ids(set()) - eval_asset_three = AssetConditionEvaluation( + eval_asset_three = AssetConditionEvaluationResult( condition_snapshot=AssetConditionSnapshot("foo", "bar", ""), start_timestamp=0, end_timestamp=1, @@ -853,7 +853,7 @@ def test_auto_materialize_asset_evaluations_with_partitions(self, storage) -> No storage.add_auto_materialize_asset_evaluations( evaluation_id=10, asset_evaluations=[ - AssetConditionEvaluation( + AssetConditionEvaluationResult( condition_snapshot=AssetConditionSnapshot("foo", "bar", ""), start_timestamp=0, end_timestamp=1, @@ -881,7 +881,7 @@ def test_purge_asset_evaluations(self, storage) -> None: storage.add_auto_materialize_asset_evaluations( evaluation_id=11, asset_evaluations=[ - AssetConditionEvaluation( + AssetConditionEvaluationResult( 
condition_snapshot=AssetConditionSnapshot("foo", "bar", ""), start_timestamp=0, end_timestamp=1, diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_daemon_scenario.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_daemon_scenario.py index 56a3b43452f8d..a5aaf0b13d125 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_daemon_scenario.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_daemon_scenario.py @@ -38,7 +38,7 @@ materialize, ) from dagster._core.definitions.asset_condition import ( - AssetConditionEvaluation, + AssetConditionEvaluationResult, AssetSubsetWithMetadata, ) from dagster._core.definitions.asset_daemon_context import ( @@ -197,7 +197,7 @@ class AssetDaemonScenarioState(NamedTuple): current_time: datetime.datetime = pendulum.now() run_requests: Sequence[RunRequest] = [] serialized_cursor: str = AssetDaemonCursor.empty(0).serialize() - evaluations: Sequence[AssetConditionEvaluation] = [] + evaluations: Sequence[AssetConditionEvaluationResult] = [] logger: logging.Logger = logging.getLogger("dagster.amp") tick_index: int = 1 # this is set by the scenario runner @@ -356,7 +356,7 @@ def with_dynamic_partitions( def _evaluate_tick_fast( self, - ) -> Tuple[Sequence[RunRequest], AssetDaemonCursor, Sequence[AssetConditionEvaluation]]: + ) -> Tuple[Sequence[RunRequest], AssetDaemonCursor, Sequence[AssetConditionEvaluationResult]]: cursor = AssetDaemonCursor.from_serialized(self.serialized_cursor, self.asset_graph) new_run_requests, new_cursor, new_evaluations = AssetDaemonContext( @@ -393,7 +393,7 @@ def _evaluate_tick_fast( def _evaluate_tick_daemon( self, - ) -> Tuple[Sequence[RunRequest], AssetDaemonCursor, Sequence[AssetConditionEvaluation]]: + ) -> Tuple[Sequence[RunRequest], AssetDaemonCursor, Sequence[AssetConditionEvaluationResult]]: target = 
InProcessTestWorkspaceLoadTarget(get_code_location_origin(self)) with create_test_daemon_workspace_context( @@ -521,7 +521,7 @@ def sort_run_request_key_fn(run_request) -> Tuple[AssetKey, Optional[str]]: return self def _assert_evaluation_daemon( - self, key: AssetKey, actual_evaluation: AssetConditionEvaluation + self, key: AssetKey, actual_evaluation: AssetConditionEvaluationResult ) -> None: """Additional assertions for daemon mode. Checks that the evaluation for the given asset contains the expected run ids. @@ -580,7 +580,9 @@ def assert_evaluation( if num_requested is not None: assert actual_evaluation.true_subset.size == num_requested - def get_leaf_evaluations(e: AssetConditionEvaluation) -> Sequence[AssetConditionEvaluation]: + def get_leaf_evaluations( + e: AssetConditionEvaluationResult, + ) -> Sequence[AssetConditionEvaluationResult]: if len(e.child_evaluations) == 0: return [e] leaf_evals = []