Skip to content

Commit

Permalink
key by asset key instead
Browse files Browse the repository at this point in the history
  • Loading branch information
clairelin135 committed Nov 21, 2023
1 parent 5243b75 commit 323344d
Showing 1 changed file with 15 additions and 32 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import operator
from collections import defaultdict
from datetime import datetime
from functools import cached_property
from typing import (
AbstractSet,
Any,
Expand All @@ -26,25 +25,29 @@
DagsterDefinitionChangedDeserializationError,
)
from dagster._core.instance import DynamicPartitionsStore
from dagster._serdes.serdes import NamedTupleSerializer, whitelist_for_serdes
from dagster._serdes.serdes import (
NamedTupleSerializer,
SerializableNonScalarKeyMapping,
whitelist_for_serdes,
)

from .asset_graph import AssetGraph
from .events import AssetKey, AssetKeyPartitionKey


class AssetGraphSubsetSerializer(NamedTupleSerializer):
def before_pack(self, value: "AssetGraphSubset") -> "AssetGraphSubset":
converted_partitions_subsets_by_serialized_asset_key = {}
for k, v in value.partitions_subsets_by_serialized_asset_key.items():
converted_partitions_subsets_by_asset_key = {}
for k, v in value.partitions_subsets_by_asset_key.items():
if isinstance(v, PartitionKeysTimeWindowPartitionsSubset):
converted_partitions_subsets_by_serialized_asset_key[
k
] = v.to_time_window_partitions_subset()
converted_partitions_subsets_by_asset_key[k] = v.to_time_window_partitions_subset()
else:
converted_partitions_subsets_by_serialized_asset_key[k] = v
converted_partitions_subsets_by_asset_key[k] = v

return value._replace(
partitions_subsets_by_serialized_asset_key=converted_partitions_subsets_by_serialized_asset_key
partitions_subsets_by_asset_key=SerializableNonScalarKeyMapping(
converted_partitions_subsets_by_asset_key
)
)


Expand All @@ -53,42 +56,22 @@ class AssetGraphSubset(
NamedTuple(
"_AssetGraphSubset",
[
("partitions_subsets_by_serialized_asset_key", Mapping[str, PartitionsSubset]),
("partitions_subsets_by_asset_key", Mapping[AssetKey, PartitionsSubset]),
("non_partitioned_asset_keys", AbstractSet[AssetKey]),
],
)
):
def __new__(
cls,
partitions_subsets_by_serialized_asset_key: Optional[Mapping[str, PartitionsSubset]] = None,
non_partitioned_asset_keys: Optional[AbstractSet[AssetKey]] = None,
partitions_subsets_by_asset_key: Optional[Mapping[AssetKey, PartitionsSubset]] = None,
non_partitioned_asset_keys: Optional[AbstractSet[AssetKey]] = None,
):
check.invariant(
not (partitions_subsets_by_serialized_asset_key and partitions_subsets_by_asset_key),
"Cannot provide both partitions_subsets_by_serialized_asset_key and partitions_subsets_by_asset_key",
)

if partitions_subsets_by_asset_key:
partitions_subsets_by_serialized_asset_key = {
key.to_user_string(): value
for key, value in partitions_subsets_by_asset_key.items()
}

return super(AssetGraphSubset, cls).__new__(
cls,
partitions_subsets_by_serialized_asset_key=partitions_subsets_by_serialized_asset_key
or {},
partitions_subsets_by_asset_key=partitions_subsets_by_asset_key or {},
non_partitioned_asset_keys=non_partitioned_asset_keys or set(),
)

@cached_property
def partitions_subsets_by_asset_key(self) -> Mapping[AssetKey, PartitionsSubset]:
return {
AssetKey.from_user_string(key): value
for key, value in self.partitions_subsets_by_serialized_asset_key.items()
}

@property
def asset_keys(self) -> AbstractSet[AssetKey]:
return {
Expand Down

0 comments on commit 323344d

Please sign in to comment.