diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py index 40dd6850f32fe..ddee5287d7639 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py @@ -95,7 +95,7 @@ def get_asset_backfill_preview( asset_partitions = [] - for asset_key in asset_backfill_data.get_targeted_asset_keys_topological_order(): + for asset_key in asset_backfill_data.get_targeted_asset_keys_topological_order(asset_graph): if asset_graph.get_partitions_def(asset_key): partitions_subset = asset_backfill_data.target_subset.partitions_subsets_by_asset_key[ asset_key diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py index 7b5f84d41cc9e..35cc1a7e7a36f 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py @@ -284,7 +284,9 @@ def test_launch_asset_backfill_all_partitions_asset_selection(): target_subset.get_partitions_subset(AssetKey("asset2")).get_partition_keys() == all_partition_keys ) - assert not target_subset.get_partitions_subset(AssetKey("asset1")).get_partition_keys() + assert not target_subset.get_partitions_subset( + AssetKey("asset1"), asset_graph=repo.asset_graph + ).get_partition_keys() def test_launch_asset_backfill_partitions_by_asset(): @@ -536,9 +538,8 @@ def test_launch_asset_backfill_with_upstream_anchor_asset(): launch_backfill_result, instance, repo ) target_subset = asset_backfill_data.target_subset - asset_graph = target_subset.asset_graph + asset_graph = repo.asset_graph assert target_subset == AssetGraphSubset( - target_subset.asset_graph, partitions_subsets_by_asset_key={ AssetKey("hourly"): asset_graph.get_partitions_def( AssetKey("hourly") @@ -605,9 +606,8 @@ def test_launch_asset_backfill_with_two_anchor_assets(): launch_backfill_result, instance, repo ) target_subset = asset_backfill_data.target_subset - asset_graph = target_subset.asset_graph + asset_graph = repo.asset_graph assert target_subset == AssetGraphSubset( - target_subset.asset_graph, partitions_subsets_by_asset_key={ AssetKey("hourly1"): asset_graph.get_partitions_def( AssetKey("hourly1") @@ -663,9 +663,8 @@ def test_launch_asset_backfill_with_upstream_anchor_asset_and_non_partitioned_as launch_backfill_result, instance, repo ) target_subset = asset_backfill_data.target_subset - asset_graph = target_subset.asset_graph + asset_graph = repo.asset_graph assert target_subset == AssetGraphSubset( - target_subset.asset_graph, non_partitioned_asset_keys={AssetKey("non_partitioned")}, partitions_subsets_by_asset_key={ AssetKey("hourly"): ( diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py index db9d8d3cced21..f6cffd301b897 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py @@ -234,6 +234,7 @@ def _execute_asset_backfill_iteration_no_side_effects( updated_backfill = backfill.with_asset_backfill_data( cast(AssetBackfillIterationResult, result).backfill_data, dynamic_partitions_store=graphql_context.instance, + asset_graph=asset_graph, ) graphql_context.instance.update_backfill(updated_backfill) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph.py b/python_modules/dagster/dagster/_core/definitions/asset_graph.py index 3f1524a0d88b9..c3d0b8498cf0f 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_graph.py @@ -574,12 +574,12 @@ def bfs_filter_subsets( queued_subsets_by_asset_key: Dict[AssetKey, Optional[PartitionsSubset]] = { initial_asset_key: ( - initial_subset.get_partitions_subset(initial_asset_key) + initial_subset.get_partitions_subset(initial_asset_key, self) if self.get_partitions_def(initial_asset_key) else None ), } - result = AssetGraphSubset(self) + result = AssetGraphSubset() while len(queue) > 0: asset_key = queue.popleft() @@ -587,7 +587,6 @@ def bfs_filter_subsets( if condition_fn(asset_key, partitions_subset): result |= AssetGraphSubset( - self, non_partitioned_asset_keys={asset_key} if partitions_subset is None else set(), partitions_subsets_by_asset_key=( {asset_key: partitions_subset} if partitions_subset is not None else {} diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py b/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py index efe084745b58f..d422a44cfb0a5 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py @@ -1,70 +1,132 @@ import operator from collections import defaultdict from datetime import datetime -from typing import AbstractSet, Any, Callable, Dict, Iterable, Mapping, Optional, Set, Union, cast +from typing import ( + AbstractSet, + Any, + Callable, + Dict, + Iterable, + Mapping, + NamedTuple, + Optional, + Set, + Union, + cast, +) from dagster import _check as check from dagster._core.definitions.partition import ( PartitionsDefinition, PartitionsSubset, ) +from dagster._core.definitions.time_window_partitions import ( + PartitionKeysTimeWindowPartitionsSubset, + TimeWindowPartitionsSubset, +) from dagster._core.errors import ( DagsterDefinitionChangedDeserializationError, ) from dagster._core.instance import DynamicPartitionsStore +from dagster._serdes.serdes import ( + FieldSerializer, + deserialize_value, + serialize_value, + whitelist_for_serdes, +) from .asset_graph import AssetGraph from .events import AssetKey, AssetKeyPartitionKey -class AssetGraphSubset: - def __init__( +class PartitionsSubsetByAssetKeySerializer(FieldSerializer): + """Packs and unpacks a mapping from AssetKey to PartitionsSubset. + + In JSON, a key must be a str, int, float, bool, or None. This serializer packs the AssetKey + into a str, and unpacks it back into an AssetKey. + + It also converts PartitionKeysTimeWindowPartitionsSubset into serializable TimeWindowPartitionsSubsets. + """ + + def pack(self, mapping: Mapping[AssetKey, Any], **_kwargs) -> Mapping[str, Any]: + return { + serialize_value(key): serialize_value( + value.to_time_window_partitions_subset() + if isinstance(value, PartitionKeysTimeWindowPartitionsSubset) + else value + ) + for key, value in mapping.items() + } + + def unpack( self, - asset_graph: AssetGraph, + mapping: Mapping[str, Any], + **_kwargs, + ) -> Mapping[AssetKey, Any]: + return { + deserialize_value(key, AssetKey): deserialize_value(value, TimeWindowPartitionsSubset) + for key, value in mapping.items() + } + + +@whitelist_for_serdes( + field_serializers={"partitions_subsets_by_asset_key": PartitionsSubsetByAssetKeySerializer} +) +class AssetGraphSubset( + NamedTuple( + "_AssetGraphSubset", + [ + ("partitions_subsets_by_asset_key", Mapping[AssetKey, PartitionsSubset]), + ("non_partitioned_asset_keys", AbstractSet[AssetKey]), + ], + ) +): + def __new__( + cls, partitions_subsets_by_asset_key: Optional[Mapping[AssetKey, PartitionsSubset]] = None, non_partitioned_asset_keys: Optional[AbstractSet[AssetKey]] = None, ): - self._asset_graph = asset_graph - self._partitions_subsets_by_asset_key = partitions_subsets_by_asset_key or {} - self._non_partitioned_asset_keys = non_partitioned_asset_keys or set() - - @property - def asset_graph(self) -> AssetGraph: - return self._asset_graph - - @property - def partitions_subsets_by_asset_key(self) -> Mapping[AssetKey, PartitionsSubset]: - return self._partitions_subsets_by_asset_key - - @property - def non_partitioned_asset_keys(self) -> AbstractSet[AssetKey]: - return self._non_partitioned_asset_keys + return super(AssetGraphSubset, cls).__new__( + cls, + partitions_subsets_by_asset_key=partitions_subsets_by_asset_key or {}, + non_partitioned_asset_keys=non_partitioned_asset_keys or set(), + ) @property def asset_keys(self) -> AbstractSet[AssetKey]: return { key for key, subset in self.partitions_subsets_by_asset_key.items() if len(subset) > 0 - } | self._non_partitioned_asset_keys + } | self.non_partitioned_asset_keys @property def num_partitions_and_non_partitioned_assets(self): - return len(self._non_partitioned_asset_keys) + sum( - len(subset) for subset in self._partitions_subsets_by_asset_key.values() + return len(self.non_partitioned_asset_keys) + sum( + len(subset) for subset in self.partitions_subsets_by_asset_key.values() ) - def get_partitions_subset(self, asset_key: AssetKey) -> PartitionsSubset: - partitions_def = self.asset_graph.get_partitions_def(asset_key) - if partitions_def is None: - check.failed("Can only call get_partitions_subset on a partitioned asset") + def get_partitions_subset( + self, asset_key: AssetKey, asset_graph: Optional[AssetGraph] = None + ) -> PartitionsSubset: + if asset_graph: + partitions_def = asset_graph.get_partitions_def(asset_key) + if partitions_def is None: + check.failed("Can only call get_partitions_subset on a partitioned asset") - return self.partitions_subsets_by_asset_key.get(asset_key, partitions_def.empty_subset()) + return self.partitions_subsets_by_asset_key.get( + asset_key, partitions_def.empty_subset() + ) + else: + return self.partitions_subsets_by_asset_key[asset_key] def iterate_asset_partitions(self) -> Iterable[AssetKeyPartitionKey]: - for asset_key, partitions_subset in self.partitions_subsets_by_asset_key.items(): + for ( + asset_key, + partitions_subset, + ) in self.partitions_subsets_by_asset_key.items(): for partition_key in partitions_subset.get_partition_keys(): yield AssetKeyPartitionKey(asset_key, partition_key) - for asset_key in self._non_partitioned_asset_keys: + for asset_key in self.non_partitioned_asset_keys: yield AssetKeyPartitionKey(asset_key, None) def __contains__(self, asset: Union[AssetKey, AssetKeyPartitionKey]) -> bool: @@ -74,19 +136,18 @@ def __contains__(self, asset: Union[AssetKey, AssetKeyPartitionKey]) -> bool: """ if isinstance(asset, AssetKey): # check if any keys are in the subset - if self.asset_graph.is_partitioned(asset): - partitions_subset = self.partitions_subsets_by_asset_key.get(asset) - return partitions_subset is not None and len(partitions_subset) > 0 - else: - return asset in self._non_partitioned_asset_keys + partitions_subset = self.partitions_subsets_by_asset_key.get(asset) + return (partitions_subset is not None and len(partitions_subset) > 0) or ( + asset in self.non_partitioned_asset_keys + ) elif asset.partition_key is None: - return asset.asset_key in self._non_partitioned_asset_keys + return asset.asset_key in self.non_partitioned_asset_keys else: partitions_subset = self.partitions_subsets_by_asset_key.get(asset.asset_key) return partitions_subset is not None and asset.partition_key in partitions_subset def to_storage_dict( - self, dynamic_partitions_store: DynamicPartitionsStore + self, dynamic_partitions_store: DynamicPartitionsStore, asset_graph: AssetGraph ) -> Mapping[str, object]: return { "partitions_subsets_by_asset_key": { @@ -95,7 +156,7 @@ def to_storage_dict( }, "serializable_partitions_def_ids_by_asset_key": { key.to_user_string(): check.not_none( - self._asset_graph.get_partitions_def(key) + asset_graph.get_partitions_def(key) ).get_serializable_unique_identifier( dynamic_partitions_store=dynamic_partitions_store ) @@ -103,28 +164,26 @@ def to_storage_dict( }, "partitions_def_class_names_by_asset_key": { key.to_user_string(): check.not_none( - self._asset_graph.get_partitions_def(key) + asset_graph.get_partitions_def(key) ).__class__.__name__ for key, _ in self.partitions_subsets_by_asset_key.items() }, "non_partitioned_asset_keys": [ - key.to_user_string() for key in self._non_partitioned_asset_keys + key.to_user_string() for key in self.non_partitioned_asset_keys ], } - def _oper( - self, other: Union["AssetGraphSubset", AbstractSet[AssetKeyPartitionKey]], oper: Callable - ) -> "AssetGraphSubset": + def _oper(self, other: "AssetGraphSubset", oper: Callable) -> "AssetGraphSubset": """Returns the AssetGraphSubset that results from applying the given operator to the set of asset partitions in self and other. Note: Not all operators are supported on the underlying PartitionsSubset objects. """ result_partition_subsets_by_asset_key = {**self.partitions_subsets_by_asset_key} - result_non_partitioned_asset_keys = set(self._non_partitioned_asset_keys) + result_non_partitioned_asset_keys = set(self.non_partitioned_asset_keys) - if not isinstance(other, AssetGraphSubset): - other = AssetGraphSubset.from_asset_partition_set(other, self.asset_graph) + # if not isinstance(other, AssetGraphSubset): + # other = AssetGraphSubset.from_asset_partition_set(other, self.asset_graph) for asset_key in other.asset_keys: if asset_key in other.non_partitioned_asset_keys: @@ -133,48 +192,62 @@ def _oper( result_non_partitioned_asset_keys, {asset_key} ) else: - subset = self.get_partitions_subset(asset_key) check.invariant(asset_key not in self.non_partitioned_asset_keys) - result_partition_subsets_by_asset_key[asset_key] = oper( - subset, other.get_partitions_subset(asset_key) + subset = ( + self.get_partitions_subset(asset_key) + if asset_key in self.partitions_subsets_by_asset_key + else None ) + other_subset = other.get_partitions_subset(asset_key) + if other_subset is None and subset is None: + pass + if subset is None and other_subset is not None: + if oper == operator.or_: + result_partition_subsets_by_asset_key[asset_key] = other_subset + elif oper == operator.sub: + pass + elif oper == operator.and_: + pass + else: + check.failed(f"Unsupported operator {oper}") + elif subset is not None and other_subset is None: + if oper == operator.or_: + pass + elif oper == operator.sub: + pass + elif oper == operator.and_: + del result_partition_subsets_by_asset_key[asset_key] + else: + result_partition_subsets_by_asset_key[asset_key] = oper(subset, other_subset) + return AssetGraphSubset( - self.asset_graph, result_partition_subsets_by_asset_key, result_non_partitioned_asset_keys, ) - def __or__( - self, other: Union["AssetGraphSubset", AbstractSet[AssetKeyPartitionKey]] - ) -> "AssetGraphSubset": + def __or__(self, other: "AssetGraphSubset") -> "AssetGraphSubset": return self._oper(other, operator.or_) - def __sub__( - self, other: Union["AssetGraphSubset", AbstractSet[AssetKeyPartitionKey]] - ) -> "AssetGraphSubset": + def __sub__(self, other: "AssetGraphSubset") -> "AssetGraphSubset": return self._oper(other, operator.sub) - def __and__( - self, other: Union["AssetGraphSubset", AbstractSet[AssetKeyPartitionKey]] - ) -> "AssetGraphSubset": + def __and__(self, other: "AssetGraphSubset") -> "AssetGraphSubset": return self._oper(other, operator.and_) def filter_asset_keys(self, asset_keys: AbstractSet[AssetKey]) -> "AssetGraphSubset": return AssetGraphSubset( - self.asset_graph, { asset_key: subset for asset_key, subset in self.partitions_subsets_by_asset_key.items() if asset_key in asset_keys }, - self._non_partitioned_asset_keys & asset_keys, + self.non_partitioned_asset_keys & asset_keys, ) def __eq__(self, other) -> bool: return ( isinstance(other, AssetGraphSubset) - and self.asset_graph == other.asset_graph and self.partitions_subsets_by_asset_key == other.partitions_subsets_by_asset_key and self.non_partitioned_asset_keys == other.non_partitioned_asset_keys ) @@ -183,13 +256,15 @@ def __repr__(self) -> str: return ( "AssetGraphSubset(" f"non_partitioned_asset_keys={self.non_partitioned_asset_keys}, " - f"partitions_subset_by_asset_key={self.partitions_subsets_by_asset_key}" + f"partitions_subsets_by_asset_key={self.partitions_subsets_by_asset_key}" ")" ) @classmethod def from_asset_partition_set( - cls, asset_partitions_set: AbstractSet[AssetKeyPartitionKey], asset_graph: AssetGraph + cls, + asset_partitions_set: AbstractSet[AssetKeyPartitionKey], + asset_graph: AssetGraph, ) -> "AssetGraphSubset": partitions_by_asset_key = defaultdict(set) non_partitioned_asset_keys = set() @@ -209,7 +284,6 @@ def from_asset_partition_set( for asset_key, partition_keys in partitions_by_asset_key.items() }, non_partitioned_asset_keys=non_partitioned_asset_keys, - asset_graph=asset_graph, ) @classmethod @@ -297,9 +371,7 @@ def from_storage_dict( AssetKey.from_user_string(key) for key in serialized_dict["non_partitioned_asset_keys"] } & asset_graph.all_asset_keys - return AssetGraphSubset( - asset_graph, partitions_subsets_by_asset_key, non_partitioned_asset_keys - ) + return AssetGraphSubset(partitions_subsets_by_asset_key, non_partitioned_asset_keys) @classmethod def all( @@ -333,12 +405,11 @@ def from_asset_keys( asset_key ] = partitions_def.empty_subset().with_partition_keys( partitions_def.get_partition_keys( - dynamic_partitions_store=dynamic_partitions_store, current_time=current_time + dynamic_partitions_store=dynamic_partitions_store, + current_time=current_time, ) ) else: non_partitioned_asset_keys.add(asset_key) - return AssetGraphSubset( - asset_graph, partitions_subsets_by_asset_key, non_partitioned_asset_keys - ) + return AssetGraphSubset(partitions_subsets_by_asset_key, non_partitioned_asset_keys) diff --git a/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py b/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py index fe6300f9e4fbd..e4afe7d074a64 100644 --- a/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py +++ b/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py @@ -1823,6 +1823,11 @@ def with_partitions_def( included_partition_keys=self._included_partition_keys, ) + def to_time_window_partitions_subset(self) -> "TimeWindowPartitionsSubset": + return TimeWindowPartitionsSubset( + self.partitions_def, self.num_partitions, self.included_time_windows + ) + class TimeWindowPartitionsSubsetSerializer(NamedTupleSerializer): # TimeWindowPartitionsSubsets have custom logic to delay calculating num_partitions until it diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py index 06f97e17095bb..dd97d87a61ed7 100644 --- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py +++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py @@ -38,6 +38,7 @@ from dagster._core.definitions.partition import PartitionsDefinition, PartitionsSubset from dagster._core.definitions.run_request import RunRequest from dagster._core.definitions.selector import JobSubsetSelector, PartitionsByAssetSelector +from dagster._core.definitions.time_window_partitions import DatetimeFieldSerializer from dagster._core.errors import ( DagsterAssetBackfillDataLoadError, DagsterBackfillFailedError, @@ -67,6 +68,7 @@ IWorkspaceProcessContext, ) from dagster._core.workspace.workspace import IWorkspace +from dagster._serdes import whitelist_for_serdes from dagster._utils import hash_collection, utc_datetime_from_timestamp from dagster._utils.caching_instance_queryer import CachingInstanceQueryer @@ -130,6 +132,7 @@ def __new__(cls, asset_key: AssetKey, asset_backfill_status: Optional[AssetBackf ) +@whitelist_for_serdes(field_serializers={"backfill_start_time": DatetimeFieldSerializer}) class AssetBackfillData(NamedTuple): """Has custom serialization instead of standard Dagster NamedTuple serialization because the asset graph is required to build the AssetGraphSubset objects. @@ -186,7 +189,7 @@ def _get_self_and_downstream_targeted_subset( self_and_downstream = initial_subset for asset_key in initial_subset.asset_keys: self_and_downstream = self_and_downstream | ( - self.target_subset.asset_graph.bfs_filter_subsets( + instance_queryer.asset_graph.bfs_filter_subsets( instance_queryer, lambda asset_key, _: asset_key in self.target_subset, initial_subset.filter_asset_keys({asset_key}), @@ -201,7 +204,7 @@ def _get_self_and_downstream_targeted_subset( for asset_key in self.target_subset.asset_keys if all( parent not in self.target_subset.asset_keys - for parent in self.target_subset.asset_graph.get_parents(asset_key) + for parent in instance_queryer.asset_graph.get_parents(asset_key) - {asset_key} # Do not include an asset as its own parent ) } @@ -229,7 +232,7 @@ def _get_self_and_downstream_targeted_subset( unreachable_target_root_subset = unreachable_targets.filter_asset_keys( AssetSelection.keys(*unreachable_targets.asset_keys) .sources() - .resolve(unreachable_targets.asset_graph) + .resolve(instance_queryer.asset_graph) ) root_subset = root_subset | unreachable_target_root_subset @@ -257,18 +260,16 @@ def get_target_partitions_subset(self, asset_key: AssetKey) -> PartitionsSubset: # Return the targeted partitions for the root partitioned asset keys return self.target_subset.get_partitions_subset(asset_key) - def get_target_root_partitions_subset(self) -> PartitionsSubset: + def get_target_root_partitions_subset(self, asset_graph: AssetGraph) -> PartitionsSubset: """Returns the most upstream partitions subset that was targeted by the backfill.""" partitioned_asset_keys = { asset_key for asset_key in self.target_subset.asset_keys - if self.target_subset.asset_graph.get_partitions_def(asset_key) is not None + if asset_graph.get_partitions_def(asset_key) is not None } root_partitioned_asset_keys = ( - AssetSelection.keys(*partitioned_asset_keys) - .sources() - .resolve(self.target_subset.asset_graph) + AssetSelection.keys(*partitioned_asset_keys).sources().resolve(asset_graph) ) # Return the targeted partitions for the root partitioned asset keys @@ -289,13 +290,15 @@ def get_num_partitions(self) -> Optional[int]: else: return None - def get_targeted_asset_keys_topological_order(self) -> Sequence[AssetKey]: + def get_targeted_asset_keys_topological_order( + self, asset_graph: AssetGraph + ) -> Sequence[AssetKey]: """Returns a topological ordering of asset keys targeted by the backfill that exist in the asset graph. Orders keys in the same topological level alphabetically. """ - toposorted_keys = self.target_subset.asset_graph.toposort_asset_keys() + toposorted_keys = asset_graph.toposort_asset_keys() targeted_toposorted_keys = [] for level_keys in toposorted_keys: @@ -306,7 +309,7 @@ def get_targeted_asset_keys_topological_order(self) -> Sequence[AssetKey]: return targeted_toposorted_keys def get_backfill_status_per_asset_key( - self, + self, asset_graph: AssetGraph ) -> Sequence[Union[PartitionedAssetBackfillStatus, UnpartitionedAssetBackfillStatus]]: """Returns a list containing each targeted asset key's backfill status. This list orders assets topologically and only contains statuses for assets that are @@ -316,10 +319,16 @@ def get_backfill_status_per_asset_key( def _get_status_for_asset_key( asset_key: AssetKey, ) -> Union[PartitionedAssetBackfillStatus, UnpartitionedAssetBackfillStatus]: - if self.target_subset.asset_graph.get_partitions_def(asset_key) is not None: - materialized_subset = self.materialized_subset.get_partitions_subset(asset_key) - failed_subset = self.failed_and_downstream_subset.get_partitions_subset(asset_key) - requested_subset = self.requested_subset.get_partitions_subset(asset_key) + if asset_graph.get_partitions_def(asset_key) is not None: + materialized_subset = self.materialized_subset.get_partitions_subset( + asset_key, asset_graph + ) + failed_subset = self.failed_and_downstream_subset.get_partitions_subset( + asset_key, asset_graph + ) + requested_subset = self.requested_subset.get_partitions_subset( + asset_key, asset_graph + ) # The failed subset includes partitions that failed and their downstream partitions. # The downstream partitions are not included in the requested subset, so we determine @@ -331,7 +340,7 @@ def _get_status_for_asset_key( return PartitionedAssetBackfillStatus( asset_key, - len(self.target_subset.get_partitions_subset(asset_key)), + len(self.target_subset.get_partitions_subset(asset_key, asset_graph)), { AssetBackfillStatus.MATERIALIZED: len(materialized_subset), AssetBackfillStatus.FAILED: len(failed_subset - materialized_subset), @@ -360,7 +369,7 @@ def _get_status_for_asset_key( return UnpartitionedAssetBackfillStatus(asset_key, None) # Only return back statuses for the assets that still exist in the workspace - topological_order = self.get_targeted_asset_keys_topological_order() + topological_order = self.get_targeted_asset_keys_topological_order(asset_graph) return [_get_status_for_asset_key(asset_key) for asset_key in topological_order] def get_partition_names(self) -> Optional[Sequence[str]]: @@ -382,13 +391,12 @@ def get_partition_names(self) -> Optional[Sequence[str]]: def empty( cls, target_subset: AssetGraphSubset, backfill_start_time: datetime ) -> "AssetBackfillData": - asset_graph = target_subset.asset_graph return cls( target_subset=target_subset, requested_runs_for_target_roots=False, - requested_subset=AssetGraphSubset(asset_graph), - materialized_subset=AssetGraphSubset(asset_graph), - failed_and_downstream_subset=AssetGraphSubset(asset_graph), + requested_subset=AssetGraphSubset(), + materialized_subset=AssetGraphSubset(), + failed_and_downstream_subset=AssetGraphSubset(), latest_storage_id=None, backfill_start_time=backfill_start_time, ) @@ -471,7 +479,6 @@ def from_partitions_by_assets( non_partitioned_asset_keys.add(asset_key) target_subset = AssetGraphSubset( - asset_graph, partitions_subsets_by_asset_key=partitions_subsets_by_asset_key, non_partitioned_asset_keys=non_partitioned_asset_keys, ) @@ -525,7 +532,6 @@ def from_asset_partitions( root_partitions_subset = root_partitions_def.subset_with_partition_keys(partition_names) target_subset = AssetGraphSubset( - asset_graph, non_partitioned_asset_keys=set(asset_selection) - partitioned_asset_keys, ) for root_asset_key in root_partitioned_asset_keys: @@ -533,7 +539,6 @@ def from_asset_partitions( dynamic_partitions_store, lambda asset_key, _: asset_key in partitioned_asset_keys, AssetGraphSubset( - asset_graph, partitions_subsets_by_asset_key={root_asset_key: root_partitions_subset}, ), current_time=backfill_start_time, @@ -543,21 +548,23 @@ def from_asset_partitions( return cls.empty(target_subset, backfill_start_time) - def serialize(self, dynamic_partitions_store: DynamicPartitionsStore) -> str: + def serialize( + self, dynamic_partitions_store: DynamicPartitionsStore, asset_graph: AssetGraph + ) -> str: storage_dict = { "requested_runs_for_target_roots": self.requested_runs_for_target_roots, "serialized_target_subset": self.target_subset.to_storage_dict( - dynamic_partitions_store=dynamic_partitions_store + dynamic_partitions_store=dynamic_partitions_store, asset_graph=asset_graph ), "latest_storage_id": self.latest_storage_id, "serialized_requested_subset": self.requested_subset.to_storage_dict( - dynamic_partitions_store=dynamic_partitions_store + dynamic_partitions_store=dynamic_partitions_store, asset_graph=asset_graph ), "serialized_materialized_subset": self.materialized_subset.to_storage_dict( - dynamic_partitions_store=dynamic_partitions_store + dynamic_partitions_store=dynamic_partitions_store, asset_graph=asset_graph ), "serialized_failed_subset": self.failed_and_downstream_subset.to_storage_dict( - dynamic_partitions_store=dynamic_partitions_store + dynamic_partitions_store=dynamic_partitions_store, asset_graph=asset_graph ), } return json.dumps(storage_dict) @@ -700,7 +707,9 @@ def _submit_runs_and_update_backfill_in_chunks( requested_partitions_in_chunk = _get_requested_asset_partitions_from_run_requests( run_requests_chunk, asset_graph, instance_queryer ) - submitted_partitions = submitted_partitions | requested_partitions_in_chunk + submitted_partitions = submitted_partitions | AssetGraphSubset.from_asset_partition_set( + set(requested_partitions_in_chunk), asset_graph=asset_graph + ) # AssetBackfillIterationResult contains the requested subset after all runs are submitted. # Replace this value with just the partitions that have been submitted so far. @@ -713,7 +722,9 @@ def _submit_runs_and_update_backfill_in_chunks( # Refetch, in case the backfill was requested for cancellation in the meantime backfill = cast(PartitionBackfill, instance.get_backfill(backfill_id)) updated_backfill = backfill.with_asset_backfill_data( - backfill_data_with_submitted_runs, dynamic_partitions_store=instance + backfill_data_with_submitted_runs, + dynamic_partitions_store=instance, + asset_graph=asset_graph, ) instance.update_backfill(updated_backfill) @@ -824,7 +835,7 @@ def execute_asset_backfill_iteration( # Refetch, in case the backfill was canceled in the meantime backfill = cast(PartitionBackfill, instance.get_backfill(backfill.backfill_id)) updated_backfill = backfill.with_asset_backfill_data( - updated_asset_backfill_data, dynamic_partitions_store=instance + updated_asset_backfill_data, dynamic_partitions_store=instance, asset_graph=asset_graph ) if updated_asset_backfill_data.is_complete(): # The asset backfill is complete when all runs to be requested have finished (success, @@ -875,7 +886,7 @@ def execute_asset_backfill_iteration( ) updated_backfill = backfill.with_asset_backfill_data( - updated_asset_backfill_data, dynamic_partitions_store=instance + updated_asset_backfill_data, dynamic_partitions_store=instance, asset_graph=asset_graph ) # The asset backfill is successfully canceled when all requested runs have finished (success, # failure, or cancellation). Since the AssetBackfillData object stores materialization states @@ -1028,7 +1039,7 @@ def get_asset_backfill_iteration_materialized_partitions( This function is a generator so we can return control to the daemon and let it heartbeat during expensive operations. """ - recently_materialized_asset_partitions = AssetGraphSubset(asset_graph) + recently_materialized_asset_partitions = AssetGraphSubset() for asset_key in asset_backfill_data.target_subset.asset_keys: records = instance_queryer.instance.get_event_records( EventRecordsFilter( @@ -1044,9 +1055,13 @@ def get_asset_backfill_iteration_materialized_partitions( run_id=record.run_id, tag_key=BACKFILL_ID_TAG, tag_value=backfill_id ) ] - recently_materialized_asset_partitions |= { - AssetKeyPartitionKey(asset_key, record.partition_key) for record in records_in_backfill - } + recently_materialized_asset_partitions |= AssetGraphSubset.from_asset_partition_set( + { + AssetKeyPartitionKey(asset_key, record.partition_key) + for record in records_in_backfill + }, + asset_graph, + ) yield None @@ -1106,8 +1121,8 @@ def execute_asset_backfill_iteration_inner( next_latest_storage_id = instance_queryer.instance.event_log_storage.get_maximum_record_id() - updated_materialized_subset = AssetGraphSubset(asset_graph) - failed_and_downstream_subset = AssetGraphSubset(asset_graph) + updated_materialized_subset = AssetGraphSubset() + failed_and_downstream_subset = AssetGraphSubset() else: target_parent_asset_keys = { parent @@ -1210,7 +1225,8 @@ def execute_asset_backfill_iteration_inner( or request_roots, materialized_subset=updated_materialized_subset, failed_and_downstream_subset=failed_and_downstream_subset, - requested_subset=asset_backfill_data.requested_subset | asset_partitions_to_request, + requested_subset=asset_backfill_data.requested_subset + | AssetGraphSubset.from_asset_partition_set(set(asset_partitions_to_request), asset_graph), backfill_start_time=backfill_start_time, ) yield AssetBackfillIterationResult(run_requests, updated_asset_backfill_data) diff --git a/python_modules/dagster/dagster/_core/execution/backfill.py b/python_modules/dagster/dagster/_core/execution/backfill.py index b947b7d08aa51..81492d36c0472 100644 --- a/python_modules/dagster/dagster/_core/execution/backfill.py +++ b/python_modules/dagster/dagster/_core/execution/backfill.py @@ -149,16 +149,17 @@ def get_backfill_status_per_asset_key( return [] if self.serialized_asset_backfill_data is not None: + asset_graph = ExternalAssetGraph.from_workspace(workspace) try: asset_backfill_data = AssetBackfillData.from_serialized( self.serialized_asset_backfill_data, - ExternalAssetGraph.from_workspace(workspace), + asset_graph, self.backfill_timestamp, ) except DagsterDefinitionChangedDeserializationError: return [] - return asset_backfill_data.get_backfill_status_per_asset_key() + return asset_backfill_data.get_backfill_status_per_asset_key(asset_graph) else: return [] @@ -190,15 +191,16 @@ def get_target_root_partitions_subset( if self.serialized_asset_backfill_data is not None: try: + asset_graph = ExternalAssetGraph.from_workspace(workspace) asset_backfill_data = AssetBackfillData.from_serialized( self.serialized_asset_backfill_data, - ExternalAssetGraph.from_workspace(workspace), + asset_graph, self.backfill_timestamp, ) except DagsterDefinitionChangedDeserializationError: return None - return asset_backfill_data.get_target_root_partitions_subset() + return asset_backfill_data.get_target_root_partitions_subset(asset_graph) else: return None @@ -323,6 +325,7 @@ def with_asset_backfill_data( self, asset_backfill_data: AssetBackfillData, dynamic_partitions_store: DynamicPartitionsStore, + asset_graph: AssetGraph, ) -> "PartitionBackfill": return PartitionBackfill( status=self.status, @@ -337,7 +340,7 @@ def with_asset_backfill_data( error=self.error, asset_selection=self.asset_selection, serialized_asset_backfill_data=asset_backfill_data.serialize( - dynamic_partitions_store=dynamic_partitions_store + dynamic_partitions_store=dynamic_partitions_store, asset_graph=asset_graph ), ) @@ -375,7 +378,7 @@ def from_asset_partitions( dynamic_partitions_store=dynamic_partitions_store, all_partitions=all_partitions, backfill_start_time=utc_datetime_from_timestamp(backfill_timestamp), - ).serialize(dynamic_partitions_store=dynamic_partitions_store), + ).serialize(dynamic_partitions_store=dynamic_partitions_store, asset_graph=asset_graph), ) @classmethod @@ -399,5 +402,5 @@ def from_partitions_by_assets( dynamic_partitions_store=dynamic_partitions_store, backfill_start_time=utc_datetime_from_timestamp(backfill_timestamp), partitions_by_assets=partitions_by_assets, - ).serialize(dynamic_partitions_store=dynamic_partitions_store), + ).serialize(dynamic_partitions_store=dynamic_partitions_store, asset_graph=asset_graph), ) diff --git a/python_modules/dagster/dagster/_utils/caching_instance_queryer.py b/python_modules/dagster/dagster/_utils/caching_instance_queryer.py index 004a817218dc3..ece9b10004206 100644 --- a/python_modules/dagster/dagster/_utils/caching_instance_queryer.py +++ b/python_modules/dagster/dagster/_utils/caching_instance_queryer.py @@ -432,7 +432,7 @@ def get_active_backfill_target_asset_graph_subset(self) -> AssetGraphSubset: if backfill.is_asset_backfill ] - result = AssetGraphSubset(self.asset_graph) + result = AssetGraphSubset() for asset_backfill in asset_backfills: if asset_backfill.serialized_asset_backfill_data is None: check.failed("Asset backfill missing serialized_asset_backfill_data") diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py index 77667b0616a11..6a73f07e36782 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py @@ -60,9 +60,12 @@ def repo(): @pytest.fixture( - name="asset_graph_from_assets", params=[AssetGraph.from_assets, to_external_asset_graph] + name="asset_graph_from_assets", + params=[AssetGraph.from_assets, to_external_asset_graph], ) -def asset_graph_from_assets_fixture(request) -> Callable[[List[AssetsDefinition]], AssetGraph]: +def asset_graph_from_assets_fixture( + request, +) -> Callable[[List[AssetsDefinition]], AssetGraph]: return request.param @@ -86,7 +89,12 @@ def asset3(asset1, asset2): assets = [asset0, asset1, asset2, asset3] asset_graph = asset_graph_from_assets(assets) - assert asset_graph.all_asset_keys == {asset0.key, asset1.key, asset2.key, asset3.key} + assert asset_graph.all_asset_keys == { + asset0.key, + asset1.key, + asset2.key, + asset3.key, + } assert not asset_graph.is_partitioned(asset0.key) assert asset_graph.is_partitioned(asset1.key) assert asset_graph.have_same_partitioning(asset1.key, asset2.key) @@ -99,7 +107,9 @@ def asset3(asset1, asset2): assert asset_graph.get_code_version(asset1.key) is None -def test_get_children_partitions_unpartitioned_parent_partitioned_child(asset_graph_from_assets): +def test_get_children_partitions_unpartitioned_parent_partitioned_child( + asset_graph_from_assets, +): @asset def parent(): ... @@ -117,7 +127,9 @@ def child(parent): ) -def test_get_parent_partitions_unpartitioned_child_partitioned_parent(asset_graph_from_assets): +def test_get_parent_partitions_unpartitioned_child_partitioned_parent( + asset_graph_from_assets, +): @asset(partitions_def=StaticPartitionsDefinition(["a", "b"])) def parent(): ... @@ -133,7 +145,10 @@ def child(parent): assert asset_graph.get_parents_partitions( instance, current_time, child.key ).parent_partitions == set( - [AssetKeyPartitionKey(parent.key, "a"), AssetKeyPartitionKey(parent.key, "b")] + [ + AssetKeyPartitionKey(parent.key, "a"), + AssetKeyPartitionKey(parent.key, "b"), + ] ) @@ -293,7 +308,8 @@ def non_subsettable_multi_asset(): def test_required_multi_asset_sets_subsettable_multi_asset(asset_graph_from_assets): @multi_asset( - outs={"a": AssetOut(dagster_type=None), "b": AssetOut(dagster_type=None)}, can_subset=True + outs={"a": AssetOut(dagster_type=None), "b": AssetOut(dagster_type=None)}, + can_subset=True, ) def subsettable_multi_asset(): ... @@ -402,10 +418,9 @@ def include_all(asset_key, partitions_subset): ["2022-01-02", "2022-01-03"] ) initial_asset1_subset = AssetGraphSubset( - asset_graph, partitions_subsets_by_asset_key={asset1.key: initial_partitions_subset} + partitions_subsets_by_asset_key={asset1.key: initial_partitions_subset} ) corresponding_asset3_subset = AssetGraphSubset( - asset_graph, partitions_subsets_by_asset_key={ asset3.key: asset3.partitions_def.empty_subset().with_partition_key_range( asset3.partitions_def, @@ -427,12 +442,15 @@ def include_all(asset_key, partitions_subset): def include_none(asset_key, partitions_subset): return False - assert asset_graph.bfs_filter_subsets( - dynamic_partitions_store=MagicMock(), - initial_subset=initial_asset1_subset, - condition_fn=include_none, - current_time=pendulum.now("UTC"), - ) == AssetGraphSubset(asset_graph) + assert ( + asset_graph.bfs_filter_subsets( + dynamic_partitions_store=MagicMock(), + initial_subset=initial_asset1_subset, + condition_fn=include_none, + current_time=pendulum.now("UTC"), + ) + == AssetGraphSubset() + ) def exclude_asset3(asset_key, partitions_subset): return asset_key is not asset3.key @@ -451,7 +469,7 @@ def exclude_asset2(asset_key, partitions_subset): return asset_key is not asset2.key initial_asset0_subset = AssetGraphSubset( - asset_graph, partitions_subsets_by_asset_key={asset0.key: initial_partitions_subset} + partitions_subsets_by_asset_key={asset0.key: initial_partitions_subset} ) assert ( asset_graph.bfs_filter_subsets( @@ -503,7 +521,6 @@ def include_all(asset_key, partitions_subset): ["2022-01-01", "2022-01-02"] ) expected_asset_graph_subset = AssetGraphSubset( - asset_graph, partitions_subsets_by_asset_key={ asset0.key: initial_subset, asset1.key: initial_subset, @@ -516,7 +533,6 @@ def include_all(asset_key, partitions_subset): asset_graph.bfs_filter_subsets( dynamic_partitions_store=MagicMock(), initial_subset=AssetGraphSubset( - asset_graph, partitions_subsets_by_asset_key={asset0.key: initial_subset}, ), condition_fn=include_all, @@ -545,12 +561,7 @@ def unpartitioned1(): def unpartitioned2(): ... - asset_graph = asset_graph_from_assets( - [partitioned1, partitioned2, unpartitioned1, unpartitioned2] - ) - asset_graph_subset = AssetGraphSubset( - asset_graph, partitions_subsets_by_asset_key={ partitioned1.key: daily_partitions_def.subset_with_partition_keys(["2022-01-01"]), partitioned2.key: daily_partitions_def.empty_subset(), @@ -588,12 +599,7 @@ def unpartitioned1(): def unpartitioned2(): ... - asset_graph = asset_graph_from_assets( - [partitioned1, partitioned2, unpartitioned1, unpartitioned2] - ) - subset1 = AssetGraphSubset( - asset_graph, partitions_subsets_by_asset_key={ partitioned1.key: daily_partitions_def.subset_with_partition_keys( ["2022-01-01", "2022-01-02", "2022-01-03"] @@ -606,7 +612,6 @@ def unpartitioned2(): ) subset2 = AssetGraphSubset( - asset_graph, partitions_subsets_by_asset_key={ partitioned1.key: daily_partitions_def.subset_with_partition_keys( ["2022-01-02", "2022-01-03", "2022-01-04"] @@ -621,7 +626,6 @@ def unpartitioned2(): assert len(list((subset1 - subset1).iterate_asset_partitions())) == 0 assert len(list((subset2 - subset2).iterate_asset_partitions())) == 0 assert subset1 - subset2 == AssetGraphSubset( - asset_graph, partitions_subsets_by_asset_key={ partitioned1.key: daily_partitions_def.subset_with_partition_keys(["2022-01-01"]), partitioned2.key: daily_partitions_def.empty_subset(), @@ -629,7 +633,6 @@ def unpartitioned2(): non_partitioned_asset_keys=set(), ) assert subset2 - subset1 == AssetGraphSubset( - asset_graph, partitions_subsets_by_asset_key={ partitioned1.key: daily_partitions_def.subset_with_partition_keys(["2022-01-04"]), partitioned2.key: daily_partitions_def.subset_with_partition_keys( @@ -691,7 +694,6 @@ def unpartitioned3(): ) ag1_storage_dict = AssetGraphSubset( - get_ag1(), partitions_subsets_by_asset_key={ AssetKey("partitioned1"): daily_partitions_def.subset_with_partition_keys( ["2022-01-01", "2022-01-02", "2022-01-03"] @@ -700,8 +702,11 @@ def unpartitioned3(): ["2022-01-01", "2022-01-02", "2022-01-03"] ), }, - non_partitioned_asset_keys={AssetKey("unpartitioned1"), AssetKey("unpartitioned2")}, - ).to_storage_dict(None) + non_partitioned_asset_keys={ + AssetKey("unpartitioned1"), + AssetKey("unpartitioned2"), + }, + ).to_storage_dict(dynamic_partitions_store=None, asset_graph=get_ag1()) asset_graph2 = get_ag2() assert not AssetGraphSubset.can_deserialize(ag1_storage_dict, asset_graph2) @@ -715,7 +720,6 @@ def unpartitioned3(): ag1_storage_dict, asset_graph=asset_graph2, allow_partial=True ) assert ag2_subset == AssetGraphSubset( - asset_graph2, partitions_subsets_by_asset_key={ AssetKey("partitioned1"): daily_partitions_def.subset_with_partition_keys( ["2022-01-01", "2022-01-02", "2022-01-03"] @@ -765,7 +769,8 @@ def test_required_assets_and_checks_by_key_multi_asset(asset_graph_from_assets): bar_check = AssetCheckSpec(name="bar", asset="asset1") @multi_asset( - outs={"asset0": AssetOut(), "asset1": AssetOut()}, check_specs=[foo_check, bar_check] + outs={"asset0": AssetOut(), "asset1": AssetOut()}, + check_specs=[foo_check, bar_check], ) def asset_fn(): ... @@ -782,15 +787,26 @@ def subsettable_asset_fn(): asset_graph = asset_graph_from_assets([asset_fn, subsettable_asset_fn]) - grouped_keys = [AssetKey(["asset0"]), AssetKey(["asset1"]), foo_check.key, bar_check.key] + grouped_keys = [ + AssetKey(["asset0"]), + AssetKey(["asset1"]), + foo_check.key, + bar_check.key, + ] for key in grouped_keys: assert asset_graph.get_required_asset_and_check_keys(key) == set(grouped_keys) - for key in [AssetKey(["subsettable_asset0"]), AssetKey(["subsettable_asset1"]), biz_check.key]: + for key in [ + AssetKey(["subsettable_asset0"]), + AssetKey(["subsettable_asset1"]), + biz_check.key, + ]: assert asset_graph.get_required_asset_and_check_keys(key) == set() -def test_required_assets_and_checks_by_key_multi_asset_single_asset(asset_graph_from_assets): +def test_required_assets_and_checks_by_key_multi_asset_single_asset( + asset_graph_from_assets, +): foo_check = AssetCheckSpec(name="foo", asset="asset0") bar_check = AssetCheckSpec(name="bar", asset="asset0") diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py index 91f3a97061087..013df558854a3 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py @@ -62,6 +62,7 @@ PARTITION_NAME_TAG, ) from dagster._core.test_utils import instance_for_test +from dagster._serdes import deserialize_value, serialize_value from dagster._seven.compat.pendulum import create_pendulum_time from dagster._utils import Counter, traced_counter from dagster._utils.caching_instance_queryer import CachingInstanceQueryer @@ -851,7 +852,7 @@ def downstream_weekly_partitioned_asset( ], ) - counts = completed_backfill_data.get_backfill_status_per_asset_key() + counts = completed_backfill_data.get_backfill_status_per_asset_key(asset_graph) assert counts[0].asset_key == unpartitioned_upstream_of_partitioned.key assert counts[0].backfill_status == AssetBackfillStatus.MATERIALIZED @@ -904,7 +905,7 @@ def upstream_success(): "fake_id", backfill_data, asset_graph, instance, assets_by_repo_name ) - counts = backfill_data.get_backfill_status_per_asset_key() + counts = backfill_data.get_backfill_status_per_asset_key(asset_graph) assert counts[0].asset_key == upstream_fail.key assert counts[0].partitions_counts_by_status[AssetBackfillStatus.MATERIALIZED] == 0 assert counts[0].partitions_counts_by_status[AssetBackfillStatus.FAILED] == 1 @@ -920,7 +921,7 @@ def upstream_success(): backfill_data = _single_backfill_iteration( "fake_id", backfill_data, asset_graph, instance, assets_by_repo_name ) - counts = backfill_data.get_backfill_status_per_asset_key() + counts = backfill_data.get_backfill_status_per_asset_key(asset_graph) assert counts[0].asset_key == upstream_fail.key assert counts[0].partitions_counts_by_status[AssetBackfillStatus.MATERIALIZED] == 1 assert counts[0].partitions_counts_by_status[AssetBackfillStatus.FAILED] == 0 @@ -960,9 +961,14 @@ def downstream_daily_partitioned_asset( target_subset = backfill_data.target_subset assert target_subset.get_partitions_subset( - upstream_hourly_partitioned_asset.key + upstream_hourly_partitioned_asset.key, asset_graph ).get_partition_keys() == ["2023-01-09-00:00"] - assert len(target_subset.get_partitions_subset(downstream_daily_partitioned_asset.key)) == 0 + assert ( + len( + target_subset.get_partitions_subset(downstream_daily_partitioned_asset.key, asset_graph) + ) + == 0 + ) def test_asset_backfill_throw_error_on_invalid_upstreams(): @@ -1061,13 +1067,13 @@ def downstream_daily_partitioned_asset( assert canceling_backfill_data.have_all_requested_runs_finished() is True assert ( canceling_backfill_data.materialized_subset.get_partitions_subset( - upstream_hourly_partitioned_asset.key + upstream_hourly_partitioned_asset.key, asset_graph ).get_partition_keys() == targeted_partitions ) assert ( canceling_backfill_data.materialized_subset.get_partitions_subset( - downstream_daily_partitioned_asset.key + downstream_daily_partitioned_asset.key, asset_graph ).get_partition_keys() == [] ) @@ -1398,3 +1404,40 @@ def foo_child(): } run_backfill_to_completion(asset_graph, assets_by_repo_name, asset_backfill_data, [], instance) + + +def test_asset_backfill_serialization_deserialization(): + @asset( + partitions_def=DailyPartitionsDefinition("2023-01-01"), + ) + def upstream(): + pass + + @asset + def middle(): + pass + + @asset( + partitions_def=DailyPartitionsDefinition("2023-01-01"), + ) + def downstream(upstream): + pass + + assets_by_repo_name = {"repo": [upstream, downstream, middle]} + asset_graph = get_asset_graph(assets_by_repo_name) + + asset_backfill_data = AssetBackfillData.from_asset_partitions( + asset_graph=asset_graph, + # partition_names=["2023-01-01", "2023-01-02", "2023-01-05"], + partition_names=["2023-01-01"], + # asset_selection=[upstream.key, middle.key, downstream.key], + asset_selection=[upstream.key], + dynamic_partitions_store=MagicMock(), + all_partitions=False, + backfill_start_time=pendulum.datetime(2023, 1, 9, 0, 0, 0), + ) + + assert ( + deserialize_value(serialize_value(asset_backfill_data), AssetBackfillData) + == asset_backfill_data + ) diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill_with_backfill_policies.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill_with_backfill_policies.py index 82bbf49c98fb2..293f7774966d9 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill_with_backfill_policies.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill_with_backfill_policies.py @@ -370,7 +370,7 @@ def downstream_weekly_partitioned_asset(): fail_asset_partitions=set(), ) - counts = completed_backfill_data.get_backfill_status_per_asset_key() + counts = completed_backfill_data.get_backfill_status_per_asset_key(asset_graph) assert counts[0].asset_key == unpartitioned_upstream_of_partitioned.key assert counts[0].backfill_status == AssetBackfillStatus.MATERIALIZED @@ -452,7 +452,7 @@ def downstream_b(): fail_asset_partitions=set(), ) - counts = completed_backfill_data.get_backfill_status_per_asset_key() + counts = completed_backfill_data.get_backfill_status_per_asset_key(asset_graph) assert counts[0].asset_key == upstream_a.key assert ( diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py index 84da523762b98..449e8d4a1cd25 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py @@ -307,18 +307,18 @@ def repo(): for i, target in enumerate(self.active_backfill_targets or []): if isinstance(target, Mapping): target_subset = AssetGraphSubset( - asset_graph=repo.asset_graph, + # asset_graph=repo.asset_graph, partitions_subsets_by_asset_key=target, non_partitioned_asset_keys=set(), ) else: target_subset = AssetGraphSubset( - asset_graph=repo.asset_graph, + # asset_graph=repo.asset_graph, partitions_subsets_by_asset_key={}, non_partitioned_asset_keys=target, ) empty_subset = AssetGraphSubset( - asset_graph=repo.asset_graph, + # asset_graph=repo.asset_graph, partitions_subsets_by_asset_key={}, non_partitioned_asset_keys=set(), ) @@ -338,7 +338,7 @@ def repo(): tags={}, backfill_timestamp=test_time.timestamp(), serialized_asset_backfill_data=asset_backfill_data.serialize( - dynamic_partitions_store=instance + dynamic_partitions_store=instance, asset_graph=repo.asset_graph ), ) instance.add_backfill(backfill)