Skip to content

Commit

Permalink
rename to PartitionKeysTimeWindowPartitionsSubset
Browse files Browse the repository at this point in the history
  • Loading branch information
clairelin135 committed Nov 9, 2023
1 parent e9dc40f commit af983ea
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,7 @@ def less_than(self, partition_key1: str, partition_key2: str) -> bool:

@property
def partitions_subset_class(self) -> Type["PartitionsSubset"]:
return TimePartitionKeyPartitionsSubset
return PartitionKeysTimeWindowPartitionsSubset

def empty_subset(self) -> "PartitionsSubset":
return self.partitions_subset_class.empty_subset(self)
Expand Down Expand Up @@ -1380,12 +1380,16 @@ def inner(


class BaseTimeWindowPartitionsSubset(PartitionsSubset):
"""A base class that represents PartitionSubsets for TimeWindowPartitionsDefinitions.
Contains shared logic for time window partitions subsets, such as building time windows
from partition keys.
"""

# Every time we change the serialization format, we should increment the version number.
# This will ensure that we can gracefully degrade when deserializing old data.
SERIALIZATION_VERSION = 1

@property
@abstractmethod
@abstractproperty
def included_time_windows(self) -> Sequence[TimeWindow]:
...

Expand Down Expand Up @@ -1647,7 +1651,11 @@ def __eq__(self, other):
)


class TimePartitionKeyPartitionsSubset(BaseTimeWindowPartitionsSubset):
class PartitionKeysTimeWindowPartitionsSubset(BaseTimeWindowPartitionsSubset):
"""A PartitionsSubset for a TimeWindowPartitionsDefinition, which internally represents the
included partitions using strings.
"""

