diff --git a/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py b/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py index ad2c8c12297ee..b2621c0e9d069 100644 --- a/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py +++ b/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py @@ -5,6 +5,7 @@ from abc import abstractmethod, abstractproperty from datetime import datetime from enum import Enum +from functools import cached_property from typing import ( AbstractSet, Any, @@ -30,7 +31,12 @@ from dagster._serdes import ( whitelist_for_serdes, ) -from dagster._serdes.serdes import FieldSerializer, deserialize_value, serialize_value +from dagster._serdes.serdes import ( + FieldSerializer, + NamedTupleSerializer, + deserialize_value, + serialize_value, +) from dagster._utils import utc_datetime_from_timestamp from dagster._utils.partitions import DEFAULT_HOURLY_FORMAT_WITHOUT_TIMEZONE from dagster._utils.schedules import ( @@ -1814,26 +1820,25 @@ def with_partitions_def( ) -class TimeWindowPartitionsDefinitionSerializer(FieldSerializer): - """Serializes a TimeWindowPartitionsDefinition by converting it to a SerializableTimeWindowPartitionsDefinition.""" - - def pack(self, partitions_def: TimeWindowPartitionsDefinition, **_kwargs) -> str: - return serialize_value(partitions_def.to_serializable_time_window_partitions_def()) - - def unpack( - self, - serialized_time_window_partitions_def: str, - **_kwargs, - ) -> TimeWindowPartitionsDefinition: - return deserialize_value( - serialized_time_window_partitions_def, SerializableTimeWindowPartitionsDefinition - ).to_time_window_partitions_def() +class TimeWindowPartitionsSubsetSerializer(NamedTupleSerializer): + # TimeWindowPartitionsSubsets have custom logic to delay calculating num_partitions until it + # is needed to improve performance. When serializing, we want to serialize the number of + # partitions, so we force calculatation. + def before_pack(self, value: "TimeWindowPartitionsSubset") -> "TimeWindowPartitionsSubset": + if value._asdict()["num_partitions"] is None: + return TimeWindowPartitionsSubset( + partitions_def=value.partitions_def, + num_partitions=value.num_partitions, + included_time_windows=value.included_time_windows, + ) + return value @whitelist_for_serdes( - field_serializers={"partitions_def": TimeWindowPartitionsDefinitionSerializer} + serializer=TimeWindowPartitionsSubsetSerializer, ) class TimeWindowPartitionsSubset( + BaseTimeWindowPartitionsSubset, NamedTuple( "_TimeWindowPartitionsSubset", [ @@ -1842,11 +1847,11 @@ class TimeWindowPartitionsSubset( ("included_time_windows", Sequence[TimeWindow]), ], ), - BaseTimeWindowPartitionsSubset, ): """A PartitionsSubset for a TimeWindowPartitionsDefinition, which internally represents the included partitions using TimeWindows. """ + def __new__( cls, partitions_def: TimeWindowPartitionsDefinition, diff --git a/python_modules/dagster/dagster/_serdes/serdes.py b/python_modules/dagster/dagster/_serdes/serdes.py index 7e249801a60a2..182538307f700 100644 --- a/python_modules/dagster/dagster/_serdes/serdes.py +++ b/python_modules/dagster/dagster/_serdes/serdes.py @@ -518,7 +518,7 @@ def pack( ) -> Dict[str, JsonSerializableValue]: packed: Dict[str, JsonSerializableValue] = {} packed["__class__"] = self.get_storage_name() - for key, inner_value in value._asdict().items(): + for key, inner_value in self.before_pack(value)._asdict().items(): if key in self.skip_when_empty_fields and inner_value in EMPTY_VALUES_TO_SKIP: continue storage_key = self.storage_field_names.get(key, key) @@ -540,6 +540,10 @@ def pack( packed = self.after_pack(**packed) return packed + # Hook: Modify the contents of the named tuple before packing + def before_pack(self, value: T_NamedTuple) -> T_NamedTuple: + return value + # Hook: Modify the contents of the packed, json-serializable dict before it is converted to a # string. def after_pack(self, **packed_dict: JsonSerializableValue) -> Dict[str, JsonSerializableValue]: diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitions_subset.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitions_subset.py index b77c5884176c6..dca4797644170 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitions_subset.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitions_subset.py @@ -114,3 +114,23 @@ def test_time_window_partitions_subset_serialization_deserialization( ) assert deserialized == subset assert deserialized.get_partition_keys() == ["2023-01-01"] + + +def test_time_window_partitions_subset_num_partitions_serialization(): + daily_partitions_def = DailyPartitionsDefinition("2023-01-01") + time_partitions_def = TimeWindowPartitionsDefinition( + start=daily_partitions_def.start, + end=daily_partitions_def.end, + cron_schedule="0 0 * * *", + fmt="%Y-%m-%d", + timezone=daily_partitions_def.timezone, + end_offset=daily_partitions_def.end_offset, + ) + + tw = time_partitions_def.time_window_for_partition_key("2023-01-01") + + subset = TimeWindowPartitionsSubset( + time_partitions_def, num_partitions=None, included_time_windows=[tw] + ) + deserialized = deserialize_value(serialize_value(subset), TimeWindowPartitionsSubset) + assert deserialized._asdict()["num_partitions"] is not None diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_time_window_partitions.py b/python_modules/dagster/dagster_tests/definitions_tests/test_time_window_partitions.py index 52393646d6875..5537fbb5c058a 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_time_window_partitions.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_time_window_partitions.py @@ -48,13 +48,10 @@ def my_partitioned_config(_start, _end): assert partitions_def == DailyPartitionsDefinition(start_date="2021-05-05") assert partitions_def.get_next_partition_key("2021-05-05") == "2021-05-06" assert ( - partitions_def.get_last_partition_key(pendulum.parser.parse("2021-05-06")) - == "2021-05-05" + partitions_def.get_last_partition_key(pendulum.parser.parse("2021-05-06")) == "2021-05-05" ) assert ( - partitions_def.get_last_partition_key( - pendulum.parser.parse("2021-05-06").add(minutes=1) - ) + partitions_def.get_last_partition_key(pendulum.parser.parse("2021-05-06").add(minutes=1)) == "2021-05-05" ) assert ( @@ -67,9 +64,7 @@ def my_partitioned_config(_start, _end): assert [ partitions_def.time_window_for_partition_key(key) - for key in partitions_def.get_partition_keys( - datetime.strptime("2021-05-07", DATE_FORMAT) - ) + for key in partitions_def.get_partition_keys(datetime.strptime("2021-05-07", DATE_FORMAT)) ] == [ time_window("2021-05-05", "2021-05-06"), time_window("2021-05-06", "2021-05-07"), @@ -88,9 +83,7 @@ def my_partitioned_config(_start, _end): partitions_def = my_partitioned_config.partitions_def assert [ partitions_def.time_window_for_partition_key(key) - for key in partitions_def.get_partition_keys( - datetime.strptime("2021-05-07", DATE_FORMAT) - ) + for key in partitions_def.get_partition_keys(datetime.strptime("2021-05-07", DATE_FORMAT)) ] == [ time_window("2021-05-05", "2021-05-06"), time_window("2021-05-06", "2021-05-07"), @@ -107,9 +100,7 @@ def my_partitioned_config(_start, _end): partitions_def = my_partitioned_config.partitions_def assert [ partitions_def.time_window_for_partition_key(key) - for key in partitions_def.get_partition_keys( - datetime.strptime("2021-05-07", DATE_FORMAT) - ) + for key in partitions_def.get_partition_keys(datetime.strptime("2021-05-07", DATE_FORMAT)) ] == [ time_window("2021-05-01", "2021-05-02"), time_window("2021-05-02", "2021-05-03"), @@ -124,18 +115,12 @@ def my_partitioned_config(_start, _end): return {} partitions_def = my_partitioned_config.partitions_def - assert partitions_def == DailyPartitionsDefinition( - start_date="2021-05-05", minute_offset=15 - ) + assert partitions_def == DailyPartitionsDefinition(start_date="2021-05-05", minute_offset=15) - partition_keys = partitions_def.get_partition_keys( - datetime.strptime("2021-05-07", DATE_FORMAT) - ) + partition_keys = partitions_def.get_partition_keys(datetime.strptime("2021-05-07", DATE_FORMAT)) assert partition_keys == ["2021-05-05"] - assert [ - partitions_def.time_window_for_partition_key(key) for key in partition_keys - ] == [ + assert [partitions_def.time_window_for_partition_key(key) for key in partition_keys] == [ time_window("2021-05-05T00:15:00", "2021-05-06T00:15:00"), ] @@ -154,9 +139,7 @@ def my_partitioned_config(_start, _end): assert [ partitions_def.time_window_for_partition_key(key) - for key in partitions_def.get_partition_keys( - datetime.strptime("2021-07-03", DATE_FORMAT) - ) + for key in partitions_def.get_partition_keys(datetime.strptime("2021-07-03", DATE_FORMAT)) ] == [ time_window("2021-05-01", "2021-06-01"), time_window("2021-06-01", "2021-07-01"), @@ -175,9 +158,7 @@ def my_partitioned_config(_start, _end): partitions_def = my_partitioned_config.partitions_def assert [ partitions_def.time_window_for_partition_key(key) - for key in partitions_def.get_partition_keys( - datetime.strptime("2021-07-03", DATE_FORMAT) - ) + for key in partitions_def.get_partition_keys(datetime.strptime("2021-07-03", DATE_FORMAT)) ] == [ time_window("2021-05-01", "2021-06-01"), time_window("2021-06-01", "2021-07-01"), @@ -201,17 +182,13 @@ def my_partitioned_config(_start, _end): start_date="2021-05-01", minute_offset=15, hour_offset=3, day_offset=12 ) - partition_keys = partitions_def.get_partition_keys( - datetime.strptime("2021-07-13", DATE_FORMAT) - ) + partition_keys = partitions_def.get_partition_keys(datetime.strptime("2021-07-13", DATE_FORMAT)) assert partition_keys == [ "2021-05-12", "2021-06-12", ] - assert [ - partitions_def.time_window_for_partition_key(key) for key in partition_keys - ] == [ + assert [partitions_def.time_window_for_partition_key(key) for key in partition_keys] == [ time_window("2021-05-12T03:15:00", "2021-06-12T03:15:00"), time_window("2021-06-12T03:15:00", "2021-07-12T03:15:00"), ] @@ -237,16 +214,14 @@ def my_partitioned_config(_start, _end): "2021-05-05-02:00", ] - assert [ - partitions_def.time_window_for_partition_key(key) for key in partition_keys - ] == [ + assert [partitions_def.time_window_for_partition_key(key) for key in partition_keys] == [ time_window("2021-05-05T01:00:00", "2021-05-05T02:00:00"), time_window("2021-05-05T02:00:00", "2021-05-05T03:00:00"), ] - assert partitions_def.time_window_for_partition_key( - "2021-05-05-01:00" - ) == time_window("2021-05-05T01:00:00", "2021-05-05T02:00:00") + assert partitions_def.time_window_for_partition_key("2021-05-05-01:00") == time_window( + "2021-05-05T01:00:00", "2021-05-05T02:00:00" + ) def test_hourly_partitions_with_time_offset(): @@ -267,16 +242,14 @@ def my_partitioned_config(_start, _end): "2021-05-05-02:15", ] - assert [ - partitions_def.time_window_for_partition_key(key) for key in partition_keys - ] == [ + assert [partitions_def.time_window_for_partition_key(key) for key in partition_keys] == [ time_window("2021-05-05T01:15:00", "2021-05-05T02:15:00"), time_window("2021-05-05T02:15:00", "2021-05-05T03:15:00"), ] - assert partitions_def.time_window_for_partition_key( - "2021-05-05-01:00" - ) == time_window("2021-05-05T01:15:00", "2021-05-05T02:15:00") + assert partitions_def.time_window_for_partition_key("2021-05-05-01:00") == time_window( + "2021-05-05T01:15:00", "2021-05-05T02:15:00" + ) def test_weekly_partitions(): @@ -290,9 +263,7 @@ def my_partitioned_config(_start, _end): partitions_def = my_partitioned_config.partitions_def assert [ partitions_def.time_window_for_partition_key(key) - for key in partitions_def.get_partition_keys( - datetime.strptime("2021-05-18", DATE_FORMAT) - ) + for key in partitions_def.get_partition_keys(datetime.strptime("2021-05-18", DATE_FORMAT)) ] == [ time_window("2021-05-02", "2021-05-09"), time_window("2021-05-09", "2021-05-16"), @@ -315,17 +286,13 @@ def my_partitioned_config(_start, _end): start_date="2021-05-01", minute_offset=15, hour_offset=4, day_offset=3 ) - partition_keys = partitions_def.get_partition_keys( - datetime.strptime("2021-05-20", DATE_FORMAT) - ) + partition_keys = partitions_def.get_partition_keys(datetime.strptime("2021-05-20", DATE_FORMAT)) assert partition_keys == [ "2021-05-05", "2021-05-12", ] - assert [ - partitions_def.time_window_for_partition_key(key) for key in partition_keys - ] == [ + assert [partitions_def.time_window_for_partition_key(key) for key in partition_keys] == [ time_window("2021-05-05T04:15:00", "2021-05-12T04:15:00"), time_window("2021-05-12T04:15:00", "2021-05-19T04:15:00"), ] @@ -336,23 +303,15 @@ def my_partitioned_config(_start, _end): def test_partitioned_config_invalid_offsets(): - with pytest.raises( - DagsterInvalidDefinitionError, match="Found invalid cron schedule" - ): + with pytest.raises(DagsterInvalidDefinitionError, match="Found invalid cron schedule"): - @weekly_partitioned_config( - start_date=datetime(year=2021, month=1, day=1), day_offset=8 - ) + @weekly_partitioned_config(start_date=datetime(year=2021, month=1, day=1), day_offset=8) def my_weekly_partitioned_config(_start, _end): return {} - with pytest.raises( - DagsterInvalidDefinitionError, match="Found invalid cron schedule" - ): + with pytest.raises(DagsterInvalidDefinitionError, match="Found invalid cron schedule"): - @monthly_partitioned_config( - start_date=datetime(year=2021, month=1, day=1), day_offset=32 - ) + @monthly_partitioned_config(start_date=datetime(year=2021, month=1, day=1), day_offset=32) def my_monthly_partitioned_config(_start, _end): return {} @@ -360,13 +319,9 @@ def my_monthly_partitioned_config(_start, _end): def assert_expected_partition_keys( generated_partition_keys: Sequence[str], expected_partition_keys: Sequence[str] ): - assert all( - isinstance(generated_key, str) for generated_key in generated_partition_keys - ) + assert all(isinstance(generated_key, str) for generated_key in generated_partition_keys) assert len(generated_partition_keys) == len(expected_partition_keys) - for generated_key, expected_key in zip( - generated_partition_keys, expected_partition_keys - ): + for generated_key, expected_key in zip(generated_partition_keys, expected_partition_keys): assert generated_key == expected_key @@ -600,9 +555,7 @@ def test_time_partitions_weekly_partitions( current_time, expected_partition_keys: Sequence[str], ): - partitions_def = WeeklyPartitionsDefinition( - start_date=start, end_offset=partition_weeks_offset - ) + partitions_def = WeeklyPartitionsDefinition(start_date=start, end_offset=partition_weeks_offset) assert_expected_partition_keys( partitions_def.get_partition_keys(current_time=current_time), @@ -762,13 +715,9 @@ def test_time_partitions_hourly_partitions( ], ], ) -def test_get_partition_keys_in_range( - partitions_def, range_start, range_end, partition_keys -): +def test_get_partition_keys_in_range(partitions_def, range_start, range_end, partition_keys): assert ( - partitions_def.get_partition_keys_in_range( - PartitionKeyRange(range_start, range_end) - ) + partitions_def.get_partition_keys_in_range(PartitionKeyRange(range_start, range_end)) == partition_keys ) @@ -782,9 +731,7 @@ def test_twice_daily_partitions(): assert [ partitions_def.time_window_for_partition_key(key) - for key in partitions_def.get_partition_keys( - datetime.strptime("2021-05-07", DATE_FORMAT) - ) + for key in partitions_def.get_partition_keys(datetime.strptime("2021-05-07", DATE_FORMAT)) ] == [ time_window("2021-05-05T00:00:00", "2021-05-05T11:00:00"), time_window("2021-05-05T11:00:00", "2021-05-06T00:00:00"), @@ -792,12 +739,12 @@ def test_twice_daily_partitions(): time_window("2021-05-06T11:00:00", "2021-05-07T00:00:00"), ] - assert partitions_def.time_window_for_partition_key( - "2021-05-08-00:00" - ) == time_window("2021-05-08T00:00:00", "2021-05-08T11:00:00") - assert partitions_def.time_window_for_partition_key( - "2021-05-08-11:00" - ) == time_window("2021-05-08T11:00:00", "2021-05-09T00:00:00") + assert partitions_def.time_window_for_partition_key("2021-05-08-00:00") == time_window( + "2021-05-08T00:00:00", "2021-05-08T11:00:00" + ) + assert partitions_def.time_window_for_partition_key("2021-05-08-11:00") == time_window( + "2021-05-08T11:00:00", "2021-05-09T00:00:00" + ) def test_start_not_aligned(): @@ -809,9 +756,7 @@ def test_start_not_aligned(): assert [ partitions_def.time_window_for_partition_key(key) - for key in partitions_def.get_partition_keys( - datetime.strptime("2021-05-08", DATE_FORMAT) - ) + for key in partitions_def.get_partition_keys(datetime.strptime("2021-05-08", DATE_FORMAT)) ] == [ time_window("2021-05-05T07:00:00", "2021-05-06T07:00:00"), time_window("2021-05-06T07:00:00", "2021-05-07T07:00:00"), @@ -855,9 +800,7 @@ def test_partition_subset_get_partition_keys_not_in_subset( subset = cast( BaseTimeWindowPartitionsSubset, - partitions_subset_class.empty_subset(partitions_def).with_partition_keys( - subset_keys - ), + partitions_subset_class.empty_subset(partitions_def).with_partition_keys(subset_keys), ) for partition_key in subset_keys: assert partition_key in subset @@ -1053,9 +996,7 @@ def test_time_window_partiitons_deserialize_backwards_compatible(): def test_current_time_window_partitions_serialization(): partitions_def = DailyPartitionsDefinition(start_date="2015-01-01") serialized = ( - partitions_def.empty_subset() - .with_partition_keys(["2015-01-02", "2015-01-04"]) - .serialize() + partitions_def.empty_subset().with_partition_keys(["2015-01-02", "2015-01-04"]).serialize() ) deserialized = partitions_def.deserialize_subset(serialized) assert partitions_def.deserialize_subset(serialized) @@ -1083,39 +1024,25 @@ def test_time_window_partitions_contains(): def test_unique_identifier(): assert ( - DailyPartitionsDefinition( - start_date="2015-01-01" - ).get_serializable_unique_identifier() - != DailyPartitionsDefinition( - start_date="2015-01-02" - ).get_serializable_unique_identifier() + DailyPartitionsDefinition(start_date="2015-01-01").get_serializable_unique_identifier() + != DailyPartitionsDefinition(start_date="2015-01-02").get_serializable_unique_identifier() ) assert ( - DailyPartitionsDefinition( - start_date="2015-01-01" - ).get_serializable_unique_identifier() - == DailyPartitionsDefinition( - start_date="2015-01-01" - ).get_serializable_unique_identifier() + DailyPartitionsDefinition(start_date="2015-01-01").get_serializable_unique_identifier() + == DailyPartitionsDefinition(start_date="2015-01-01").get_serializable_unique_identifier() ) def test_time_window_partition_len(): - partitions_def = HourlyPartitionsDefinition( - start_date="2021-05-05-01:00", minute_offset=15 - ) - assert partitions_def.get_num_partitions() == len( - partitions_def.get_partition_keys() - ) + partitions_def = HourlyPartitionsDefinition(start_date="2021-05-05-01:00", minute_offset=15) + assert partitions_def.get_num_partitions() == len(partitions_def.get_partition_keys()) assert ( partitions_def.get_partition_keys_between_indexes(50, 51) == partitions_def.get_partition_keys()[50:51] ) current_time = datetime.strptime("2021-05-07-03:15", "%Y-%m-%d-%H:%M") assert ( - partitions_def.get_partition_keys_between_indexes( - 50, 51, current_time=current_time - ) + partitions_def.get_partition_keys_between_indexes(50, 51, current_time=current_time) == partitions_def.get_partition_keys(current_time)[50:51] ) @@ -1123,21 +1050,15 @@ def test_time_window_partition_len(): def my_partitioned_config(_start, _end): return {} - partitions_def = cast( - TimeWindowPartitionsDefinition, my_partitioned_config.partitions_def - ) - assert partitions_def.get_num_partitions() == len( - partitions_def.get_partition_keys() - ) + partitions_def = cast(TimeWindowPartitionsDefinition, my_partitioned_config.partitions_def) + assert partitions_def.get_num_partitions() == len(partitions_def.get_partition_keys()) assert ( partitions_def.get_partition_keys_between_indexes(50, 53) == partitions_def.get_partition_keys()[50:53] ) current_time = datetime.strptime("2021-06-23", "%Y-%m-%d") assert ( - partitions_def.get_partition_keys_between_indexes( - 50, 53, current_time=current_time - ) + partitions_def.get_partition_keys_between_indexes(50, 53, current_time=current_time) == partitions_def.get_partition_keys(current_time)[50:53] ) @@ -1147,9 +1068,7 @@ def my_partitioned_config(_start, _end): ) current_time = datetime.strptime("2023-01-21", "%Y-%m-%d") assert ( - weekly_partitions_def.get_partition_keys_between_indexes( - 50, 53, current_time=current_time - ) + weekly_partitions_def.get_partition_keys_between_indexes(50, 53, current_time=current_time) == weekly_partitions_def.get_partition_keys(current_time)[50:53] ) @@ -1157,14 +1076,10 @@ def my_partitioned_config(_start, _end): def my_partitioned_config_2(_start, _end): return {} - partitions_def = cast( - TimeWindowPartitionsDefinition, my_partitioned_config_2.partitions_def - ) + partitions_def = cast(TimeWindowPartitionsDefinition, my_partitioned_config_2.partitions_def) current_time = datetime.strptime("2021-06-20", "%Y-%m-%d") assert ( - partitions_def.get_partition_keys_between_indexes( - 50, 53, current_time=current_time - ) + partitions_def.get_partition_keys_between_indexes(50, 53, current_time=current_time) == partitions_def.get_partition_keys(current_time=current_time)[50:53] ) @@ -1178,14 +1093,10 @@ def test_get_first_partition_window(): start_date="2023-01-01", end_offset=1 ).get_first_partition_window( current_time=datetime.strptime("2023-01-01", "%Y-%m-%d") - ) == time_window( - "2023-01-01", "2023-01-02" - ) + ) == time_window("2023-01-01", "2023-01-02") assert ( - DailyPartitionsDefinition( - start_date="2023-02-15", end_offset=1 - ).get_first_partition_window( + DailyPartitionsDefinition(start_date="2023-02-15", end_offset=1).get_first_partition_window( current_time=datetime.strptime("2023-02-14", "%Y-%m-%d") ) is None @@ -1195,24 +1106,18 @@ def test_get_first_partition_window(): start_date="2023-01-01", end_offset=2 ).get_first_partition_window( current_time=datetime.strptime("2023-01-02", "%Y-%m-%d") - ) == time_window( - "2023-01-01", "2023-01-02" - ) + ) == time_window("2023-01-01", "2023-01-02") assert MonthlyPartitionsDefinition( start_date="2023-01-01", end_offset=1 ).get_first_partition_window( current_time=datetime.strptime("2023-01-15", "%Y-%m-%d") - ) == time_window( - "2023-01-01", "2023-02-01" - ) + ) == time_window("2023-01-01", "2023-02-01") assert ( DailyPartitionsDefinition( start_date="2023-01-15", end_offset=-1 - ).get_first_partition_window( - current_time=datetime.strptime("2023-01-16", "%Y-%m-%d") - ) + ).get_first_partition_window(current_time=datetime.strptime("2023-01-16", "%Y-%m-%d")) is None ) @@ -1220,16 +1125,12 @@ def test_get_first_partition_window(): start_date="2023-01-15", end_offset=-1 ).get_first_partition_window( current_time=datetime.strptime("2023-01-17", "%Y-%m-%d") - ) == time_window( - "2023-01-15", "2023-01-16" - ) + ) == time_window("2023-01-15", "2023-01-16") assert ( DailyPartitionsDefinition( start_date="2023-01-15", end_offset=-2 - ).get_first_partition_window( - current_time=datetime.strptime("2023-01-17", "%Y-%m-%d") - ) + ).get_first_partition_window(current_time=datetime.strptime("2023-01-17", "%Y-%m-%d")) is None ) @@ -1237,23 +1138,17 @@ def test_get_first_partition_window(): start_date="2023-01-15", end_offset=-2 ).get_first_partition_window( current_time=datetime.strptime("2023-01-18", "%Y-%m-%d") - ) == time_window( - "2023-01-15", "2023-01-16" - ) + ) == time_window("2023-01-15", "2023-01-16") assert ( MonthlyPartitionsDefinition( start_date="2023-01-01", end_offset=-1 - ).get_first_partition_window( - current_time=datetime.strptime("2023-01-15", "%Y-%m-%d") - ) + ).get_first_partition_window(current_time=datetime.strptime("2023-01-15", "%Y-%m-%d")) is None ) assert ( - DailyPartitionsDefinition( - start_date="2023-01-15", end_offset=1 - ).get_first_partition_window( + DailyPartitionsDefinition(start_date="2023-01-15", end_offset=1).get_first_partition_window( current_time=datetime.strptime("2023-01-14", "%Y-%m-%d") ) is None @@ -1263,25 +1158,17 @@ def test_get_first_partition_window(): start_date="2023-01-15", end_offset=1 ).get_first_partition_window( current_time=datetime(year=2023, month=1, day=15, hour=12, minute=0, second=0) - ) == time_window( - "2023-01-15", "2023-01-16" - ) + ) == time_window("2023-01-15", "2023-01-16") assert DailyPartitionsDefinition( start_date="2023-01-15", end_offset=1 ).get_first_partition_window( current_time=datetime(year=2023, month=1, day=14, hour=12, minute=0, second=0) - ) == time_window( - "2023-01-15", "2023-01-16" - ) + ) == time_window("2023-01-15", "2023-01-16") assert ( - DailyPartitionsDefinition( - start_date="2023-01-15", end_offset=1 - ).get_first_partition_window( - current_time=datetime( - year=2023, month=1, day=13, hour=12, minute=0, second=0 - ) + DailyPartitionsDefinition(start_date="2023-01-15", end_offset=1).get_first_partition_window( + current_time=datetime(year=2023, month=1, day=13, hour=12, minute=0, second=0) ) is None ) @@ -1289,18 +1176,14 @@ def test_get_first_partition_window(): assert ( MonthlyPartitionsDefinition( start_date="2023-01-01", end_offset=-1 - ).get_first_partition_window( - current_time=datetime.strptime("2023-01-15", "%Y-%m-%d") - ) + ).get_first_partition_window(current_time=datetime.strptime("2023-01-15", "%Y-%m-%d")) is None ) assert ( MonthlyPartitionsDefinition( start_date="2023-01-01", end_offset=-1 - ).get_first_partition_window( - current_time=datetime.strptime("2023-02-01", "%Y-%m-%d") - ) + ).get_first_partition_window(current_time=datetime.strptime("2023-02-01", "%Y-%m-%d")) is None ) @@ -1308,9 +1191,7 @@ def test_get_first_partition_window(): start_date="2023-01-01", end_offset=-1 ).get_first_partition_window( current_time=datetime.strptime("2023-03-01", "%Y-%m-%d") - ) == time_window( - "2023-01-01", "2023-02-01" - ) + ) == time_window("2023-01-01", "2023-02-01") def test_invalid_cron_schedule(): @@ -1355,9 +1236,7 @@ def test_has_partition_key(): "partitions_def,first_partition_window,last_partition_window,number_of_partitions,fmt", [ ( - HourlyPartitionsDefinition( - start_date="2022-01-01-00:00", end_date="2022-02-01-00:00" - ), + HourlyPartitionsDefinition(start_date="2022-01-01-00:00", end_date="2022-02-01-00:00"), ["2022-01-01-00:00", "2022-01-01-01:00"], ["2022-01-31-23:00", "2022-02-01-00:00"], 744, @@ -1397,30 +1276,19 @@ def test_partition_with_end_date( fmt: str, ): first_partition_window_ = TimeWindow( - start=pendulum.instance( - datetime.strptime(first_partition_window[0], fmt), tz="UTC" - ), - end=pendulum.instance( - datetime.strptime(first_partition_window[1], fmt), tz="UTC" - ), + start=pendulum.instance(datetime.strptime(first_partition_window[0], fmt), tz="UTC"), + end=pendulum.instance(datetime.strptime(first_partition_window[1], fmt), tz="UTC"), ) last_partition_window_ = TimeWindow( - start=pendulum.instance( - datetime.strptime(last_partition_window[0], fmt), tz="UTC" - ), - end=pendulum.instance( - datetime.strptime(last_partition_window[1], fmt), tz="UTC" - ), + start=pendulum.instance(datetime.strptime(last_partition_window[0], fmt), tz="UTC"), + end=pendulum.instance(datetime.strptime(last_partition_window[1], fmt), tz="UTC"), ) # get_last_partition_window assert partitions_def.get_last_partition_window() == last_partition_window_ # get_next_partition_window - assert ( - partitions_def.get_next_partition_window(partitions_def.start) - == first_partition_window_ - ) + assert partitions_def.get_next_partition_window(partitions_def.start) == first_partition_window_ assert ( partitions_def.get_next_partition_window(last_partition_window_.start) == last_partition_window_