diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py index 7b5f84d41cc9e..d21f24a0eabf8 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py @@ -538,7 +538,6 @@ def test_launch_asset_backfill_with_upstream_anchor_asset(): target_subset = asset_backfill_data.target_subset asset_graph = target_subset.asset_graph assert target_subset == AssetGraphSubset( - target_subset.asset_graph, partitions_subsets_by_asset_key={ AssetKey("hourly"): asset_graph.get_partitions_def( AssetKey("hourly") @@ -607,7 +606,6 @@ def test_launch_asset_backfill_with_two_anchor_assets(): target_subset = asset_backfill_data.target_subset asset_graph = target_subset.asset_graph assert target_subset == AssetGraphSubset( - target_subset.asset_graph, partitions_subsets_by_asset_key={ AssetKey("hourly1"): asset_graph.get_partitions_def( AssetKey("hourly1") @@ -665,7 +663,6 @@ def test_launch_asset_backfill_with_upstream_anchor_asset_and_non_partitioned_as target_subset = asset_backfill_data.target_subset asset_graph = target_subset.asset_graph assert target_subset == AssetGraphSubset( - target_subset.asset_graph, non_partitioned_asset_keys={AssetKey("non_partitioned")}, partitions_subsets_by_asset_key={ AssetKey("hourly"): ( diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py index db9d8d3cced21..f6cffd301b897 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py @@ -234,6 +234,7 @@ def _execute_asset_backfill_iteration_no_side_effects( updated_backfill = backfill.with_asset_backfill_data( cast(AssetBackfillIterationResult, result).backfill_data, dynamic_partitions_store=graphql_context.instance, + asset_graph=asset_graph, ) graphql_context.instance.update_backfill(updated_backfill) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph.py b/python_modules/dagster/dagster/_core/definitions/asset_graph.py index 3f1524a0d88b9..cbfdef19112c5 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_graph.py @@ -579,7 +579,7 @@ def bfs_filter_subsets( else None ), } - result = AssetGraphSubset(self) + result = AssetGraphSubset() while len(queue) > 0: asset_key = queue.popleft() @@ -587,7 +587,6 @@ def bfs_filter_subsets( if condition_fn(asset_key, partitions_subset): result |= AssetGraphSubset( - self, non_partitioned_asset_keys={asset_key} if partitions_subset is None else set(), partitions_subsets_by_asset_key=( {asset_key: partitions_subset} if partitions_subset is not None else {} diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py b/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py index efe084745b58f..64ace5667681c 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py @@ -1,70 +1,122 @@ import operator from collections import defaultdict from datetime import datetime -from typing import AbstractSet, Any, Callable, Dict, Iterable, Mapping, Optional, Set, Union, cast +from typing import ( + AbstractSet, + Any, + Callable, + Dict, + Iterable, + Mapping, + NamedTuple, + Optional, + Set, + Union, + cast, +) from dagster import _check as check from dagster._core.definitions.partition import ( PartitionsDefinition, PartitionsSubset, ) +from dagster._core.definitions.time_window_partitions import ( + TimePartitionKeyPartitionsSubset, + TimeWindowPartitionsSubset, +) from dagster._core.errors import ( DagsterDefinitionChangedDeserializationError, ) from dagster._core.instance import DynamicPartitionsStore +from dagster._serdes.serdes import ( + FieldSerializer, + deserialize_value, + serialize_value, + whitelist_for_serdes, +) from .asset_graph import AssetGraph from .events import AssetKey, AssetKeyPartitionKey -class AssetGraphSubset: - def __init__( +class PartitionsSubsetByAssetKeySerializer(FieldSerializer): + """Packs and unpacks a mapping from AssetKey to PartitionsSubset. + + In JSON, a key must be a str, int, float, bool, or None. This serializer packs the AssetKey + into a str, and unpacks it back into an AssetKey. + + It also converts TimePartitionKeyPartitionsSubsets into serializable TimeWindowPartitionsSubsets. + """ + + def pack(self, mapping: Mapping[AssetKey, Any], **_kwargs) -> Mapping[str, Any]: + return { + serialize_value(key): serialize_value( + value.to_time_window_partitions_subset() + if isinstance(value, TimePartitionKeyPartitionsSubset) + else value + ) + for key, value in mapping.items() + } + + def unpack( self, - asset_graph: AssetGraph, + mapping: Mapping[str, Any], + **_kwargs, + ) -> Mapping[AssetKey, Any]: + return { + deserialize_value(key, AssetKey): deserialize_value(value, TimeWindowPartitionsSubset) + for key, value in mapping.items() + } + + +@whitelist_for_serdes( + field_serializers={"partitions_subsets_by_asset_key": PartitionsSubsetByAssetKeySerializer} +) +class AssetGraphSubset( + NamedTuple( + "_AssetGraphSubset", + [ + ("partitions_subsets_by_asset_key", Mapping[AssetKey, PartitionsSubset]), + ("non_partitioned_asset_keys", AbstractSet[AssetKey]), + ], + ) +): + def __new__( + cls, partitions_subsets_by_asset_key: Optional[Mapping[AssetKey, PartitionsSubset]] = None, non_partitioned_asset_keys: Optional[AbstractSet[AssetKey]] = None, ): - self._asset_graph = asset_graph - self._partitions_subsets_by_asset_key = partitions_subsets_by_asset_key or {} - self._non_partitioned_asset_keys = non_partitioned_asset_keys or set() - - @property - def asset_graph(self) -> AssetGraph: - return self._asset_graph - - @property - def partitions_subsets_by_asset_key(self) -> Mapping[AssetKey, PartitionsSubset]: - return self._partitions_subsets_by_asset_key - - @property - def non_partitioned_asset_keys(self) -> AbstractSet[AssetKey]: - return self._non_partitioned_asset_keys + return super(AssetGraphSubset, cls).__new__( + cls, + partitions_subsets_by_asset_key=partitions_subsets_by_asset_key or {}, + non_partitioned_asset_keys=non_partitioned_asset_keys or set(), + ) @property def asset_keys(self) -> AbstractSet[AssetKey]: return { key for key, subset in self.partitions_subsets_by_asset_key.items() if len(subset) > 0 - } | self._non_partitioned_asset_keys + } | self.non_partitioned_asset_keys @property def num_partitions_and_non_partitioned_assets(self): - return len(self._non_partitioned_asset_keys) + sum( - len(subset) for subset in self._partitions_subsets_by_asset_key.values() + return len(self.non_partitioned_asset_keys) + sum( + len(subset) for subset in self.partitions_subsets_by_asset_key.values() ) def get_partitions_subset(self, asset_key: AssetKey) -> PartitionsSubset: - partitions_def = self.asset_graph.get_partitions_def(asset_key) - if partitions_def is None: - check.failed("Can only call get_partitions_subset on a partitioned asset") + # partitions_def = asset_graph.get_partitions_def(asset_key) + # if partitions_def is None: + # check.failed("Can only call get_partitions_subset on a partitioned asset") - return self.partitions_subsets_by_asset_key.get(asset_key, partitions_def.empty_subset()) + return self.partitions_subsets_by_asset_key[asset_key] def iterate_asset_partitions(self) -> Iterable[AssetKeyPartitionKey]: for asset_key, partitions_subset in self.partitions_subsets_by_asset_key.items(): for partition_key in partitions_subset.get_partition_keys(): yield AssetKeyPartitionKey(asset_key, partition_key) - for asset_key in self._non_partitioned_asset_keys: + for asset_key in self.non_partitioned_asset_keys: yield AssetKeyPartitionKey(asset_key, None) def __contains__(self, asset: Union[AssetKey, AssetKeyPartitionKey]) -> bool: @@ -74,19 +126,18 @@ def __contains__(self, asset: Union[AssetKey, AssetKeyPartitionKey]) -> bool: """ if isinstance(asset, AssetKey): # check if any keys are in the subset - if self.asset_graph.is_partitioned(asset): - partitions_subset = self.partitions_subsets_by_asset_key.get(asset) - return partitions_subset is not None and len(partitions_subset) > 0 - else: - return asset in self._non_partitioned_asset_keys + partitions_subset = self.partitions_subsets_by_asset_key.get(asset) + return (partitions_subset is not None and len(partitions_subset) > 0) or ( + asset in self.non_partitioned_asset_keys + ) elif asset.partition_key is None: - return asset.asset_key in self._non_partitioned_asset_keys + return asset.asset_key in self.non_partitioned_asset_keys else: partitions_subset = self.partitions_subsets_by_asset_key.get(asset.asset_key) return partitions_subset is not None and asset.partition_key in partitions_subset def to_storage_dict( - self, dynamic_partitions_store: DynamicPartitionsStore + self, dynamic_partitions_store: DynamicPartitionsStore, asset_graph: AssetGraph ) -> Mapping[str, object]: return { "partitions_subsets_by_asset_key": { @@ -95,7 +146,7 @@ def to_storage_dict( }, "serializable_partitions_def_ids_by_asset_key": { key.to_user_string(): check.not_none( - self._asset_graph.get_partitions_def(key) + asset_graph.get_partitions_def(key) ).get_serializable_unique_identifier( dynamic_partitions_store=dynamic_partitions_store ) @@ -103,28 +154,26 @@ def to_storage_dict( }, "partitions_def_class_names_by_asset_key": { key.to_user_string(): check.not_none( - self._asset_graph.get_partitions_def(key) + asset_graph.get_partitions_def(key) ).__class__.__name__ for key, _ in self.partitions_subsets_by_asset_key.items() }, "non_partitioned_asset_keys": [ - key.to_user_string() for key in self._non_partitioned_asset_keys + key.to_user_string() for key in self.non_partitioned_asset_keys ], } - def _oper( - self, other: Union["AssetGraphSubset", AbstractSet[AssetKeyPartitionKey]], oper: Callable - ) -> "AssetGraphSubset": + def _oper(self, other: "AssetGraphSubset", oper: Callable) -> "AssetGraphSubset": """Returns the AssetGraphSubset that results from applying the given operator to the set of asset partitions in self and other. Note: Not all operators are supported on the underlying PartitionsSubset objects. """ result_partition_subsets_by_asset_key = {**self.partitions_subsets_by_asset_key} - result_non_partitioned_asset_keys = set(self._non_partitioned_asset_keys) + result_non_partitioned_asset_keys = set(self.non_partitioned_asset_keys) - if not isinstance(other, AssetGraphSubset): - other = AssetGraphSubset.from_asset_partition_set(other, self.asset_graph) + # if not isinstance(other, AssetGraphSubset): + # other = AssetGraphSubset.from_asset_partition_set(other, self.asset_graph) for asset_key in other.asset_keys: if asset_key in other.non_partitioned_asset_keys: @@ -133,48 +182,62 @@ def _oper( result_non_partitioned_asset_keys, {asset_key} ) else: - subset = self.get_partitions_subset(asset_key) check.invariant(asset_key not in self.non_partitioned_asset_keys) - result_partition_subsets_by_asset_key[asset_key] = oper( - subset, other.get_partitions_subset(asset_key) + subset = ( + self.get_partitions_subset(asset_key) + if asset_key in self.partitions_subsets_by_asset_key + else None ) + other_subset = other.get_partitions_subset(asset_key) + if other_subset is None and subset is None: + pass + if subset is None and other_subset is not None: + if oper == operator.or_: + result_partition_subsets_by_asset_key[asset_key] = other_subset + elif oper == operator.sub: + pass + elif oper == operator.and_: + pass + else: + check.failed(f"Unsupported operator {oper}") + elif subset is not None and other_subset is None: + if oper == operator.or_: + pass + elif oper == operator.sub: + pass + elif oper == operator.and_: + del result_partition_subsets_by_asset_key[asset_key] + else: + result_partition_subsets_by_asset_key[asset_key] = oper(subset, other_subset) + return AssetGraphSubset( - self.asset_graph, result_partition_subsets_by_asset_key, result_non_partitioned_asset_keys, ) - def __or__( - self, other: Union["AssetGraphSubset", AbstractSet[AssetKeyPartitionKey]] - ) -> "AssetGraphSubset": + def __or__(self, other: "AssetGraphSubset") -> "AssetGraphSubset": return self._oper(other, operator.or_) - def __sub__( - self, other: Union["AssetGraphSubset", AbstractSet[AssetKeyPartitionKey]] - ) -> "AssetGraphSubset": + def __sub__(self, other: "AssetGraphSubset") -> "AssetGraphSubset": return self._oper(other, operator.sub) - def __and__( - self, other: Union["AssetGraphSubset", AbstractSet[AssetKeyPartitionKey]] - ) -> "AssetGraphSubset": + def __and__(self, other: "AssetGraphSubset") -> "AssetGraphSubset": return self._oper(other, operator.and_) def filter_asset_keys(self, asset_keys: AbstractSet[AssetKey]) -> "AssetGraphSubset": return AssetGraphSubset( - self.asset_graph, { asset_key: subset for asset_key, subset in self.partitions_subsets_by_asset_key.items() if asset_key in asset_keys }, - self._non_partitioned_asset_keys & asset_keys, + self.non_partitioned_asset_keys & asset_keys, ) def __eq__(self, other) -> bool: return ( isinstance(other, AssetGraphSubset) - and self.asset_graph == other.asset_graph and self.partitions_subsets_by_asset_key == other.partitions_subsets_by_asset_key and self.non_partitioned_asset_keys == other.non_partitioned_asset_keys ) @@ -209,7 +272,6 @@ def from_asset_partition_set( for asset_key, partition_keys in partitions_by_asset_key.items() }, non_partitioned_asset_keys=non_partitioned_asset_keys, - asset_graph=asset_graph, ) @classmethod @@ -297,9 +359,7 @@ def from_storage_dict( AssetKey.from_user_string(key) for key in serialized_dict["non_partitioned_asset_keys"] } & asset_graph.all_asset_keys - return AssetGraphSubset( - asset_graph, partitions_subsets_by_asset_key, non_partitioned_asset_keys - ) + return AssetGraphSubset(partitions_subsets_by_asset_key, non_partitioned_asset_keys) @classmethod def all( @@ -339,6 +399,4 @@ def from_asset_keys( else: non_partitioned_asset_keys.add(asset_key) - return AssetGraphSubset( - asset_graph, partitions_subsets_by_asset_key, non_partitioned_asset_keys - ) + return AssetGraphSubset(partitions_subsets_by_asset_key, non_partitioned_asset_keys) diff --git a/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py b/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py index 5e67eb103ee4d..e0341c75fb499 100644 --- a/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py +++ b/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py @@ -1862,6 +1862,11 @@ def with_partitions_def( included_partition_keys=self._included_partition_keys, ) + def to_time_window_partitions_subset(self) -> "TimeWindowPartitionsSubset": + return TimeWindowPartitionsSubset( + self.partitions_def, self.num_partitions, self.included_time_windows + ) + class TimeWindowPartitionsDefinitionSerializer(FieldSerializer): """Serializes a TimeWindowPartitionsDefinition by converting it to a SerializableTimeWindowPartitionsDefinition.""" diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py index 3b12535b9b52b..55b452c7fedc6 100644 --- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py +++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py @@ -38,6 +38,7 @@ from dagster._core.definitions.partition import PartitionsDefinition, PartitionsSubset from dagster._core.definitions.run_request import RunRequest from dagster._core.definitions.selector import JobSubsetSelector, PartitionsByAssetSelector +from dagster._core.definitions.time_window_partitions import DatetimeFieldSerializer from dagster._core.errors import ( DagsterAssetBackfillDataLoadError, DagsterBackfillFailedError, @@ -67,6 +68,7 @@ IWorkspaceProcessContext, ) from dagster._core.workspace.workspace import IWorkspace +from dagster._serdes import whitelist_for_serdes from dagster._utils import hash_collection, utc_datetime_from_timestamp from dagster._utils.caching_instance_queryer import CachingInstanceQueryer @@ -130,6 +132,7 @@ def __new__(cls, asset_key: AssetKey, asset_backfill_status: Optional[AssetBackf ) +@whitelist_for_serdes(field_serializers={"backfill_start_time": DatetimeFieldSerializer}) class AssetBackfillData(NamedTuple): """Has custom serialization instead of standard Dagster NamedTuple serialization because the asset graph is required to build the AssetGraphSubset objects. @@ -186,7 +189,7 @@ def _get_self_and_downstream_targeted_subset( self_and_downstream = initial_subset for asset_key in initial_subset.asset_keys: self_and_downstream = self_and_downstream | ( - self.target_subset.asset_graph.bfs_filter_subsets( + instance_queryer.asset_graph.bfs_filter_subsets( instance_queryer, lambda asset_key, _: asset_key in self.target_subset, initial_subset.filter_asset_keys({asset_key}), @@ -201,7 +204,7 @@ def _get_self_and_downstream_targeted_subset( for asset_key in self.target_subset.asset_keys if all( parent not in self.target_subset.asset_keys - for parent in self.target_subset.asset_graph.get_parents(asset_key) + for parent in instance_queryer.asset_graph.get_parents(asset_key) - {asset_key} # Do not include an asset as its own parent ) } @@ -229,7 +232,7 @@ def _get_self_and_downstream_targeted_subset( unreachable_target_root_subset = unreachable_targets.filter_asset_keys( AssetSelection.keys(*unreachable_targets.asset_keys) .sources() - .resolve(unreachable_targets.asset_graph) + .resolve(instance_queryer.asset_graph) ) root_subset = root_subset | unreachable_target_root_subset @@ -257,18 +260,16 @@ def get_target_partitions_subset(self, asset_key: AssetKey) -> PartitionsSubset: # Return the targeted partitions for the root partitioned asset keys return self.target_subset.get_partitions_subset(asset_key) - def get_target_root_partitions_subset(self) -> PartitionsSubset: + def get_target_root_partitions_subset(self, asset_graph: AssetGraph) -> PartitionsSubset: """Returns the most upstream partitions subset that was targeted by the backfill.""" partitioned_asset_keys = { asset_key for asset_key in self.target_subset.asset_keys - if self.target_subset.asset_graph.get_partitions_def(asset_key) is not None + if asset_graph.get_partitions_def(asset_key) is not None } root_partitioned_asset_keys = ( - AssetSelection.keys(*partitioned_asset_keys) - .sources() - .resolve(self.target_subset.asset_graph) + AssetSelection.keys(*partitioned_asset_keys).sources().resolve(asset_graph) ) # Return the targeted partitions for the root partitioned asset keys @@ -289,13 +290,15 @@ def get_num_partitions(self) -> Optional[int]: else: return None - def get_targeted_asset_keys_topological_order(self) -> Sequence[AssetKey]: + def get_targeted_asset_keys_topological_order( + self, asset_graph: AssetGraph + ) -> Sequence[AssetKey]: """Returns a topological ordering of asset keys targeted by the backfill that exist in the asset graph. Orders keys in the same topological level alphabetically. """ - toposorted_keys = self.target_subset.asset_graph.toposort_asset_keys() + toposorted_keys = asset_graph.toposort_asset_keys() targeted_toposorted_keys = [] for level_keys in toposorted_keys: @@ -306,7 +309,7 @@ def get_targeted_asset_keys_topological_order(self) -> Sequence[AssetKey]: return targeted_toposorted_keys def get_backfill_status_per_asset_key( - self, + self, asset_graph: AssetGraph ) -> Sequence[Union[PartitionedAssetBackfillStatus, UnpartitionedAssetBackfillStatus]]: """Returns a list containing each targeted asset key's backfill status. This list orders assets topologically and only contains statuses for assets that are @@ -316,7 +319,7 @@ def get_backfill_status_per_asset_key( def _get_status_for_asset_key( asset_key: AssetKey, ) -> Union[PartitionedAssetBackfillStatus, UnpartitionedAssetBackfillStatus]: - if self.target_subset.asset_graph.get_partitions_def(asset_key) is not None: + if asset_graph.get_partitions_def(asset_key) is not None: materialized_subset = self.materialized_subset.get_partitions_subset(asset_key) failed_subset = self.failed_and_downstream_subset.get_partitions_subset(asset_key) requested_subset = self.requested_subset.get_partitions_subset(asset_key) @@ -360,7 +363,7 @@ def _get_status_for_asset_key( return UnpartitionedAssetBackfillStatus(asset_key, None) # Only return back statuses for the assets that still exist in the workspace - topological_order = self.get_targeted_asset_keys_topological_order() + topological_order = self.get_targeted_asset_keys_topological_order(asset_graph) return [_get_status_for_asset_key(asset_key) for asset_key in topological_order] def get_partition_names(self) -> Optional[Sequence[str]]: @@ -382,13 +385,12 @@ def get_partition_names(self) -> Optional[Sequence[str]]: def empty( cls, target_subset: AssetGraphSubset, backfill_start_time: datetime ) -> "AssetBackfillData": - asset_graph = target_subset.asset_graph return cls( target_subset=target_subset, requested_runs_for_target_roots=False, - requested_subset=AssetGraphSubset(asset_graph), - materialized_subset=AssetGraphSubset(asset_graph), - failed_and_downstream_subset=AssetGraphSubset(asset_graph), + requested_subset=AssetGraphSubset(), + materialized_subset=AssetGraphSubset(), + failed_and_downstream_subset=AssetGraphSubset(), latest_storage_id=None, backfill_start_time=backfill_start_time, ) @@ -471,7 +473,6 @@ def from_partitions_by_assets( non_partitioned_asset_keys.add(asset_key) target_subset = AssetGraphSubset( - asset_graph, partitions_subsets_by_asset_key=partitions_subsets_by_asset_key, non_partitioned_asset_keys=non_partitioned_asset_keys, ) @@ -525,7 +526,6 @@ def from_asset_partitions( root_partitions_subset = root_partitions_def.subset_with_partition_keys(partition_names) target_subset = AssetGraphSubset( - asset_graph, non_partitioned_asset_keys=set(asset_selection) - partitioned_asset_keys, ) for root_asset_key in root_partitioned_asset_keys: @@ -533,7 +533,6 @@ def from_asset_partitions( dynamic_partitions_store, lambda asset_key, _: asset_key in partitioned_asset_keys, AssetGraphSubset( - asset_graph, partitions_subsets_by_asset_key={root_asset_key: root_partitions_subset}, ), current_time=backfill_start_time, @@ -543,21 +542,23 @@ def from_asset_partitions( return cls.empty(target_subset, backfill_start_time) - def serialize(self, dynamic_partitions_store: DynamicPartitionsStore) -> str: + def serialize( + self, dynamic_partitions_store: DynamicPartitionsStore, asset_graph: AssetGraph + ) -> str: storage_dict = { "requested_runs_for_target_roots": self.requested_runs_for_target_roots, "serialized_target_subset": self.target_subset.to_storage_dict( - dynamic_partitions_store=dynamic_partitions_store + dynamic_partitions_store=dynamic_partitions_store, asset_graph=asset_graph ), "latest_storage_id": self.latest_storage_id, "serialized_requested_subset": self.requested_subset.to_storage_dict( - dynamic_partitions_store=dynamic_partitions_store + dynamic_partitions_store=dynamic_partitions_store, asset_graph=asset_graph ), "serialized_materialized_subset": self.materialized_subset.to_storage_dict( - dynamic_partitions_store=dynamic_partitions_store + dynamic_partitions_store=dynamic_partitions_store, asset_graph=asset_graph ), "serialized_failed_subset": self.failed_and_downstream_subset.to_storage_dict( - dynamic_partitions_store=dynamic_partitions_store + dynamic_partitions_store=dynamic_partitions_store, asset_graph=asset_graph ), } return json.dumps(storage_dict) @@ -700,7 +701,9 @@ def _submit_runs_and_update_backfill_in_chunks( requested_partitions_in_chunk = _get_requested_asset_partitions_from_run_requests( run_requests_chunk, asset_graph, instance_queryer ) - submitted_partitions = submitted_partitions | requested_partitions_in_chunk + submitted_partitions = submitted_partitions | AssetGraphSubset.from_asset_partition_set( + set(requested_partitions_in_chunk), asset_graph=asset_graph + ) # AssetBackfillIterationResult contains the requested subset after all runs are submitted. # Replace this value with just the partitions that have been submitted so far. @@ -713,7 +716,9 @@ def _submit_runs_and_update_backfill_in_chunks( # Refetch, in case the backfill was requested for cancellation in the meantime backfill = cast(PartitionBackfill, instance.get_backfill(backfill_id)) updated_backfill = backfill.with_asset_backfill_data( - backfill_data_with_submitted_runs, dynamic_partitions_store=instance + backfill_data_with_submitted_runs, + dynamic_partitions_store=instance, + asset_graph=asset_graph, ) instance.update_backfill(updated_backfill) @@ -824,7 +829,7 @@ def execute_asset_backfill_iteration( # Refetch, in case the backfill was canceled in the meantime backfill = cast(PartitionBackfill, instance.get_backfill(backfill.backfill_id)) updated_backfill = backfill.with_asset_backfill_data( - updated_asset_backfill_data, dynamic_partitions_store=instance + updated_asset_backfill_data, dynamic_partitions_store=instance, asset_graph=asset_graph ) if updated_asset_backfill_data.is_complete(): # The asset backfill is complete when all runs to be requested have finished (success, @@ -875,7 +880,7 @@ def execute_asset_backfill_iteration( ) updated_backfill = backfill.with_asset_backfill_data( - updated_asset_backfill_data, dynamic_partitions_store=instance + updated_asset_backfill_data, dynamic_partitions_store=instance, asset_graph=asset_graph ) # The asset backfill is successfully canceled when all requested runs have finished (success, # failure, or cancellation). Since the AssetBackfillData object stores materialization states @@ -1028,7 +1033,7 @@ def get_asset_backfill_iteration_materialized_partitions( This function is a generator so we can return control to the daemon and let it heartbeat during expensive operations. """ - recently_materialized_asset_partitions = AssetGraphSubset(asset_graph) + recently_materialized_asset_partitions = AssetGraphSubset() for asset_key in asset_backfill_data.target_subset.asset_keys: records = instance_queryer.instance.get_event_records( EventRecordsFilter( @@ -1044,9 +1049,13 @@ def get_asset_backfill_iteration_materialized_partitions( run_id=record.run_id, tag_key=BACKFILL_ID_TAG, tag_value=backfill_id ) ] - recently_materialized_asset_partitions |= { - AssetKeyPartitionKey(asset_key, record.partition_key) for record in records_in_backfill - } + recently_materialized_asset_partitions |= AssetGraphSubset.from_asset_partition_set( + { + AssetKeyPartitionKey(asset_key, record.partition_key) + for record in records_in_backfill + }, + asset_graph, + ) yield None @@ -1108,8 +1117,8 @@ def execute_asset_backfill_iteration_inner( event_type=DagsterEventType.ASSET_MATERIALIZATION ) - updated_materialized_subset = AssetGraphSubset(asset_graph) - failed_and_downstream_subset = AssetGraphSubset(asset_graph) + updated_materialized_subset = AssetGraphSubset() + failed_and_downstream_subset = AssetGraphSubset() else: target_parent_asset_keys = { parent @@ -1212,7 +1221,8 @@ def execute_asset_backfill_iteration_inner( or request_roots, materialized_subset=updated_materialized_subset, failed_and_downstream_subset=failed_and_downstream_subset, - requested_subset=asset_backfill_data.requested_subset | asset_partitions_to_request, + requested_subset=asset_backfill_data.requested_subset + | AssetGraphSubset.from_asset_partition_set(set(asset_partitions_to_request), asset_graph), backfill_start_time=backfill_start_time, ) yield AssetBackfillIterationResult(run_requests, updated_asset_backfill_data) diff --git a/python_modules/dagster/dagster/_core/execution/backfill.py b/python_modules/dagster/dagster/_core/execution/backfill.py index b947b7d08aa51..81492d36c0472 100644 --- a/python_modules/dagster/dagster/_core/execution/backfill.py +++ b/python_modules/dagster/dagster/_core/execution/backfill.py @@ -149,16 +149,17 @@ def get_backfill_status_per_asset_key( return [] if self.serialized_asset_backfill_data is not None: + asset_graph = ExternalAssetGraph.from_workspace(workspace) try: asset_backfill_data = AssetBackfillData.from_serialized( self.serialized_asset_backfill_data, - ExternalAssetGraph.from_workspace(workspace), + asset_graph, self.backfill_timestamp, ) except DagsterDefinitionChangedDeserializationError: return [] - return asset_backfill_data.get_backfill_status_per_asset_key() + return asset_backfill_data.get_backfill_status_per_asset_key(asset_graph) else: return [] @@ -190,15 +191,16 @@ def get_target_root_partitions_subset( if self.serialized_asset_backfill_data is not None: try: + asset_graph = ExternalAssetGraph.from_workspace(workspace) asset_backfill_data = AssetBackfillData.from_serialized( self.serialized_asset_backfill_data, - ExternalAssetGraph.from_workspace(workspace), + asset_graph, self.backfill_timestamp, ) except DagsterDefinitionChangedDeserializationError: return None - return asset_backfill_data.get_target_root_partitions_subset() + return asset_backfill_data.get_target_root_partitions_subset(asset_graph) else: return None @@ -323,6 +325,7 @@ def with_asset_backfill_data( self, asset_backfill_data: AssetBackfillData, dynamic_partitions_store: DynamicPartitionsStore, + asset_graph: AssetGraph, ) -> "PartitionBackfill": return PartitionBackfill( status=self.status, @@ -337,7 +340,7 @@ def with_asset_backfill_data( error=self.error, asset_selection=self.asset_selection, serialized_asset_backfill_data=asset_backfill_data.serialize( - dynamic_partitions_store=dynamic_partitions_store + dynamic_partitions_store=dynamic_partitions_store, asset_graph=asset_graph ), ) @@ -375,7 +378,7 @@ def from_asset_partitions( dynamic_partitions_store=dynamic_partitions_store, all_partitions=all_partitions, backfill_start_time=utc_datetime_from_timestamp(backfill_timestamp), - ).serialize(dynamic_partitions_store=dynamic_partitions_store), + ).serialize(dynamic_partitions_store=dynamic_partitions_store, asset_graph=asset_graph), ) @classmethod @@ -399,5 +402,5 @@ def from_partitions_by_assets( dynamic_partitions_store=dynamic_partitions_store, backfill_start_time=utc_datetime_from_timestamp(backfill_timestamp), partitions_by_assets=partitions_by_assets, - ).serialize(dynamic_partitions_store=dynamic_partitions_store), + ).serialize(dynamic_partitions_store=dynamic_partitions_store, asset_graph=asset_graph), ) diff --git a/python_modules/dagster/dagster/_utils/caching_instance_queryer.py b/python_modules/dagster/dagster/_utils/caching_instance_queryer.py index f865344ce04cc..1fa1440ff9340 100644 --- a/python_modules/dagster/dagster/_utils/caching_instance_queryer.py +++ b/python_modules/dagster/dagster/_utils/caching_instance_queryer.py @@ -456,7 +456,7 @@ def get_active_backfill_target_asset_graph_subset(self) -> AssetGraphSubset: if backfill.is_asset_backfill ] - result = AssetGraphSubset(self.asset_graph) + result = AssetGraphSubset() for asset_backfill in asset_backfills: if asset_backfill.serialized_asset_backfill_data is None: check.failed("Asset backfill missing serialized_asset_backfill_data") diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py index 77667b0616a11..38176aac82dff 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py @@ -402,10 +402,9 @@ def include_all(asset_key, partitions_subset): ["2022-01-02", "2022-01-03"] ) initial_asset1_subset = AssetGraphSubset( - asset_graph, partitions_subsets_by_asset_key={asset1.key: initial_partitions_subset} + partitions_subsets_by_asset_key={asset1.key: initial_partitions_subset} ) corresponding_asset3_subset = AssetGraphSubset( - asset_graph, partitions_subsets_by_asset_key={ asset3.key: asset3.partitions_def.empty_subset().with_partition_key_range( asset3.partitions_def, @@ -427,12 +426,15 @@ def include_all(asset_key, partitions_subset): def include_none(asset_key, partitions_subset): return False - assert asset_graph.bfs_filter_subsets( - dynamic_partitions_store=MagicMock(), - initial_subset=initial_asset1_subset, - condition_fn=include_none, - current_time=pendulum.now("UTC"), - ) == AssetGraphSubset(asset_graph) + assert ( + asset_graph.bfs_filter_subsets( + dynamic_partitions_store=MagicMock(), + initial_subset=initial_asset1_subset, + condition_fn=include_none, + current_time=pendulum.now("UTC"), + ) + == AssetGraphSubset() + ) def exclude_asset3(asset_key, partitions_subset): return asset_key is not asset3.key @@ -451,7 +453,7 @@ def exclude_asset2(asset_key, partitions_subset): return asset_key is not asset2.key initial_asset0_subset = AssetGraphSubset( - asset_graph, partitions_subsets_by_asset_key={asset0.key: initial_partitions_subset} + partitions_subsets_by_asset_key={asset0.key: initial_partitions_subset} ) assert ( asset_graph.bfs_filter_subsets( @@ -503,7 +505,6 @@ def include_all(asset_key, partitions_subset): ["2022-01-01", "2022-01-02"] ) expected_asset_graph_subset = AssetGraphSubset( - asset_graph, partitions_subsets_by_asset_key={ asset0.key: initial_subset, asset1.key: initial_subset, @@ -516,7 +517,6 @@ def include_all(asset_key, partitions_subset): asset_graph.bfs_filter_subsets( dynamic_partitions_store=MagicMock(), initial_subset=AssetGraphSubset( - asset_graph, partitions_subsets_by_asset_key={asset0.key: initial_subset}, ), condition_fn=include_all, @@ -550,7 +550,6 @@ def unpartitioned2(): ) asset_graph_subset = AssetGraphSubset( - asset_graph, partitions_subsets_by_asset_key={ partitioned1.key: daily_partitions_def.subset_with_partition_keys(["2022-01-01"]), partitioned2.key: daily_partitions_def.empty_subset(), @@ -593,7 +592,6 @@ def unpartitioned2(): ) subset1 = AssetGraphSubset( - asset_graph, partitions_subsets_by_asset_key={ partitioned1.key: daily_partitions_def.subset_with_partition_keys( ["2022-01-01", "2022-01-02", "2022-01-03"] @@ -606,7 +604,6 @@ def unpartitioned2(): ) subset2 = AssetGraphSubset( - asset_graph, partitions_subsets_by_asset_key={ partitioned1.key: daily_partitions_def.subset_with_partition_keys( ["2022-01-02", "2022-01-03", "2022-01-04"] @@ -621,7 +618,6 @@ def unpartitioned2(): assert len(list((subset1 - subset1).iterate_asset_partitions())) == 0 assert len(list((subset2 - subset2).iterate_asset_partitions())) == 0 assert subset1 - subset2 == AssetGraphSubset( - asset_graph, partitions_subsets_by_asset_key={ partitioned1.key: daily_partitions_def.subset_with_partition_keys(["2022-01-01"]), partitioned2.key: daily_partitions_def.empty_subset(), @@ -629,7 +625,6 @@ def unpartitioned2(): non_partitioned_asset_keys=set(), ) assert subset2 - subset1 == AssetGraphSubset( - asset_graph, partitions_subsets_by_asset_key={ partitioned1.key: daily_partitions_def.subset_with_partition_keys(["2022-01-04"]), partitioned2.key: daily_partitions_def.subset_with_partition_keys( @@ -691,7 +686,6 @@ def unpartitioned3(): ) ag1_storage_dict = AssetGraphSubset( - get_ag1(), partitions_subsets_by_asset_key={ AssetKey("partitioned1"): daily_partitions_def.subset_with_partition_keys( ["2022-01-01", "2022-01-02", "2022-01-03"] @@ -715,7 +709,6 @@ def unpartitioned3(): ag1_storage_dict, asset_graph=asset_graph2, allow_partial=True ) assert ag2_subset == AssetGraphSubset( - asset_graph2, partitions_subsets_by_asset_key={ AssetKey("partitioned1"): daily_partitions_def.subset_with_partition_keys( ["2022-01-01", "2022-01-02", "2022-01-03"] diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py index 91f3a97061087..f2aebecaa2fd8 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py @@ -62,6 +62,7 @@ PARTITION_NAME_TAG, ) from dagster._core.test_utils import instance_for_test +from dagster._serdes import deserialize_value, serialize_value from dagster._seven.compat.pendulum import create_pendulum_time from dagster._utils import Counter, traced_counter from dagster._utils.caching_instance_queryer import CachingInstanceQueryer @@ -1398,3 +1399,40 @@ def foo_child(): } run_backfill_to_completion(asset_graph, assets_by_repo_name, asset_backfill_data, [], instance) + + +def test_asset_backfill_serialization_deserialization(): + @asset( + partitions_def=DailyPartitionsDefinition("2023-01-01"), + ) + def upstream(): + pass + + @asset + def middle(): + pass + + @asset( + partitions_def=DailyPartitionsDefinition("2023-01-01"), + ) + def downstream(upstream): + pass + + assets_by_repo_name = {"repo": [upstream, downstream, middle]} + asset_graph = get_asset_graph(assets_by_repo_name) + + asset_backfill_data = AssetBackfillData.from_asset_partitions( + asset_graph=asset_graph, + # partition_names=["2023-01-01", "2023-01-02", "2023-01-05"], + partition_names=["2023-01-01"], + # asset_selection=[upstream.key, middle.key, downstream.key], + asset_selection=[upstream.key], + dynamic_partitions_store=MagicMock(), + all_partitions=False, + backfill_start_time=pendulum.datetime(2023, 1, 9, 0, 0, 0), + ) + + assert ( + deserialize_value(serialize_value(asset_backfill_data), AssetBackfillData) + == asset_backfill_data + ) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py index 8cfdcc713f961..3beb95eee3528 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py @@ -307,18 +307,18 @@ def repo(): for i, target in enumerate(self.active_backfill_targets or []): if isinstance(target, Mapping): target_subset = AssetGraphSubset( - asset_graph=repo.asset_graph, + # asset_graph=repo.asset_graph, partitions_subsets_by_asset_key=target, non_partitioned_asset_keys=set(), ) else: target_subset = AssetGraphSubset( - asset_graph=repo.asset_graph, + # asset_graph=repo.asset_graph, partitions_subsets_by_asset_key={}, non_partitioned_asset_keys=target, ) empty_subset = AssetGraphSubset( - asset_graph=repo.asset_graph, + # asset_graph=repo.asset_graph, partitions_subsets_by_asset_key={}, non_partitioned_asset_keys=set(), )