From 7561ebf7923a57d1c87d9bc0580874b877cb2f02 Mon Sep 17 00:00:00 2001 From: Owen Kephart Date: Tue, 12 Dec 2023 17:19:40 -0800 Subject: [PATCH] AssetConditionCursor --- .../_core/definitions/asset_condition.py | 61 +++++--- .../asset_condition_evaluation_context.py | 129 +++++++---------- .../_core/definitions/asset_daemon_context.py | 19 +-- .../_core/definitions/asset_daemon_cursor.py | 130 ++++++++++++++++-- .../definitions/auto_materialize_rule.py | 6 +- .../asset_daemon_scenario.py | 5 + 6 files changed, 223 insertions(+), 127 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_condition.py b/python_modules/dagster/dagster/_core/definitions/asset_condition.py index 95951555bfbc3..592e541770548 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_condition.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_condition.py @@ -13,6 +13,9 @@ ) import dagster._check as check +from dagster._core.definitions.asset_daemon_cursor import ( + AssetConditionCursorExtras, +) from dagster._core.definitions.events import AssetKey from dagster._core.definitions.metadata import MetadataMapping, MetadataValue from dagster._serdes.serdes import whitelist_for_serdes @@ -72,7 +75,7 @@ def equivalent_to_stored_evaluation(self, other: Optional["AssetConditionEvaluat and self.child_evaluations == other.child_evaluations ) - def discard_subset(self, condition: "AssetCondition") -> Optional[AssetSubset]: + def discarded_subset(self, condition: "AssetCondition") -> Optional[AssetSubset]: not_discard_condition = condition.not_discard_condition if not not_discard_condition or len(self.child_evaluations) != 3: return None @@ -81,6 +84,12 @@ def discard_subset(self, condition: "AssetCondition") -> Optional[AssetSubset]: discard_evaluation = not_discard_evaluation.child_evaluations[0] return discard_evaluation.true_subset + def get_requested_or_discarded_subset(self, condition: "AssetCondition") -> AssetSubset: + discarded_subset = self.discarded_subset(condition) + if discarded_subset is None: + return self.true_subset + return self.true_subset | discarded_subset + def for_child(self, child_condition: "AssetCondition") -> Optional["AssetConditionEvaluation"]: """Returns the evaluation of a given child condition by finding the child evaluation that has an identical hash to the given condition. @@ -129,7 +138,9 @@ def unique_id(self) -> str: return hashlib.md5("".join(parts).encode()).hexdigest() @abstractmethod - def evaluate(self, context: AssetConditionEvaluationContext) -> AssetConditionEvaluation: + def evaluate( + self, context: AssetConditionEvaluationContext + ) -> Tuple[AssetConditionEvaluation, Sequence[AssetConditionCursorExtras]]: raise NotImplementedError() def __and__(self, other: "AssetCondition") -> "AssetCondition": @@ -192,7 +203,9 @@ def unique_id(self) -> str: parts = [self.rule.__class__.__name__, self.rule.description] return hashlib.md5("".join(parts).encode()).hexdigest() - def evaluate(self, context: AssetConditionEvaluationContext) -> AssetConditionEvaluation: + def evaluate( + self, context: AssetConditionEvaluationContext + ) -> Tuple[AssetConditionEvaluation, Sequence[AssetConditionCursorExtras]]: context.root_context.daemon_context._verbose_log_fn( # noqa f"Evaluating rule: {self.rule.to_snapshot()}" ) @@ -205,7 +218,7 @@ def evaluate(self, context: AssetConditionEvaluationContext) -> AssetConditionEv true_subset=true_subset, candidate_subset=context.candidate_subset, subsets_with_metadata=subsets_with_metadata, - ) + ), [AssetConditionCursorExtras(condition_snapshot=self.snapshot, extras={})] class AndAssetCondition( @@ -214,20 +227,24 @@ class AndAssetCondition( ): """This class represents the condition that all of its children evaluate to true.""" - def evaluate(self, context: AssetConditionEvaluationContext) -> AssetConditionEvaluation: + def evaluate( + self, context: AssetConditionEvaluationContext + ) -> Tuple[AssetConditionEvaluation, Sequence[AssetConditionCursorExtras]]: child_evaluations: List[AssetConditionEvaluation] = [] + child_extras: List[AssetConditionCursorExtras] = [] true_subset = context.candidate_subset for child in self.children: child_context = context.for_child(condition=child, candidate_subset=true_subset) - result = child.evaluate(child_context) - child_evaluations.append(result) - true_subset &= result.true_subset + child_evaluation, child_extra = child.evaluate(child_context) + child_evaluations.append(child_evaluation) + child_extras.extend(child_extra) + true_subset &= child_evaluation.true_subset return AssetConditionEvaluation( condition_snapshot=self.snapshot, true_subset=true_subset, candidate_subset=context.candidate_subset, child_evaluations=child_evaluations, - ) + ), child_extras class OrAssetCondition( @@ -236,22 +253,26 @@ class OrAssetCondition( ): """This class represents the condition that any of its children evaluate to true.""" - def evaluate(self, context: AssetConditionEvaluationContext) -> AssetConditionEvaluation: + def evaluate( + self, context: AssetConditionEvaluationContext + ) -> Tuple[AssetConditionEvaluation, Sequence[AssetConditionCursorExtras]]: child_evaluations: List[AssetConditionEvaluation] = [] + child_extras: List[AssetConditionCursorExtras] = [] true_subset = context.empty_subset() for child in self.children: child_context = context.for_child( condition=child, candidate_subset=context.candidate_subset ) - result = child.evaluate(child_context) - child_evaluations.append(result) - true_subset |= result.true_subset + child_evaluation, child_extra = child.evaluate(child_context) + child_evaluations.append(child_evaluation) + child_extras.extend(child_extra) + true_subset |= child_evaluation.true_subset return AssetConditionEvaluation( condition_snapshot=self.snapshot, true_subset=true_subset, candidate_subset=context.candidate_subset, child_evaluations=child_evaluations, - ) + ), child_extras class NotAssetCondition( @@ -268,16 +289,18 @@ def __new__(cls, children: Sequence[AssetCondition]): def child(self) -> AssetCondition: return self.children[0] - def evaluate(self, context: AssetConditionEvaluationContext) -> AssetConditionEvaluation: + def evaluate( + self, context: AssetConditionEvaluationContext + ) -> Tuple[AssetConditionEvaluation, Sequence[AssetConditionCursorExtras]]: child_context = context.for_child( condition=self.child, candidate_subset=context.candidate_subset ) - result = self.child.evaluate(child_context) - true_subset = context.candidate_subset - result.true_subset + child_evaluation, child_extras = self.child.evaluate(child_context) + true_subset = context.candidate_subset - child_evaluation.true_subset return AssetConditionEvaluation( condition_snapshot=self.snapshot, true_subset=true_subset, candidate_subset=context.candidate_subset, - child_evaluations=[result], - ) + child_evaluations=[child_evaluation], + ), child_extras diff --git a/python_modules/dagster/dagster/_core/definitions/asset_condition_evaluation_context.py b/python_modules/dagster/dagster/_core/definitions/asset_condition_evaluation_context.py index e8593ab139ec3..b5852fb37f245 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_condition_evaluation_context.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_condition_evaluation_context.py @@ -11,14 +11,12 @@ from dagster._core.definitions.time_window_partition_mapping import TimeWindowPartitionMapping from dagster._utils.caching_instance_queryer import CachingInstanceQueryer -from .asset_daemon_cursor import AssetDaemonAssetCursor +from .asset_daemon_cursor import AssetConditionCursor from .asset_graph import AssetGraph from .asset_subset import AssetSubset if TYPE_CHECKING: - from dagster._core.definitions.asset_condition import AssetSubsetWithMetadata - - from .asset_condition import AssetCondition, AssetConditionEvaluation + from .asset_condition import AssetCondition, AssetConditionEvaluation, AssetSubsetWithMetadata from .asset_daemon_context import AssetDaemonContext @@ -41,8 +39,8 @@ class AssetConditionEvaluationContext: asset_key: AssetKey condition: "AssetCondition" - asset_cursor: Optional[AssetDaemonAssetCursor] - previous_evaluation: Optional["AssetConditionEvaluation"] + cursor: AssetConditionCursor + previous_condition_evaluation: Optional["AssetConditionEvaluation"] candidate_subset: AssetSubset instance_queryer: CachingInstanceQueryer @@ -58,21 +56,23 @@ class AssetConditionEvaluationContext: def create( asset_key: AssetKey, condition: "AssetCondition", - asset_cursor: Optional[AssetDaemonAssetCursor], + cursor: AssetConditionCursor, instance_queryer: CachingInstanceQueryer, data_time_resolver: CachingDataTimeResolver, daemon_context: "AssetDaemonContext", evaluation_results_by_key: Mapping[AssetKey, "AssetConditionEvaluation"], expected_data_time_mapping: Mapping[AssetKey, Optional[datetime.datetime]], ) -> "AssetConditionEvaluationContext": + partitions_def = instance_queryer.asset_graph.get_partitions_def(asset_key) + return AssetConditionEvaluationContext( asset_key=asset_key, condition=condition, - asset_cursor=asset_cursor, - previous_evaluation=asset_cursor.previous_evaluation if asset_cursor else None, + cursor=cursor, + previous_condition_evaluation=cursor.previous_evaluation, candidate_subset=AssetSubset.all( asset_key, - instance_queryer.asset_graph.get_partitions_def(asset_key), + partitions_def, instance_queryer, instance_queryer.evaluation_time, ), @@ -89,10 +89,10 @@ def for_child( return dataclasses.replace( self, condition=condition, - candidate_subset=candidate_subset, - previous_evaluation=self.previous_evaluation.for_child(condition) - if self.previous_evaluation + previous_condition_evaluation=self.previous_condition_evaluation.for_child(condition) + if self.previous_condition_evaluation else None, + candidate_subset=candidate_subset, root_ref=self.root_context, ) @@ -116,15 +116,23 @@ def evaluation_time(self) -> datetime.datetime: @property def previous_max_storage_id(self) -> Optional[int]: - if not self.asset_cursor: - return None - return self.asset_cursor.previous_max_storage_id + return self.cursor.previous_max_storage_id @property def previous_evaluation_timestamp(self) -> Optional[float]: - if not self.asset_cursor: - return None - return self.asset_cursor.previous_evaluation_timestamp + return self.cursor.previous_evaluation_timestamp + + @property + def previous_true_subset(self) -> AssetSubset: + if self.previous_condition_evaluation is None: + return self.empty_subset() + return self.previous_condition_evaluation.true_subset + + @property + def previous_subsets_with_metadata(self) -> Sequence["AssetSubsetWithMetadata"]: + if self.previous_condition_evaluation is None: + return [] + return self.previous_condition_evaluation.subsets_with_metadata @functools.cached_property @root_property @@ -143,16 +151,7 @@ def parent_will_update_subset(self) -> AssetSubset: subset |= parent_subset._replace(asset_key=self.asset_key) return subset - @functools.cached_property - @root_property - def previous_tick_requested_subset(self) -> AssetSubset: - """Returns the set of asset partitions that were requested on the previous tick.""" - if not self.previous_evaluation: - return self.empty_subset() - return self.previous_evaluation.true_subset - - @functools.cached_property - @root_property + @property def materialized_since_previous_tick_subset(self) -> AssetSubset: """Returns the set of asset partitions that were materialized since the previous tick.""" return AssetSubset.from_asset_partitions_set( @@ -161,35 +160,35 @@ def materialized_since_previous_tick_subset(self) -> AssetSubset: self.instance_queryer.get_asset_partitions_updated_after_cursor( self.asset_key, asset_partitions=None, - after_cursor=self.asset_cursor.previous_max_storage_id - if self.asset_cursor - else None, + after_cursor=self.cursor.previous_max_storage_id if self.cursor else None, respect_materialization_data_versions=False, ), ) - @functools.cached_property - @root_property + @property + def previous_tick_requested_or_discarded_subset(self) -> AssetSubset: + if not self.cursor.previous_evaluation: + return self.empty_subset() + return self.cursor.previous_evaluation.get_requested_or_discarded_subset( + self.root_context.condition + ) + + @property def materialized_requested_or_discarded_since_previous_tick_subset(self) -> AssetSubset: """Returns the set of asset partitions that were materialized since the previous tick.""" - if not self.previous_evaluation: - return self.materialized_since_previous_tick_subset return ( self.materialized_since_previous_tick_subset - | self.previous_evaluation.true_subset - | (self.previous_evaluation.discard_subset(self.condition) or self.empty_subset()) + | self.previous_tick_requested_or_discarded_subset ) - @functools.cached_property - @root_property + @property def never_materialized_requested_or_discarded_root_subset(self) -> AssetSubset: if self.asset_key not in self.asset_graph.root_materializable_or_observable_asset_keys: return self.empty_subset() handled_subset = ( - self.asset_cursor.materialized_requested_or_discarded_subset - if self.asset_cursor - else self.empty_subset() + self.cursor.get_extras_value(self.condition, "handled_subset", AssetSubset) + or self.empty_subset() ) unhandled_subset = handled_subset.inverse( self.partitions_def, @@ -199,7 +198,6 @@ def never_materialized_requested_or_discarded_root_subset(self) -> AssetSubset: return unhandled_subset - self.materialized_since_previous_tick_subset @property - @root_property def parent_has_updated_subset(self) -> AssetSubset: """Returns the set of asset partitions whose parents have updated since the last time this condition was evaluated. @@ -228,21 +226,13 @@ def candidates_not_evaluated_on_previous_tick_subset(self) -> AssetSubset: """Returns the set of candidates for this tick which were not candidates on the previous tick. """ - if not self.previous_evaluation or not self.previous_evaluation.candidate_subset: + if not self.previous_condition_evaluation: return self.candidate_subset - return self.candidate_subset - self.previous_evaluation.candidate_subset - - @property - def previous_tick_subsets_with_metadata(self) -> Sequence["AssetSubsetWithMetadata"]: - """Returns the RuleEvaluationResults calculated on the previous tick for this condition.""" - return self.previous_evaluation.subsets_with_metadata if self.previous_evaluation else [] - - @property - def previous_tick_true_subset(self) -> AssetSubset: - """Returns the set of asset partitions that were true for this condition on the previous tick.""" - if not self.previous_evaluation: + # when the candidate_subset is None, this indicates that the entire asset was evaluated + # for this condition on the previous tick + elif self.previous_condition_evaluation.candidate_subset is None: return self.empty_subset() - return self.previous_evaluation.true_subset + return self.candidate_subset - self.previous_condition_evaluation.candidate_subset def materializable_in_same_run(self, child_key: AssetKey, parent_key: AssetKey) -> bool: """Returns whether a child asset can be materialized in the same run as a parent asset.""" @@ -296,28 +286,3 @@ def will_update_asset_partition(self, asset_partition: AssetKeyPartitionKey) -> def empty_subset(self) -> AssetSubset: return AssetSubset.empty(self.asset_key, self.partitions_def) - - def get_new_asset_cursor( - self, evaluation: "AssetConditionEvaluation" - ) -> AssetDaemonAssetCursor: - """Returns a new AssetDaemonAssetCursor based on the current cursor and the results of - this tick's evaluation. - """ - previous_handled_subset = ( - self.asset_cursor.materialized_requested_or_discarded_subset - if self.asset_cursor - else self.empty_subset() - ) - new_handled_subset = ( - previous_handled_subset - | self.materialized_requested_or_discarded_since_previous_tick_subset - | evaluation.true_subset - | (evaluation.discard_subset(self.condition) or self.empty_subset()) - ) - return AssetDaemonAssetCursor( - asset_key=self.asset_key, - previous_max_storage_id=self.daemon_context.get_new_latest_storage_id(), - previous_evaluation=evaluation, - previous_evaluation_timestamp=self.evaluation_time.timestamp(), - materialized_requested_or_discarded_subset=new_handled_subset, - ) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py index 36feb348b25ee..c6ebc6c22addd 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py @@ -39,7 +39,7 @@ from .asset_condition_evaluation_context import ( AssetConditionEvaluationContext, ) -from .asset_daemon_cursor import AssetDaemonAssetCursor, AssetDaemonCursor +from .asset_daemon_cursor import AssetConditionCursor, AssetDaemonCursor from .asset_graph import AssetGraph from .auto_materialize_rule import AutoMaterializeRule from .backfill_policy import BackfillPolicy, BackfillPolicyType @@ -223,7 +223,7 @@ def evaluate_asset( asset_key: AssetKey, evaluation_results_by_key: Mapping[AssetKey, AssetConditionEvaluation], expected_data_time_mapping: Mapping[AssetKey, Optional[datetime.datetime]], - ) -> Tuple[AssetConditionEvaluation, AssetDaemonAssetCursor, Optional[datetime.datetime]]: + ) -> Tuple[AssetConditionEvaluation, AssetConditionCursor, Optional[datetime.datetime]]: """Evaluates the auto materialize policy of a given asset key. Params: @@ -241,9 +241,11 @@ def evaluate_asset( self.asset_graph.auto_materialize_policies_by_key.get(asset_key) ).to_asset_condition() + asset_cursor = self.cursor.asset_cursor_for_key(asset_key, self.asset_graph) + context = AssetConditionEvaluationContext.create( asset_key=asset_key, - asset_cursor=self.cursor.asset_cursor_for_key(asset_key, self.asset_graph), + cursor=self.cursor.asset_cursor_for_key(asset_key, self.asset_graph), condition=asset_condition, instance_queryer=self.instance_queryer, data_time_resolver=self.data_time_resolver, @@ -252,26 +254,27 @@ def evaluate_asset( expected_data_time_mapping=expected_data_time_mapping, ) - evaluation = asset_condition.evaluate(context) - asset_cursor = context.get_new_asset_cursor(evaluation=evaluation) + evaluation, condition_cursor = asset_condition.evaluate(context) + + new_asset_cursor = asset_cursor.with_updates(context, evaluation) expected_data_time = get_expected_data_time_for_asset_key( context, will_materialize=evaluation.true_subset.size > 0 ) - return evaluation, asset_cursor, expected_data_time + return evaluation, new_asset_cursor, expected_data_time def get_asset_condition_evaluations( self, ) -> Tuple[ Sequence[AssetConditionEvaluation], - Sequence[AssetDaemonAssetCursor], + Sequence[AssetConditionCursor], AbstractSet[AssetKeyPartitionKey], ]: """Returns a mapping from asset key to the AutoMaterializeAssetEvaluation for that key, a sequence of new per-asset cursors, and the set of all asset partitions that should be materialized or discarded this tick. """ - asset_cursors: List[AssetDaemonAssetCursor] = [] + asset_cursors: List[AssetConditionCursor] = [] evaluation_results_by_key: Dict[AssetKey, AssetConditionEvaluation] = {} expected_data_time_mapping: Dict[AssetKey, Optional[datetime.datetime]] = defaultdict() diff --git a/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py b/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py index ee7fae4163a00..67d584d8ce069 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py @@ -7,6 +7,8 @@ NamedTuple, Optional, Sequence, + Type, + TypeVar, ) import dagster._check as check @@ -15,26 +17,116 @@ TimeWindowPartitionsDefinition, TimeWindowPartitionsSubset, ) -from dagster._serdes.serdes import deserialize_value, serialize_value +from dagster._serdes.serdes import PackableValue, deserialize_value, serialize_value from .asset_graph import AssetGraph from .asset_subset import AssetSubset -from .partition import PartitionsSubset +from .partition import PartitionsDefinition, PartitionsSubset if TYPE_CHECKING: - from .asset_condition import AssetConditionEvaluation + from .asset_condition import AssetCondition, AssetConditionEvaluation, AssetConditionSnapshot + from .asset_condition_evaluation_context import AssetConditionEvaluationContext +ExtrasDict = Mapping[str, PackableValue] -class AssetDaemonAssetCursor(NamedTuple): +T = TypeVar("T") + + +def _get_placeholder_missing_condition() -> "AssetCondition": + """Temporary hard-coding of the hash of the "materialize on missing" condition. This will + no longer be necessary once we start serializing the AssetDaemonCursor. + """ + from .asset_condition import RuleCondition + from .auto_materialize_rule import MaterializeOnMissingRule + + return RuleCondition(MaterializeOnMissingRule()) + + +_PLACEHOLDER_HANDLED_SUBSET_KEY = "handled_subset" + + +class AssetConditionCursorExtras(NamedTuple): + """Class to represent additional unstructured information that may be tracked by a particular + asset condition. + """ + + condition_snapshot: "AssetConditionSnapshot" + extras: ExtrasDict + + +class AssetConditionCursor(NamedTuple): """Convenience class to represent the state of an individual asset being handled by the daemon. In the future, this will be serialized as part of the cursor. """ asset_key: AssetKey + previous_evaluation: Optional["AssetConditionEvaluation"] previous_max_storage_id: Optional[int] previous_evaluation_timestamp: Optional[float] - previous_evaluation: Optional["AssetConditionEvaluation"] - materialized_requested_or_discarded_subset: AssetSubset + + extras: Sequence[AssetConditionCursorExtras] + + @staticmethod + def empty(asset_key: AssetKey) -> "AssetConditionCursor": + return AssetConditionCursor( + asset_key=asset_key, + previous_evaluation=None, + previous_max_storage_id=None, + previous_evaluation_timestamp=None, + extras=[], + ) + + def get_extras_value( + self, condition: "AssetCondition", key: str, as_type: Type[T] + ) -> Optional[T]: + """Returns a value from the extras dict for the given condition, if it exists and is of the + expected type. Otherwise, returns None. + """ + for condition_extras in self.extras: + if condition_extras.condition_snapshot == condition.snapshot: + extras_value = condition_extras.extras.get(key) + if isinstance(extras_value, as_type): + return extras_value + return None + return None + + def get_previous_requested_or_discarded_subset( + self, condition: "AssetCondition", partitions_def: Optional[PartitionsDefinition] + ) -> AssetSubset: + if not self.previous_evaluation: + return AssetSubset.empty(self.asset_key, partitions_def) + return self.previous_evaluation.get_requested_or_discarded_subset(condition) + + @property + def handled_subset(self) -> Optional[AssetSubset]: + return self.get_extras_value( + condition=_get_placeholder_missing_condition(), + key=_PLACEHOLDER_HANDLED_SUBSET_KEY, + as_type=AssetSubset, + ) + + def with_updates( + self, context: "AssetConditionEvaluationContext", evaluation: "AssetConditionEvaluation" + ) -> "AssetConditionCursor": + newly_materialized_requested_or_discarded_subset = ( + context.materialized_since_previous_tick_subset + | evaluation.get_requested_or_discarded_subset(context.condition) + ) + + handled_subset = ( + self.handled_subset or context.empty_subset() + ) | newly_materialized_requested_or_discarded_subset + + # for now, hard-code the materialized_requested_or_discarded_subset location + return self._replace( + previous_evaluation=evaluation, + extras=[ + AssetConditionCursorExtras( + condition_snapshot=_get_placeholder_missing_condition().snapshot, + extras={_PLACEHOLDER_HANDLED_SUBSET_KEY: handled_subset}, + ) + ], + ) class AssetDaemonCursor(NamedTuple): @@ -69,7 +161,7 @@ def was_previously_handled(self, asset_key: AssetKey) -> bool: def asset_cursor_for_key( self, asset_key: AssetKey, asset_graph: AssetGraph - ) -> AssetDaemonAssetCursor: + ) -> AssetConditionCursor: partitions_def = asset_graph.get_partitions_def(asset_key) handled_partitions_subset = self.handled_root_partitions_by_asset_key.get(asset_key) if handled_partitions_subset is not None: @@ -78,12 +170,19 @@ def asset_cursor_for_key( handled_subset = AssetSubset(asset_key=asset_key, value=True) else: handled_subset = AssetSubset.empty(asset_key, partitions_def) - return AssetDaemonAssetCursor( + + previous_evaluation = self.latest_evaluation_by_asset_key.get(asset_key) + return AssetConditionCursor( asset_key=asset_key, + previous_evaluation=previous_evaluation, previous_max_storage_id=self.latest_storage_id, previous_evaluation_timestamp=self.latest_evaluation_timestamp, - previous_evaluation=self.latest_evaluation_by_asset_key.get(asset_key), - materialized_requested_or_discarded_subset=handled_subset, + extras=[ + AssetConditionCursorExtras( + condition_snapshot=_get_placeholder_missing_condition().snapshot, + extras={"handled_subset": handled_subset}, + ) + ], ) def with_updates( @@ -94,7 +193,7 @@ def with_updates( observe_request_timestamp: float, evaluations: Sequence["AssetConditionEvaluation"], evaluation_time: datetime.datetime, - asset_cursors: Sequence[AssetDaemonAssetCursor], + asset_cursors: Sequence[AssetConditionCursor], ) -> "AssetDaemonCursor": """Returns a cursor that represents this cursor plus the updates that have happened within the tick. @@ -122,13 +221,14 @@ def with_updates( handled_root_asset_keys={ cursor.asset_key for cursor in asset_cursors - if not cursor.materialized_requested_or_discarded_subset.is_partitioned - and cursor.materialized_requested_or_discarded_subset.bool_value + if cursor.handled_subset is not None + and not cursor.handled_subset.is_partitioned + and cursor.handled_subset.bool_value }, handled_root_partitions_by_asset_key={ - cursor.asset_key: cursor.materialized_requested_or_discarded_subset.subset_value + cursor.asset_key: cursor.handled_subset.subset_value for cursor in asset_cursors - if cursor.materialized_requested_or_discarded_subset.is_partitioned + if cursor.handled_subset is not None and cursor.handled_subset.is_partitioned }, evaluation_id=evaluation_id, last_observe_request_timestamp_by_asset_key=result_last_observe_request_timestamp_by_asset_key, diff --git a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py index 7fa3da5bb6b06..012dc7727ec4e 100644 --- a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py +++ b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py @@ -109,7 +109,7 @@ def add_evaluation_data_from_previous_tick( # we've explicitly said to ignore it ignore_subset = has_metadata_subset | ignore_subset - for elt in context.previous_tick_subsets_with_metadata: + for elt in context.previous_subsets_with_metadata or []: carry_forward_subset = elt.subset - ignore_subset if carry_forward_subset.size > 0: mapping[elt.frozen_metadata] |= carry_forward_subset @@ -396,7 +396,7 @@ def evaluate_for_asset(self, context: AssetConditionEvaluationContext) -> RuleEv asset_subset_to_request = AssetSubset.from_asset_partitions_set( context.asset_key, context.partitions_def, new_asset_partitions_to_request ) | ( - context.previous_tick_true_subset + context.previous_true_subset - context.materialized_requested_or_discarded_since_previous_tick_subset ) @@ -626,7 +626,7 @@ def evaluate_for_asset(self, context: AssetConditionEvaluationContext) -> RuleEv context.asset_key, context.partitions_def, missing_asset_partitions ) missing_subset = newly_missing_subset | ( - context.previous_tick_true_subset + context.previous_true_subset - context.materialized_requested_or_discarded_since_previous_tick_subset ) return missing_subset, [] diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_daemon_scenario.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_daemon_scenario.py index 2949653c83c2e..38b03ca6ff7bf 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_daemon_scenario.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_daemon_scenario.py @@ -191,6 +191,7 @@ class AssetDaemonScenarioState(NamedTuple): serialized_cursor: str = AssetDaemonCursor.empty().serialize() evaluations: Sequence[AssetConditionEvaluation] = [] logger: logging.Logger = logging.getLogger("dagster.amp") + tick_index: int = 1 # this is set by the scenario runner scenario_instance: Optional[DagsterInstance] = None is_daemon: bool = False @@ -406,6 +407,9 @@ def _evaluate_tick_daemon( return new_run_requests, new_cursor, new_evaluations def evaluate_tick(self) -> "AssetDaemonScenarioState": + self.logger.critical("********************************") + self.logger.critical(f"EVALUATING TICK {self.tick_index}") + self.logger.critical("********************************") with pendulum.test(self.current_time): if self.is_daemon: new_run_requests, new_cursor, new_evaluations = self._evaluate_tick_daemon() @@ -416,6 +420,7 @@ def evaluate_tick(self) -> "AssetDaemonScenarioState": run_requests=new_run_requests, serialized_cursor=new_cursor.serialize(), evaluations=new_evaluations, + tick_index=self.tick_index + 1, ) def _log_assertion_error(self, expected: Sequence[Any], actual: Sequence[Any]) -> None: