diff --git a/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py b/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py index a1f2e9632d5df..b97b4852261fd 100644 --- a/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py +++ b/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py @@ -27,11 +27,10 @@ import dagster._check as check from dagster._annotations import PublicAttr, public from dagster._core.instance import DynamicPartitionsStore -from dagster._serdes import whitelist_for_serdes from dagster._serdes import ( whitelist_for_serdes, ) -from dagster._serdes.serdes import FieldSerializer +from dagster._serdes.serdes import FieldSerializer, deserialize_value, serialize_value from dagster._utils import utc_datetime_from_timestamp from dagster._utils.cached_method import cached_method from dagster._utils.partitions import DEFAULT_HOURLY_FORMAT_WITHOUT_TIMEZONE @@ -1858,7 +1857,25 @@ def resolve(self) -> "TimeWindowPartitionsSubset": ) -@whitelist_for_serdes +class TimeWindowPartitionsDefinitionSerializer(FieldSerializer): + """Serializes a TimeWindowPartitionsDefinition by converting it to a SerializableTimeWindowPartitionsDefinition.""" + + def pack(self, partitions_def: TimeWindowPartitionsDefinition, **_kwargs) -> str: + return serialize_value(partitions_def.to_serializable_time_window_partitions_def()) + + def unpack( + self, + serialized_time_window_partitions_def: str, + **_kwargs, + ) -> TimeWindowPartitionsDefinition: + return deserialize_value( + serialized_time_window_partitions_def, SerializableTimeWindowPartitionsDefinition + ).to_time_window_partitions_def() + + +@whitelist_for_serdes( + field_serializers={"partitions_def": TimeWindowPartitionsDefinitionSerializer} +) class TimeWindowPartitionsSubset( BaseTimeWindowPartitionsSubset, NamedTuple(