Skip to content

Commit

Permalink
TimeWindowPartitionsDefinitionSerializer
Browse files Browse the repository at this point in the history
  • Loading branch information
clairelin135 committed Nov 6, 2023
1 parent 584ac0b commit dbaca7d
Showing 1 changed file with 20 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,10 @@
import dagster._check as check
from dagster._annotations import PublicAttr, public
from dagster._core.instance import DynamicPartitionsStore
from dagster._serdes import whitelist_for_serdes
from dagster._serdes import (
whitelist_for_serdes,
)
from dagster._serdes.serdes import FieldSerializer
from dagster._serdes.serdes import FieldSerializer, deserialize_value, serialize_value
from dagster._utils import utc_datetime_from_timestamp
from dagster._utils.cached_method import cached_method
from dagster._utils.partitions import DEFAULT_HOURLY_FORMAT_WITHOUT_TIMEZONE
Expand Down Expand Up @@ -1858,7 +1857,25 @@ def resolve(self) -> "TimeWindowPartitionsSubset":
)


@whitelist_for_serdes
class TimeWindowPartitionsDefinitionSerializer(FieldSerializer):
"""Serializes a TimeWindowPartitionsDefinition by converting it to a SerializableTimeWindowPartitionsDefinition."""

def pack(self, partitions_def: TimeWindowPartitionsDefinition, **_kwargs) -> str:
return serialize_value(partitions_def.to_serializable_time_window_partitions_def())

def unpack(
self,
serialized_time_window_partitions_def: str,
**_kwargs,
) -> TimeWindowPartitionsDefinition:
return deserialize_value(
serialized_time_window_partitions_def, SerializableTimeWindowPartitionsDefinition
).to_time_window_partitions_def()


@whitelist_for_serdes(
field_serializers={"partitions_def": TimeWindowPartitionsDefinitionSerializer}
)
class TimeWindowPartitionsSubset(
BaseTimeWindowPartitionsSubset,
NamedTuple(
Expand Down

0 comments on commit dbaca7d

Please sign in to comment.