def __init__(
self,
partitions_def: TimeWindowPartitionsDefinition,
Expand All @@ -1664,7 +1672,7 @@ def with_partition_keys(
self, partition_keys: Iterable[str]
) -> "BaseTimeWindowPartitionsSubset":
new_partitions = {*(self._included_partition_keys or []), *partition_keys}
return TimePartitionKeyPartitionsSubset(
return PartitionKeysTimeWindowPartitionsSubset(
self._partitions_def,
included_partition_keys=new_partitions,
)
Expand Down Expand Up @@ -1742,10 +1750,10 @@ def __contains__(self, partition_key: str) -> bool:

def __eq__(self, other):
return (
isinstance(other, TimePartitionKeyPartitionsSubset)
isinstance(other, PartitionKeysTimeWindowPartitionsSubset)
and self.partitions_def == other.partitions_def
and self._included_partition_keys == other._included_partition_keys
) or super(TimePartitionKeyPartitionsSubset, self).__eq__(other)
) or super(PartitionKeysTimeWindowPartitionsSubset, self).__eq__(other)

@classmethod
def empty_subset(cls, partitions_def: PartitionsDefinition) -> "PartitionsSubset":
Expand All @@ -1762,13 +1770,17 @@ def with_partitions_def(
"num_partitions would become inaccurate if the partitions_defs had different cron"
" schedules",
)
return TimePartitionKeyPartitionsSubset(
return PartitionKeysTimeWindowPartitionsSubset(
partitions_def=partitions_def,
included_partition_keys=self._included_partition_keys,
)


class TimeWindowPartitionsSubset(BaseTimeWindowPartitionsSubset):
"""A PartitionsSubset for a TimeWindowPartitionsDefinition, which internally represents the
included partitions using TimeWindows.
"""

def __init__(
self,
partitions_def: TimeWindowPartitionsDefinition,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import pytest
from dagster import DailyPartitionsDefinition, MultiPartitionsDefinition, StaticPartitionsDefinition
from dagster import (
DailyPartitionsDefinition,
MultiPartitionsDefinition,
StaticPartitionsDefinition,
)
from dagster._core.definitions.multi_dimensional_partitions import MultiPartitionsSubset
from dagster._core.definitions.partition import DefaultPartitionsSubset
from dagster._core.definitions.time_window_partitions import (
TimePartitionKeyPartitionsSubset,
PartitionKeysTimeWindowPartitionsSubset,
TimeWindowPartitionsSubset,
)
from dagster._core.errors import DagsterInvalidDeserializationVersionError
Expand Down Expand Up @@ -78,4 +82,4 @@ def test_get_subset_type():
def test_empty_subsets():
assert type(composite.empty_subset()) is MultiPartitionsSubset
assert type(static_partitions.empty_subset()) is DefaultPartitionsSubset
assert type(time_window_partitions.empty_subset()) is TimePartitionKeyPartitionsSubset
assert type(time_window_partitions.empty_subset()) is PartitionKeysTimeWindowPartitionsSubset
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
from dagster._check import CheckError
from dagster._core.definitions.time_window_partitions import (
BaseTimeWindowPartitionsSubset,
PartitionKeysTimeWindowPartitionsSubset,
ScheduleType,
TimePartitionKeyPartitionsSubset,
TimeWindow,
TimeWindowPartitionsSubset,
)
Expand All @@ -33,7 +33,8 @@

def time_window(start: str, end: str) -> TimeWindow:
return TimeWindow(
cast(datetime, pendulum.parser.parse(start)), cast(datetime, pendulum.parser.parse(end))
cast(datetime, pendulum.parser.parse(start)),
cast(datetime, pendulum.parser.parse(end)),
)


Expand Down Expand Up @@ -352,7 +353,14 @@ def assert_expected_partition_keys(
datetime(year=2021, month=1, day=1),
1,
create_pendulum_time(2021, 1, 6, 1, 20),
["2021-01-01", "2021-01-02", "2021-01-03", "2021-01-04", "2021-01-05", "2021-01-06"],
[
"2021-01-01",
"2021-01-02",
"2021-01-03",
"2021-01-04",
"2021-01-05",
"2021-01-06",
],
None,
),
(
Expand Down Expand Up @@ -418,7 +426,8 @@ def test_time_partitions_daily_partitions(
)

assert_expected_partition_keys(
partitions_def.get_partition_keys(current_time=current_time), expected_partition_keys
partitions_def.get_partition_keys(current_time=current_time),
expected_partition_keys,
)


Expand Down Expand Up @@ -480,7 +489,8 @@ def test_time_partitions_monthly_partitions(
)

assert_expected_partition_keys(
partitions_def.get_partition_keys(current_time=current_time), expected_partition_keys
partitions_def.get_partition_keys(current_time=current_time),
expected_partition_keys,
)


Expand Down Expand Up @@ -515,7 +525,14 @@ def test_time_partitions_monthly_partitions(
datetime(year=2021, month=1, day=1),
2,
create_pendulum_time(2021, 1, 31, 1, 20),
["2021-01-03", "2021-01-10", "2021-01-17", "2021-01-24", "2021-01-31", "2021-02-07"],
[
"2021-01-03",
"2021-01-10",
"2021-01-17",
"2021-01-24",
"2021-01-31",
"2021-02-07",
],
),
(
datetime(year=2021, month=1, day=1),
Expand All @@ -540,7 +557,8 @@ def test_time_partitions_weekly_partitions(
partitions_def = WeeklyPartitionsDefinition(start_date=start, end_offset=partition_weeks_offset)

assert_expected_partition_keys(
partitions_def.get_partition_keys(current_time=current_time), expected_partition_keys
partitions_def.get_partition_keys(current_time=current_time),
expected_partition_keys,
)


Expand Down Expand Up @@ -674,7 +692,8 @@ def test_time_partitions_hourly_partitions(
)

assert_expected_partition_keys(
partitions_def.get_partition_keys(current_time=current_time), expected_partition_keys
partitions_def.get_partition_keys(current_time=current_time),
expected_partition_keys,
)


Expand Down Expand Up @@ -745,7 +764,7 @@ def test_start_not_aligned():

@pytest.mark.parametrize(
"partitions_subset_class",
[TimePartitionKeyPartitionsSubset, TimeWindowPartitionsSubset],
[PartitionKeysTimeWindowPartitionsSubset, TimeWindowPartitionsSubset],
)
@pytest.mark.parametrize(
"case_str",
Expand Down Expand Up @@ -792,7 +811,8 @@ def test_partition_subset_get_partition_keys_not_in_subset(
)
assert (
cast(
TimeWindowPartitionsSubset, partitions_def.deserialize_subset(subset.serialize())
TimeWindowPartitionsSubset,
partitions_def.deserialize_subset(subset.serialize()),
).included_time_windows
== subset.included_time_windows
)
Expand Down Expand Up @@ -822,7 +842,7 @@ def test_time_partitions_subset_identical_serialization():

@pytest.mark.parametrize(
"partitions_subset_class",
[TimePartitionKeyPartitionsSubset, TimeWindowPartitionsSubset],
[PartitionKeysTimeWindowPartitionsSubset, TimeWindowPartitionsSubset],
)
@pytest.mark.parametrize(
"initial, added",
Expand Down Expand Up @@ -1231,7 +1251,10 @@ def test_has_partition_key():
),
(
WeeklyPartitionsDefinition(start_date="2022-01-01", end_date="2022-02-01"),
["2022-01-02", "2022-01-09"], # 2022-01-02 is Sunday, weekly cron starts from Sunday
[
"2022-01-02",
"2022-01-09",
], # 2022-01-02 is Sunday, weekly cron starts from Sunday
["2022-01-23", "2022-01-30"],
4,
"%Y-%m-%d",
Expand Down

0 comments on commit af983ea

Please sign in to comment.