diff --git a/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py b/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py index 21e7dd36f2885..be8ebad01d807 100644 --- a/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py +++ b/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py @@ -734,7 +734,7 @@ def less_than(self, partition_key1: str, partition_key2: str) -> bool: @property def partitions_subset_class(self) -> Type["PartitionsSubset"]: - return TimePartitionKeyPartitionsSubset + return PartitionKeysTimeWindowPartitionsSubset def empty_subset(self) -> "PartitionsSubset": return self.partitions_subset_class.empty_subset(self) @@ -1380,12 +1380,16 @@ def inner( class BaseTimeWindowPartitionsSubset(PartitionsSubset): + """A base class that represents PartitionSubsets for TimeWindowPartitionsDefinitions. + Contains shared logic for time window partitions subsets, such as building time windows + from partition keys. + """ + # Every time we change the serialization format, we should increment the version number. # This will ensure that we can gracefully degrade when deserializing old data. SERIALIZATION_VERSION = 1 - @property - @abstractmethod + @abstractproperty def included_time_windows(self) -> Sequence[TimeWindow]: ... @@ -1647,7 +1651,11 @@ def __eq__(self, other): ) -class TimePartitionKeyPartitionsSubset(BaseTimeWindowPartitionsSubset): +class PartitionKeysTimeWindowPartitionsSubset(BaseTimeWindowPartitionsSubset): + """A PartitionsSubset for a TimeWindowPartitionsDefinition, which internally represents the + included partitions using strings. + """ + def __init__( self, partitions_def: TimeWindowPartitionsDefinition, @@ -1664,7 +1672,7 @@ def with_partition_keys( self, partition_keys: Iterable[str] ) -> "BaseTimeWindowPartitionsSubset": new_partitions = {*(self._included_partition_keys or []), *partition_keys} - return TimePartitionKeyPartitionsSubset( + return PartitionKeysTimeWindowPartitionsSubset( self._partitions_def, included_partition_keys=new_partitions, ) @@ -1742,10 +1750,10 @@ def __contains__(self, partition_key: str) -> bool: def __eq__(self, other): return ( - isinstance(other, TimePartitionKeyPartitionsSubset) + isinstance(other, PartitionKeysTimeWindowPartitionsSubset) and self.partitions_def == other.partitions_def and self._included_partition_keys == other._included_partition_keys - ) or super(TimePartitionKeyPartitionsSubset, self).__eq__(other) + ) or super(PartitionKeysTimeWindowPartitionsSubset, self).__eq__(other) @classmethod def empty_subset(cls, partitions_def: PartitionsDefinition) -> "PartitionsSubset": @@ -1762,13 +1770,17 @@ def with_partitions_def( "num_partitions would become inaccurate if the partitions_defs had different cron" " schedules", ) - return TimePartitionKeyPartitionsSubset( + return PartitionKeysTimeWindowPartitionsSubset( partitions_def=partitions_def, included_partition_keys=self._included_partition_keys, ) class TimeWindowPartitionsSubset(BaseTimeWindowPartitionsSubset): + """A PartitionsSubset for a TimeWindowPartitionsDefinition, which internally represents the + included partitions using TimeWindows. + """ + def __init__( self, partitions_def: TimeWindowPartitionsDefinition, diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitions_subset.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitions_subset.py index 104215fa886f1..07abcd4fdb0fc 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitions_subset.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitions_subset.py @@ -1,9 +1,13 @@ import pytest -from dagster import DailyPartitionsDefinition, MultiPartitionsDefinition, StaticPartitionsDefinition +from dagster import ( + DailyPartitionsDefinition, + MultiPartitionsDefinition, + StaticPartitionsDefinition, +) from dagster._core.definitions.multi_dimensional_partitions import MultiPartitionsSubset from dagster._core.definitions.partition import DefaultPartitionsSubset from dagster._core.definitions.time_window_partitions import ( - TimePartitionKeyPartitionsSubset, + PartitionKeysTimeWindowPartitionsSubset, TimeWindowPartitionsSubset, ) from dagster._core.errors import DagsterInvalidDeserializationVersionError @@ -78,4 +82,4 @@ def test_get_subset_type(): def test_empty_subsets(): assert type(composite.empty_subset()) is MultiPartitionsSubset assert type(static_partitions.empty_subset()) is DefaultPartitionsSubset - assert type(time_window_partitions.empty_subset()) is TimePartitionKeyPartitionsSubset + assert type(time_window_partitions.empty_subset()) is PartitionKeysTimeWindowPartitionsSubset diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_time_window_partitions.py b/python_modules/dagster/dagster_tests/definitions_tests/test_time_window_partitions.py index 1f57f1ec08918..e843706c99808 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_time_window_partitions.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_time_window_partitions.py @@ -20,8 +20,8 @@ from dagster._check import CheckError from dagster._core.definitions.time_window_partitions import ( BaseTimeWindowPartitionsSubset, + PartitionKeysTimeWindowPartitionsSubset, ScheduleType, - TimePartitionKeyPartitionsSubset, TimeWindow, TimeWindowPartitionsSubset, ) @@ -33,7 +33,8 @@ def time_window(start: str, end: str) -> TimeWindow: return TimeWindow( - cast(datetime, pendulum.parser.parse(start)), cast(datetime, pendulum.parser.parse(end)) + cast(datetime, pendulum.parser.parse(start)), + cast(datetime, pendulum.parser.parse(end)), ) @@ -352,7 +353,14 @@ def assert_expected_partition_keys( datetime(year=2021, month=1, day=1), 1, create_pendulum_time(2021, 1, 6, 1, 20), - ["2021-01-01", "2021-01-02", "2021-01-03", "2021-01-04", "2021-01-05", "2021-01-06"], + [ + "2021-01-01", + "2021-01-02", + "2021-01-03", + "2021-01-04", + "2021-01-05", + "2021-01-06", + ], None, ), ( @@ -418,7 +426,8 @@ def test_time_partitions_daily_partitions( ) assert_expected_partition_keys( - partitions_def.get_partition_keys(current_time=current_time), expected_partition_keys + partitions_def.get_partition_keys(current_time=current_time), + expected_partition_keys, ) @@ -480,7 +489,8 @@ def test_time_partitions_monthly_partitions( ) assert_expected_partition_keys( - partitions_def.get_partition_keys(current_time=current_time), expected_partition_keys + partitions_def.get_partition_keys(current_time=current_time), + expected_partition_keys, ) @@ -515,7 +525,14 @@ def test_time_partitions_monthly_partitions( datetime(year=2021, month=1, day=1), 2, create_pendulum_time(2021, 1, 31, 1, 20), - ["2021-01-03", "2021-01-10", "2021-01-17", "2021-01-24", "2021-01-31", "2021-02-07"], + [ + "2021-01-03", + "2021-01-10", + "2021-01-17", + "2021-01-24", + "2021-01-31", + "2021-02-07", + ], ), ( datetime(year=2021, month=1, day=1), @@ -540,7 +557,8 @@ def test_time_partitions_weekly_partitions( partitions_def = WeeklyPartitionsDefinition(start_date=start, end_offset=partition_weeks_offset) assert_expected_partition_keys( - partitions_def.get_partition_keys(current_time=current_time), expected_partition_keys + partitions_def.get_partition_keys(current_time=current_time), + expected_partition_keys, ) @@ -674,7 +692,8 @@ def test_time_partitions_hourly_partitions( ) assert_expected_partition_keys( - partitions_def.get_partition_keys(current_time=current_time), expected_partition_keys + partitions_def.get_partition_keys(current_time=current_time), + expected_partition_keys, ) @@ -745,7 +764,7 @@ def test_start_not_aligned(): @pytest.mark.parametrize( "partitions_subset_class", - [TimePartitionKeyPartitionsSubset, TimeWindowPartitionsSubset], + [PartitionKeysTimeWindowPartitionsSubset, TimeWindowPartitionsSubset], ) @pytest.mark.parametrize( "case_str", @@ -792,7 +811,8 @@ def test_partition_subset_get_partition_keys_not_in_subset( ) assert ( cast( - TimeWindowPartitionsSubset, partitions_def.deserialize_subset(subset.serialize()) + TimeWindowPartitionsSubset, + partitions_def.deserialize_subset(subset.serialize()), ).included_time_windows == subset.included_time_windows ) @@ -822,7 +842,7 @@ def test_time_partitions_subset_identical_serialization(): @pytest.mark.parametrize( "partitions_subset_class", - [TimePartitionKeyPartitionsSubset, TimeWindowPartitionsSubset], + [PartitionKeysTimeWindowPartitionsSubset, TimeWindowPartitionsSubset], ) @pytest.mark.parametrize( "initial, added", @@ -1231,7 +1251,10 @@ def test_has_partition_key(): ), ( WeeklyPartitionsDefinition(start_date="2022-01-01", end_date="2022-02-01"), - ["2022-01-02", "2022-01-09"], # 2022-01-02 is Sunday, weekly cron starts from Sunday + [ + "2022-01-02", + "2022-01-09", + ], # 2022-01-02 is Sunday, weekly cron starts from Sunday ["2022-01-23", "2022-01-30"], 4, "%Y-%m-%d",