Skip to content

Commit

Permalink
use pendulum datetimes
Browse files Browse the repository at this point in the history
  • Loading branch information
clairelin135 committed Nov 20, 2023
1 parent 96a6b69 commit f5e0097
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
create_pendulum_time,
to_timezone,
)
from dagster._utils import utc_datetime_from_timestamp
from dagster._utils.partitions import DEFAULT_HOURLY_FORMAT_WITHOUT_TIMEZONE
from dagster._utils.schedules import (
cron_string_iterator,
Expand Down Expand Up @@ -170,6 +169,13 @@ def pack(
if datetime:
check.invariant(datetime.tzinfo is not None)
pendulum_datetime = pendulum.instance(datetime, tz=datetime.tzinfo)
timezone_name = pendulum_datetime.timezone.name

# Calling in_tz will check that the timezone name represents a valid timezone
# and is not an offset
pendulum_datetime = pendulum_datetime.in_tz(timezone_name)
check.invariant(pendulum_datetime.timezone.name is not None)

return pack_value(
TimestampWithTimezone(datetime.timestamp(), str(pendulum_datetime.timezone.name)),
whitelist_map,
Expand All @@ -191,9 +197,7 @@ def unpack(
whitelist_map,
context,
)
unpacked_datetime = pendulum.instance(
utc_datetime_from_timestamp(unpacked.timestamp), tz=unpacked.timezone
).in_tz(tz=unpacked.timezone)
unpacked_datetime = pendulum.from_timestamp(unpacked.timestamp, unpacked.timezone)
check.invariant(unpacked_datetime.tzinfo is not None)
return unpacked_datetime

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -871,7 +871,7 @@ def execute_asset_backfill_iteration(
if not backfill.is_asset_backfill:
check.failed("Backfill must be an asset backfill")

backfill_start_time = utc_datetime_from_timestamp(backfill.backfill_timestamp)
backfill_start_time = pendulum.from_timestamp(backfill.backfill_timestamp, "UTC")
instance_queryer = CachingInstanceQueryer(
instance=instance, asset_graph=asset_graph, evaluation_time=backfill_start_time
)
Expand Down
7 changes: 4 additions & 3 deletions python_modules/dagster/dagster/_core/execution/backfill.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from enum import Enum
from typing import Mapping, NamedTuple, Optional, Sequence, Union

import pendulum

from dagster import _check as check
from dagster._core.definitions import AssetKey
from dagster._core.definitions.asset_graph import AssetGraph
Expand All @@ -13,7 +15,6 @@
from dagster._core.storage.tags import USER_TAG
from dagster._core.workspace.workspace import IWorkspace
from dagster._serdes import whitelist_for_serdes
from dagster._utils import utc_datetime_from_timestamp
from dagster._utils.error import SerializableErrorInfo

from ..definitions.selector import PartitionsByAssetSelector
Expand Down Expand Up @@ -419,7 +420,7 @@ def from_asset_partitions(
asset_selection=asset_selection,
dynamic_partitions_store=dynamic_partitions_store,
all_partitions=all_partitions,
backfill_start_time=utc_datetime_from_timestamp(backfill_timestamp),
backfill_start_time=pendulum.from_timestamp(backfill_timestamp, tz="UTC"),
)
return cls(
backfill_id=backfill_id,
Expand All @@ -445,7 +446,7 @@ def from_partitions_by_assets(
asset_backfill_data = AssetBackfillData.from_partitions_by_assets(
asset_graph=asset_graph,
dynamic_partitions_store=dynamic_partitions_store,
backfill_start_time=utc_datetime_from_timestamp(backfill_timestamp),
backfill_start_time=pendulum.from_timestamp(backfill_timestamp, tz="UTC"),
partitions_by_assets=partitions_by_assets,
)
return cls(
Expand Down

0 comments on commit f5e0097

Please sign in to comment.