From 1b0494cfcf8e7c349a63954b793fb48ca8ead74d Mon Sep 17 00:00:00 2001 From: Claire Lin Date: Mon, 20 Nov 2023 12:00:54 -0800 Subject: [PATCH] use pendulum datetimes --- .../dagster/_core/definitions/time_window_partitions.py | 5 +---- .../dagster/dagster/_core/execution/asset_backfill.py | 2 +- python_modules/dagster/dagster/_core/execution/backfill.py | 7 ++++--- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py b/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py index fd761723893a3..484aaa33f7d36 100644 --- a/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py +++ b/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py @@ -47,7 +47,6 @@ create_pendulum_time, to_timezone, ) -from dagster._utils import utc_datetime_from_timestamp from dagster._utils.partitions import DEFAULT_HOURLY_FORMAT_WITHOUT_TIMEZONE from dagster._utils.schedules import ( cron_string_iterator, @@ -200,9 +199,7 @@ def unpack( whitelist_map, context, ) - unpacked_datetime = pendulum.instance( - utc_datetime_from_timestamp(unpacked.timestamp), tz=unpacked.timezone - ).in_tz(tz=unpacked.timezone) + unpacked_datetime = pendulum.from_timestamp(unpacked.timestamp, unpacked.timezone) check.invariant(unpacked_datetime.tzinfo is not None) return unpacked_datetime diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py index 9cb7d707d2bb6..1acc4464fde22 100644 --- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py +++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py @@ -871,7 +871,7 @@ def execute_asset_backfill_iteration( if not backfill.is_asset_backfill: check.failed("Backfill must be an asset backfill") - backfill_start_time = utc_datetime_from_timestamp(backfill.backfill_timestamp) + backfill_start_time = pendulum.from_timestamp(backfill.backfill_timestamp, "UTC") instance_queryer = CachingInstanceQueryer( instance=instance, asset_graph=asset_graph, evaluation_time=backfill_start_time ) diff --git a/python_modules/dagster/dagster/_core/execution/backfill.py b/python_modules/dagster/dagster/_core/execution/backfill.py index 99382f3f1ab58..2c168fe93f97d 100644 --- a/python_modules/dagster/dagster/_core/execution/backfill.py +++ b/python_modules/dagster/dagster/_core/execution/backfill.py @@ -1,6 +1,8 @@ from enum import Enum from typing import Mapping, NamedTuple, Optional, Sequence, Union +import pendulum + from dagster import _check as check from dagster._core.definitions import AssetKey from dagster._core.definitions.asset_graph import AssetGraph @@ -13,7 +15,6 @@ from dagster._core.storage.tags import USER_TAG from dagster._core.workspace.workspace import IWorkspace from dagster._serdes import whitelist_for_serdes -from dagster._utils import utc_datetime_from_timestamp from dagster._utils.error import SerializableErrorInfo from ..definitions.selector import PartitionsByAssetSelector @@ -419,7 +420,7 @@ def from_asset_partitions( asset_selection=asset_selection, dynamic_partitions_store=dynamic_partitions_store, all_partitions=all_partitions, - backfill_start_time=utc_datetime_from_timestamp(backfill_timestamp), + backfill_start_time=pendulum.from_timestamp(backfill_timestamp, tz="UTC"), ) return cls( backfill_id=backfill_id, @@ -445,7 +446,7 @@ def from_partitions_by_assets( asset_backfill_data = AssetBackfillData.from_partitions_by_assets( asset_graph=asset_graph, dynamic_partitions_store=dynamic_partitions_store, - backfill_start_time=utc_datetime_from_timestamp(backfill_timestamp), + backfill_start_time=pendulum.from_timestamp(backfill_timestamp, tz="UTC"), partitions_by_assets=partitions_by_assets, ) return cls(