From e9dc40fe3128a85d4f347c63dae897bc26035d2f Mon Sep 17 00:00:00 2001 From: Claire Lin Date: Tue, 7 Nov 2023 14:33:42 -0800 Subject: [PATCH] responding to feedback --- .../definitions/time_window_partitions.py | 43 ++++++------------- .../scenarios/partition_scenarios.py | 16 ++++--- 2 files changed, 23 insertions(+), 36 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py b/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py index 396c46baa289b..21e7dd36f2885 100644 --- a/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py +++ b/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py @@ -5,6 +5,7 @@ from abc import abstractmethod, abstractproperty from datetime import datetime from enum import Enum +from functools import cached_property from typing import ( AbstractSet, Any, @@ -27,7 +28,6 @@ import dagster._check as check from dagster._annotations import PublicAttr, public from dagster._core.instance import DynamicPartitionsStore -from dagster._utils.cached_method import cached_method from dagster._utils.partitions import DEFAULT_HOURLY_FORMAT_WITHOUT_TIMEZONE from dagster._utils.schedules import ( cron_string_iterator, @@ -1669,8 +1669,7 @@ def with_partition_keys( included_partition_keys=new_partitions, ) - @property - @cached_method + @cached_property def included_time_windows(self) -> Sequence[TimeWindow]: result_time_windows, _ = self._add_partitions_to_time_windows( initial_windows=[], @@ -1683,8 +1682,7 @@ def included_time_windows(self) -> Sequence[TimeWindow]: def partitions_def(self) -> TimeWindowPartitionsDefinition: return self._partitions_def - @property - @cached_method + @cached_property def num_partitions(self) -> int: return len(self._included_partition_keys) @@ -1769,37 +1767,22 @@ def with_partitions_def( included_partition_keys=self._included_partition_keys, ) - def resolve(self) -> "TimeWindowPartitionsSubset": - return TimeWindowPartitionsSubset( - partitions_def=self._partitions_def, - num_partitions=self.num_partitions, - included_time_windows=self.included_time_windows, - ) - class TimeWindowPartitionsSubset(BaseTimeWindowPartitionsSubset): def __init__( self, partitions_def: TimeWindowPartitionsDefinition, - num_partitions: Optional[int] = None, - included_time_windows: Sequence[TimeWindow] = [], + num_partitions: Optional[int], + included_time_windows: Sequence[TimeWindow], ): - check.opt_int_param(num_partitions, "num_partitions") self._partitions_def = check.inst_param( partitions_def, "partitions_def", TimeWindowPartitionsDefinition ) - self._num_partitions = ( - num_partitions - if num_partitions - else self._num_partitions_from_time_windows(partitions_def, included_time_windows) - ) + self._num_partitions = check.opt_int_param(num_partitions, "num_partitions") self._included_time_windows = check.sequence_param( included_time_windows, "included_time_windows", of_type=TimeWindow ) - def get_included_time_windows(self) -> Sequence[TimeWindow]: - return self._included_time_windows - @property def partitions_def(self) -> TimeWindowPartitionsDefinition: return self._partitions_def @@ -1824,13 +1807,15 @@ def cheap_ends_before(self, dt: datetime, dt_cron_schedule: str) -> bool: Args: dt_cron_schedule (str): A cron schedule that dt is on one of the ticks of. """ - if self._included_time_windows is not None: - return self._included_time_windows[-1].end <= dt - - return False + return self._included_time_windows[-1].end <= dt - @property + @cached_property def num_partitions(self) -> int: + if self._num_partitions is None: + return sum( + len(self._partitions_def.get_partition_keys_in_time_window(time_window)) + for time_window in self.included_time_windows + ) return self._num_partitions @property @@ -1861,7 +1846,7 @@ def with_partition_keys(self, partition_keys: Iterable[str]) -> "TimeWindowParti return TimeWindowPartitionsSubset( self._partitions_def, - num_partitions=self._num_partitions + added_partitions, + num_partitions=self.num_partitions + added_partitions, included_time_windows=result_windows, ) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/scenarios/partition_scenarios.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/scenarios/partition_scenarios.py index 14e02d5efee14..a7e4949b1e7eb 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/scenarios/partition_scenarios.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/scenarios/partition_scenarios.py @@ -131,12 +131,12 @@ active_backfill_targets=[ { AssetKey("daily"): TimeWindowPartitionsSubset( - daily_partitions_def + daily_partitions_def, num_partitions=None, included_time_windows=[] ).with_partition_keys(["2013-01-06"]) }, { AssetKey("hourly"): TimeWindowPartitionsSubset( - hourly_partitions_def + hourly_partitions_def, num_partitions=None, included_time_windows=[] ).with_partition_keys( [ "2013-01-06-01:00", @@ -160,7 +160,7 @@ active_backfill_targets=[ { AssetKey("hourly"): TimeWindowPartitionsSubset( - hourly_partitions_def + hourly_partitions_def, num_partitions=None, included_time_windows=[] ).with_partition_keys( [ "2013-01-05-00:00", @@ -172,7 +172,9 @@ { AssetKey( "non_existant_asset" # ignored since can't be loaded - ): TimeWindowPartitionsSubset(hourly_partitions_def).with_partition_keys( + ): TimeWindowPartitionsSubset( + hourly_partitions_def, num_partitions=None, included_time_windows=[] + ).with_partition_keys( [ "2013-01-05-00:00", ], @@ -193,7 +195,7 @@ active_backfill_targets=[ { AssetKey("hourly"): TimeWindowPartitionsSubset( - hourly_partitions_def + hourly_partitions_def, num_partitions=None, included_time_windows=[] ).with_partition_keys( hourly_partitions_def.get_partition_keys_in_range( PartitionKeyRange(start="2013-01-05-00:00", end="2013-01-07-03:00") @@ -229,7 +231,7 @@ active_backfill_targets=[ { AssetKey("hourly"): TimeWindowPartitionsSubset( - hourly_partitions_def + hourly_partitions_def, num_partitions=None, included_time_windows=[] ).with_partition_keys(["2013-01-05-04:00"]) } ], @@ -287,7 +289,7 @@ active_backfill_targets=[ { AssetKey("hourly"): TimeWindowPartitionsSubset( - hourly_partitions_def, + hourly_partitions_def, num_partitions=None, included_time_windows=[] ).with_partition_keys(["2013-01-05-04:00"]) } ],