diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py b/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py index fefe14d8cedb1..ae025020c72b7 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py @@ -1,7 +1,6 @@ import operator from collections import defaultdict from datetime import datetime -from functools import cached_property from typing import ( AbstractSet, Any, @@ -26,7 +25,11 @@ DagsterDefinitionChangedDeserializationError, ) from dagster._core.instance import DynamicPartitionsStore -from dagster._serdes.serdes import NamedTupleSerializer, whitelist_for_serdes +from dagster._serdes.serdes import ( + NamedTupleSerializer, + SerializableNonScalarKeyMapping, + whitelist_for_serdes, +) from .asset_graph import AssetGraph from .events import AssetKey, AssetKeyPartitionKey @@ -34,17 +37,17 @@ class AssetGraphSubsetSerializer(NamedTupleSerializer): def before_pack(self, value: "AssetGraphSubset") -> "AssetGraphSubset": - converted_partitions_subsets_by_serialized_asset_key = {} - for k, v in value.partitions_subsets_by_serialized_asset_key.items(): + converted_partitions_subsets_by_asset_key = {} + for k, v in value.partitions_subsets_by_asset_key.items(): if isinstance(v, PartitionKeysTimeWindowPartitionsSubset): - converted_partitions_subsets_by_serialized_asset_key[ - k - ] = v.to_time_window_partitions_subset() + converted_partitions_subsets_by_asset_key[k] = v.to_time_window_partitions_subset() else: - converted_partitions_subsets_by_serialized_asset_key[k] = v + converted_partitions_subsets_by_asset_key[k] = v return value._replace( - partitions_subsets_by_serialized_asset_key=converted_partitions_subsets_by_serialized_asset_key + partitions_subsets_by_asset_key=SerializableNonScalarKeyMapping( + converted_partitions_subsets_by_asset_key + ) ) @@ -53,42 +56,22 @@ class AssetGraphSubset( NamedTuple( "_AssetGraphSubset", [ - ("partitions_subsets_by_serialized_asset_key", Mapping[str, PartitionsSubset]), + ("partitions_subsets_by_asset_key", Mapping[AssetKey, PartitionsSubset]), ("non_partitioned_asset_keys", AbstractSet[AssetKey]), ], ) ): def __new__( cls, - partitions_subsets_by_serialized_asset_key: Optional[Mapping[str, PartitionsSubset]] = None, - non_partitioned_asset_keys: Optional[AbstractSet[AssetKey]] = None, partitions_subsets_by_asset_key: Optional[Mapping[AssetKey, PartitionsSubset]] = None, + non_partitioned_asset_keys: Optional[AbstractSet[AssetKey]] = None, ): - check.invariant( - not (partitions_subsets_by_serialized_asset_key and partitions_subsets_by_asset_key), - "Cannot provide both partitions_subsets_by_serialized_asset_key and partitions_subsets_by_asset_key", - ) - - if partitions_subsets_by_asset_key: - partitions_subsets_by_serialized_asset_key = { - key.to_user_string(): value - for key, value in partitions_subsets_by_asset_key.items() - } - return super(AssetGraphSubset, cls).__new__( cls, - partitions_subsets_by_serialized_asset_key=partitions_subsets_by_serialized_asset_key - or {}, + partitions_subsets_by_asset_key=partitions_subsets_by_asset_key or {}, non_partitioned_asset_keys=non_partitioned_asset_keys or set(), ) - @cached_property - def partitions_subsets_by_asset_key(self) -> Mapping[AssetKey, PartitionsSubset]: - return { - AssetKey.from_user_string(key): value - for key, value in self.partitions_subsets_by_serialized_asset_key.items() - } - @property def asset_keys(self) -> AbstractSet[AssetKey]: return {