Skip to content

Commit

Permalink
responding to feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
clairelin135 committed Nov 7, 2023
1 parent a1344c0 commit e9dc40f
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from abc import abstractmethod, abstractproperty
from datetime import datetime
from enum import Enum
from functools import cached_property
from typing import (
AbstractSet,
Any,
Expand All @@ -27,7 +28,6 @@
import dagster._check as check
from dagster._annotations import PublicAttr, public
from dagster._core.instance import DynamicPartitionsStore
from dagster._utils.cached_method import cached_method
from dagster._utils.partitions import DEFAULT_HOURLY_FORMAT_WITHOUT_TIMEZONE
from dagster._utils.schedules import (
cron_string_iterator,
Expand Down Expand Up @@ -1669,8 +1669,7 @@ def with_partition_keys(
included_partition_keys=new_partitions,
)

@property
@cached_method
@cached_property
def included_time_windows(self) -> Sequence[TimeWindow]:
result_time_windows, _ = self._add_partitions_to_time_windows(
initial_windows=[],
Expand All @@ -1683,8 +1682,7 @@ def included_time_windows(self) -> Sequence[TimeWindow]:
def partitions_def(self) -> TimeWindowPartitionsDefinition:
return self._partitions_def

@property
@cached_method
@cached_property
def num_partitions(self) -> int:
return len(self._included_partition_keys)

Expand Down Expand Up @@ -1769,37 +1767,22 @@ def with_partitions_def(
included_partition_keys=self._included_partition_keys,
)

def resolve(self) -> "TimeWindowPartitionsSubset":
return TimeWindowPartitionsSubset(
partitions_def=self._partitions_def,
num_partitions=self.num_partitions,
included_time_windows=self.included_time_windows,
)


class TimeWindowPartitionsSubset(BaseTimeWindowPartitionsSubset):
def __init__(
self,
partitions_def: TimeWindowPartitionsDefinition,
num_partitions: Optional[int] = None,
included_time_windows: Sequence[TimeWindow] = [],
num_partitions: Optional[int],
included_time_windows: Sequence[TimeWindow],
):
check.opt_int_param(num_partitions, "num_partitions")
self._partitions_def = check.inst_param(
partitions_def, "partitions_def", TimeWindowPartitionsDefinition
)
self._num_partitions = (
num_partitions
if num_partitions
else self._num_partitions_from_time_windows(partitions_def, included_time_windows)
)
self._num_partitions = check.opt_int_param(num_partitions, "num_partitions")
self._included_time_windows = check.sequence_param(
included_time_windows, "included_time_windows", of_type=TimeWindow
)

def get_included_time_windows(self) -> Sequence[TimeWindow]:
return self._included_time_windows

@property
def partitions_def(self) -> TimeWindowPartitionsDefinition:
return self._partitions_def
Expand All @@ -1824,13 +1807,15 @@ def cheap_ends_before(self, dt: datetime, dt_cron_schedule: str) -> bool:
Args:
dt_cron_schedule (str): A cron schedule that dt is on one of the ticks of.
"""
if self._included_time_windows is not None:
return self._included_time_windows[-1].end <= dt

return False
return self._included_time_windows[-1].end <= dt

@property
@cached_property
def num_partitions(self) -> int:
if self._num_partitions is None:
return sum(
len(self._partitions_def.get_partition_keys_in_time_window(time_window))
for time_window in self.included_time_windows
)
return self._num_partitions

@property
Expand Down Expand Up @@ -1861,7 +1846,7 @@ def with_partition_keys(self, partition_keys: Iterable[str]) -> "TimeWindowParti

return TimeWindowPartitionsSubset(
self._partitions_def,
num_partitions=self._num_partitions + added_partitions,
num_partitions=self.num_partitions + added_partitions,
included_time_windows=result_windows,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,12 @@
active_backfill_targets=[
{
AssetKey("daily"): TimeWindowPartitionsSubset(
daily_partitions_def
daily_partitions_def, num_partitions=None, included_time_windows=[]
).with_partition_keys(["2013-01-06"])
},
{
AssetKey("hourly"): TimeWindowPartitionsSubset(
hourly_partitions_def
hourly_partitions_def, num_partitions=None, included_time_windows=[]
).with_partition_keys(
[
"2013-01-06-01:00",
Expand All @@ -160,7 +160,7 @@
active_backfill_targets=[
{
AssetKey("hourly"): TimeWindowPartitionsSubset(
hourly_partitions_def
hourly_partitions_def, num_partitions=None, included_time_windows=[]
).with_partition_keys(
[
"2013-01-05-00:00",
Expand All @@ -172,7 +172,9 @@
{
AssetKey(
"non_existant_asset" # ignored since can't be loaded
): TimeWindowPartitionsSubset(hourly_partitions_def).with_partition_keys(
): TimeWindowPartitionsSubset(
hourly_partitions_def, num_partitions=None, included_time_windows=[]
).with_partition_keys(
[
"2013-01-05-00:00",
],
Expand All @@ -193,7 +195,7 @@
active_backfill_targets=[
{
AssetKey("hourly"): TimeWindowPartitionsSubset(
hourly_partitions_def
hourly_partitions_def, num_partitions=None, included_time_windows=[]
).with_partition_keys(
hourly_partitions_def.get_partition_keys_in_range(
PartitionKeyRange(start="2013-01-05-00:00", end="2013-01-07-03:00")
Expand Down Expand Up @@ -229,7 +231,7 @@
active_backfill_targets=[
{
AssetKey("hourly"): TimeWindowPartitionsSubset(
hourly_partitions_def
hourly_partitions_def, num_partitions=None, included_time_windows=[]
).with_partition_keys(["2013-01-05-04:00"])
}
],
Expand Down Expand Up @@ -287,7 +289,7 @@
active_backfill_targets=[
{
AssetKey("hourly"): TimeWindowPartitionsSubset(
hourly_partitions_def,
hourly_partitions_def, num_partitions=None, included_time_windows=[]
).with_partition_keys(["2013-01-05-04:00"])
}
],
Expand Down

0 comments on commit e9dc40f

Please sign in to comment.