Skip to content

Commit

Permalink
use pendulum datetimes
Browse files Browse the repository at this point in the history
  • Loading branch information
clairelin135 committed Nov 21, 2023
1 parent 18f4f88 commit 1b0494c
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
create_pendulum_time,
to_timezone,
)
from dagster._utils import utc_datetime_from_timestamp
from dagster._utils.partitions import DEFAULT_HOURLY_FORMAT_WITHOUT_TIMEZONE
from dagster._utils.schedules import (
cron_string_iterator,
Expand Down Expand Up @@ -200,9 +199,7 @@ def unpack(
whitelist_map,
context,
)
unpacked_datetime = pendulum.instance(
utc_datetime_from_timestamp(unpacked.timestamp), tz=unpacked.timezone
).in_tz(tz=unpacked.timezone)
unpacked_datetime = pendulum.from_timestamp(unpacked.timestamp, unpacked.timezone)
check.invariant(unpacked_datetime.tzinfo is not None)
return unpacked_datetime

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -871,7 +871,7 @@ def execute_asset_backfill_iteration(
if not backfill.is_asset_backfill:
check.failed("Backfill must be an asset backfill")

backfill_start_time = utc_datetime_from_timestamp(backfill.backfill_timestamp)
backfill_start_time = pendulum.from_timestamp(backfill.backfill_timestamp, "UTC")
instance_queryer = CachingInstanceQueryer(
instance=instance, asset_graph=asset_graph, evaluation_time=backfill_start_time
)
Expand Down
7 changes: 4 additions & 3 deletions python_modules/dagster/dagster/_core/execution/backfill.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from enum import Enum
from typing import Mapping, NamedTuple, Optional, Sequence, Union

import pendulum

from dagster import _check as check
from dagster._core.definitions import AssetKey
from dagster._core.definitions.asset_graph import AssetGraph
Expand All @@ -13,7 +15,6 @@
from dagster._core.storage.tags import USER_TAG
from dagster._core.workspace.workspace import IWorkspace
from dagster._serdes import whitelist_for_serdes
from dagster._utils import utc_datetime_from_timestamp
from dagster._utils.error import SerializableErrorInfo

from ..definitions.selector import PartitionsByAssetSelector
Expand Down Expand Up @@ -419,7 +420,7 @@ def from_asset_partitions(
asset_selection=asset_selection,
dynamic_partitions_store=dynamic_partitions_store,
all_partitions=all_partitions,
backfill_start_time=utc_datetime_from_timestamp(backfill_timestamp),
backfill_start_time=pendulum.from_timestamp(backfill_timestamp, tz="UTC"),
)
return cls(
backfill_id=backfill_id,
Expand All @@ -445,7 +446,7 @@ def from_partitions_by_assets(
asset_backfill_data = AssetBackfillData.from_partitions_by_assets(
asset_graph=asset_graph,
dynamic_partitions_store=dynamic_partitions_store,
backfill_start_time=utc_datetime_from_timestamp(backfill_timestamp),
backfill_start_time=pendulum.from_timestamp(backfill_timestamp, tz="UTC"),
partitions_by_assets=partitions_by_assets,
)
return cls(
Expand Down

0 comments on commit 1b0494c

Please sign in to comment.