Skip to content

Commit

Permalink
create serializable time window partitions def named tuple
Browse files Browse the repository at this point in the history
  • Loading branch information
clairelin135 committed Nov 7, 2023
1 parent 7efff05 commit fb5215e
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from dagster._serdes import whitelist_for_serdes
from dagster._serdes.serdes import FieldSerializer
from dagster._utils import utc_datetime_from_timestamp
from dagster._utils.cached_method import cached_method
from dagster._utils.partitions import DEFAULT_HOURLY_FORMAT_WITHOUT_TIMEZONE
from dagster._utils.schedules import (
cron_string_iterator,
Expand Down Expand Up @@ -86,15 +87,52 @@ class TimeWindow(NamedTuple):
@whitelist_for_serdes(
    field_serializers={"start": DatetimeFieldSerializer, "end": DatetimeFieldSerializer}
)
class SerializableTimeWindowPartitionsDefinition(
    NamedTuple(
        "_SerializableTimeWindowPartitionsDefinition",
        [
            ("start", PublicAttr[datetime]),
            ("fmt", PublicAttr[str]),
            ("timezone", PublicAttr[str]),
            ("end", PublicAttr[Optional[datetime]]),
            ("end_offset", PublicAttr[int]),
            ("cron_schedule", PublicAttr[str]),
        ],
    )
):
    """Serdes-whitelisted named tuple holding the fields of a time window partitions definition.

    The ``start`` and ``end`` fields are registered with ``DatetimeFieldSerializer``.
    Use :meth:`to_time_window_partitions_def` to build a
    ``TimeWindowPartitionsDefinition`` from the stored fields.
    """

    def __new__(
        cls,
        start: datetime,
        fmt: str,
        timezone: str,
        end: Optional[datetime],
        end_offset: int,
        cron_schedule: str,
    ):
        # Plain pass-through: every argument is stored unchanged in the
        # identically named tuple field.
        return super().__new__(
            cls,
            start=start,
            fmt=fmt,
            timezone=timezone,
            end=end,
            end_offset=end_offset,
            cron_schedule=cron_schedule,
        )

    def to_time_window_partitions_def(self) -> "TimeWindowPartitionsDefinition":
        """Return a ``TimeWindowPartitionsDefinition`` constructed from this tuple's fields."""
        return TimeWindowPartitionsDefinition(
            start=self.start,
            fmt=self.fmt,
            end=self.end,
            timezone=self.timezone,
            end_offset=self.end_offset,
            cron_schedule=self.cron_schedule,
        )


class TimeWindowPartitionsDefinition(
PartitionsDefinition,
NamedTuple(
"_TimeWindowPartitionsDefinition",
[
("start", PublicAttr[datetime]),
("fmt", PublicAttr[str]),
("timezone", PublicAttr[str]),
("end", PublicAttr[Optional[datetime]]),
("fmt", PublicAttr[str]),
("end_offset", PublicAttr[int]),
("cron_schedule", PublicAttr[str]),
],
Expand Down Expand Up @@ -132,14 +170,14 @@ def __new__(
cls,
start: Union[datetime, str],
fmt: str,
timezone: Optional[str] = None,
end: Union[datetime, str, None] = None,
end_offset: int = 0,
cron_schedule: Optional[str] = None,
schedule_type: Optional[ScheduleType] = None,
timezone: Optional[str] = None,
end_offset: int = 0,
minute_offset: Optional[int] = None,
hour_offset: Optional[int] = None,
day_offset: Optional[int] = None,
cron_schedule: Optional[str] = None,
):
check.opt_str_param(timezone, "timezone")
timezone = timezone or "UTC"
Expand Down Expand Up @@ -185,7 +223,14 @@ def __new__(
)

return super(TimeWindowPartitionsDefinition, cls).__new__(
cls, start_dt, fmt, timezone, end_dt, end_offset, cron_schedule
cls, start_dt, timezone, end_dt, fmt, end_offset, cron_schedule
)

def to_serializable_time_window_partitions_def(
    self,
) -> SerializableTimeWindowPartitionsDefinition:
    """Copy this definition's fields into a ``SerializableTimeWindowPartitionsDefinition``."""
    return SerializableTimeWindowPartitionsDefinition(
        start=self.start,
        fmt=self.fmt,
        timezone=self.timezone,
        end=self.end,
        end_offset=self.end_offset,
        cron_schedule=self.cron_schedule,
    )

def get_current_timestamp(self, current_time: Optional[datetime] = None) -> float:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1306,7 +1306,9 @@ def test_time_window_partitions_def_serialization(partitions_def):
timezone=partitions_def.timezone,
end_offset=partitions_def.end_offset,
)
deserialized = deserialize_value(
serialize_value(time_window_partitions_def), TimeWindowPartitionsDefinition
assert (
deserialize_value(
serialize_value(time_window_partitions_def.to_serializable_time_window_partitions_def())
).to_time_window_partitions_def()
== time_window_partitions_def
)
assert deserialized == time_window_partitions_def

0 comments on commit fb5215e

Please sign in to comment.