Skip to content

Commit

Permalink
suggestions from code review
Browse files Browse the repository at this point in the history
  • Loading branch information
clairelin135 committed Nov 16, 2023
1 parent 8b03aaf commit 0e3a4fe
Showing 1 changed file with 21 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,6 @@
from dagster._serdes import (
whitelist_for_serdes,
)
from dagster._serdes.serdes import FieldSerializer
from dagster._seven.compat.pendulum import (
_IS_PENDULUM_2,
PendulumDateTime,
create_pendulum_time,
to_timezone,
)
from dagster._serdes.serdes import FieldSerializer, deserialize_value, serialize_value
from dagster._serdes.serdes import (
FieldSerializer,
NamedTupleSerializer,
Expand All @@ -48,6 +40,12 @@
pack_value,
unpack_value,
)
from dagster._seven.compat.pendulum import (
_IS_PENDULUM_2,
PendulumDateTime,
create_pendulum_time,
to_timezone,
)
from dagster._utils import utc_datetime_from_timestamp
from dagster._utils.partitions import DEFAULT_HOURLY_FORMAT_WITHOUT_TIMEZONE
from dagster._utils.schedules import (
Expand Down Expand Up @@ -151,12 +149,15 @@ def dst_safe_strptime(date_string: str, tz: str, fmt: str) -> PendulumDateTime:
dt = dt.add(microseconds=-1)
return dt

# UTCTimestampWithTimezone is used to preserve timezone information when serializing.
# We can't store datetime.isoformat() because it only preserves UTC offsets, which vary depending on
# daylight savings time.

# TimestampWithTimezone is used to preserve IANA timezone information when serializing.
# Serializing with the UTC offset (i.e. via datetime.isoformat) is insufficient because offsets vary
# depending on daylight savings time. This causes timedelta operations to be inexact, since the
# exact timezone is not preserved. To prevent any lossy serialization, ths implementation
# serializes both the datetime float and the IANA timezone.
@whitelist_for_serdes
class UTCTimestampWithTimezone(NamedTuple):
datetime_float: float
class TimestampWithTimezone(NamedTuple):
timestamp: float # Seconds since the Unix epoch
timezone: str


Expand All @@ -170,9 +171,7 @@ def pack(
check.invariant(datetime.tzinfo is not None)
pendulum_datetime = pendulum.instance(datetime, tz=datetime.tzinfo)
return pack_value(
UTCTimestampWithTimezone(
datetime.timestamp(), str(pendulum_datetime.timezone.name)
),
TimestampWithTimezone(datetime.timestamp(), str(pendulum_datetime.timezone.name)),
whitelist_map,
descent_path,
)
Expand All @@ -188,12 +187,12 @@ def unpack(
if serialized_datetime_with_timezone:
unpacked = unpack_value(
serialized_datetime_with_timezone,
UTCTimestampWithTimezone,
TimestampWithTimezone,
whitelist_map,
context,
)
unpacked_datetime = pendulum.instance(
utc_datetime_from_timestamp(unpacked.datetime_float), tz=unpacked.timezone
utc_datetime_from_timestamp(unpacked.timestamp), tz=unpacked.timezone
).in_tz(tz=unpacked.timezone)
check.invariant(unpacked_datetime.tzinfo is not None)
return unpacked_datetime
Expand Down Expand Up @@ -2057,17 +2056,17 @@ def cheap_ends_before(self, dt: datetime, dt_cron_schedule: str) -> bool:
Args:
dt_cron_schedule (str): A cron schedule that dt is on one of the ticks of.
"""
return self._included_time_windows[-1].end.timestamp() <= dt.timestamp()
return self.included_time_windows[-1].end.timestamp() <= dt.timestamp()

@cached_property
def num_partitions(self) -> int:
_num_partitions = self._asdict()["num_partitions"]
if _num_partitions is None:
num_partitions_ = self._asdict()["num_partitions"]
if num_partitions_ is None:
return sum(
len(self.partitions_def.get_partition_keys_in_time_window(time_window))
for time_window in self.included_time_windows
)
return _num_partitions
return num_partitions_

@classmethod
def _num_partitions_from_time_windows(
Expand Down

0 comments on commit 0e3a4fe

Please sign in to comment.