Skip to content

Commit

Permalink
make serializer generalizable
Browse files Browse the repository at this point in the history
  • Loading branch information
clairelin135 committed Nov 22, 2023
1 parent 903f0e2 commit 92b58a5
Showing 1 changed file with 18 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,27 @@
from .events import AssetKey, AssetKeyPartitionKey


class AssetGraphSubsetSerializer(NamedTupleSerializer):
def before_pack(self, value: "AssetGraphSubset") -> "AssetGraphSubset":
converted_partitions_subsets_by_asset_key = {}
for k, v in value.partitions_subsets_by_asset_key.items():
if isinstance(v, PartitionKeysTimeWindowPartitionsSubset):
converted_partitions_subsets_by_asset_key[k] = v.to_time_window_partitions_subset()
else:
converted_partitions_subsets_by_asset_key[k] = v
class PartitionsSubsetMappingNamedTupleSerializer(NamedTupleSerializer):
def before_pack(self, value: NamedTuple) -> NamedTuple:
replaced_value_by_field_name = {}
for field_name, field_value in value._asdict().items():
if isinstance(field_value, Mapping) and all(
isinstance(v, PartitionsSubset) for v in field_value.values()
):
subsets_by_key = {
k: v.to_time_window_partitions_subset()
if isinstance(v, PartitionKeysTimeWindowPartitionsSubset)
else v
for k, v in field_value.items()
}
replaced_value_by_field_name[field_name] = SerializableNonScalarKeyMapping(
subsets_by_key
)

return value._replace(
partitions_subsets_by_asset_key=SerializableNonScalarKeyMapping(
converted_partitions_subsets_by_asset_key
)
)
return value._replace(**replaced_value_by_field_name)


@whitelist_for_serdes(serializer=AssetGraphSubsetSerializer)
@whitelist_for_serdes(serializer=PartitionsSubsetMappingNamedTupleSerializer)
class AssetGraphSubset(
NamedTuple(
"_AssetGraphSubset",
Expand Down

0 comments on commit 92b58a5

Please sign in to comment.