From 5335af4a5f8176057f44e7c690dc027f77aff225 Mon Sep 17 00:00:00 2001 From: Claire Lin Date: Tue, 7 Nov 2023 15:49:17 -0800 Subject: [PATCH] switch to using asdict --- .../implementation/fetch_assets.py | 4 +- .../_core/definitions/asset_daemon_cursor.py | 2 +- .../_core/definitions/asset_graph_subset.py | 4 +- .../dagster/_core/definitions/data_time.py | 2 +- .../dagster/_core/definitions/partition.py | 27 +++--- .../_core/definitions/partition_mapping.py | 14 ++- .../time_window_partition_mapping.py | 10 +- .../definitions/time_window_partitions.py | 91 ++++++++++--------- .../dagster/_core/execution/context/input.py | 5 +- .../test_time_window_partitions.py | 12 +-- 10 files changed, 82 insertions(+), 89 deletions(-) diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py index 3eeae87dc6de2..aab52f4b9cd34 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py @@ -469,7 +469,7 @@ def build_partition_statuses( graphene_ranges = [] for r in ranges: partition_key_range = cast( - TimeWindowPartitionsDefinition, materialized_partitions_subset.get_partitions_def() + TimeWindowPartitionsDefinition, materialized_partitions_subset.partitions_def ).get_partition_key_range_for_time_window(r.time_window) graphene_ranges.append( GrapheneTimePartitionRangeStatus( @@ -520,7 +520,7 @@ def get_2d_run_length_encoded_partitions( partitions_defs = set( [ - subset.get_partitions_def() + subset.partitions_def for subset in [ materialized_partitions_subset, failed_partitions_subset, diff --git a/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py b/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py index 28d29cd02cd3c..f867987a187cc 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py @@ -237,7 +237,7 @@ def from_serialized(cls, cursor: str, asset_graph: AssetGraph) -> "AssetDaemonCu and isinstance(partitions_def, TimeWindowPartitionsDefinition) and any( time_window.start < partitions_def.start - for time_window in subset.get_included_time_windows() + for time_window in subset.included_time_windows ) ): subset = partitions_def.empty_subset() diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py b/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py index bdf4539770d2e..773ac4b159361 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py @@ -94,13 +94,13 @@ def to_storage_dict( for key, value in self.partitions_subsets_by_asset_key.items() }, "serializable_partitions_def_ids_by_asset_key": { - key.to_user_string(): value.get_partitions_def().get_serializable_unique_identifier( + key.to_user_string(): value.partitions_def.get_serializable_unique_identifier( dynamic_partitions_store=dynamic_partitions_store ) for key, value in self.partitions_subsets_by_asset_key.items() }, "partitions_def_class_names_by_asset_key": { - key.to_user_string(): value.get_partitions_def().__class__.__name__ + key.to_user_string(): value.partitions_def.__class__.__name__ for key, value in self.partitions_subsets_by_asset_key.items() }, "non_partitioned_asset_keys": [ diff --git a/python_modules/dagster/dagster/_core/definitions/data_time.py b/python_modules/dagster/dagster/_core/definitions/data_time.py index 8c913c2a8c457..ecd969308d8c5 100644 --- a/python_modules/dagster/dagster/_core/definitions/data_time.py +++ b/python_modules/dagster/dagster/_core/definitions/data_time.py @@ -92,7 +92,7 @@ def _calculate_data_time_partitioned( if not isinstance(partition_subset, BaseTimeWindowPartitionsSubset): check.failed(f"Invalid partition subset {type(partition_subset)}") - sorted_time_windows = sorted(partition_subset.get_included_time_windows()) + sorted_time_windows = sorted(partition_subset.included_time_windows) # no time windows, no data if len(sorted_time_windows) == 0: return None diff --git a/python_modules/dagster/dagster/_core/definitions/partition.py b/python_modules/dagster/dagster/_core/definitions/partition.py index 5f267bbefa45b..542c6838339d2 100644 --- a/python_modules/dagster/dagster/_core/definitions/partition.py +++ b/python_modules/dagster/dagster/_core/definitions/partition.py @@ -1,7 +1,7 @@ import copy import hashlib import json -from abc import ABC, abstractmethod +from abc import ABC, abstractmethod, abstractproperty from collections import defaultdict from datetime import ( datetime, @@ -977,7 +977,7 @@ def with_partition_key_range( dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> "PartitionsSubset[T_str]": return self.with_partition_keys( - self.get_partitions_def().get_partition_keys_in_range( + self.partitions_def.get_partition_keys_in_range( partition_key_range, dynamic_partitions_store=dynamic_partitions_store ) ) @@ -989,22 +989,16 @@ def __or__(self, other: "PartitionsSubset") -> "PartitionsSubset[T_str]": def __sub__(self, other: "PartitionsSubset") -> "PartitionsSubset[T_str]": if self is other: - return self.get_partitions_def().empty_subset() - return ( - self.get_partitions_def() - .empty_subset() - .with_partition_keys( - set(self.get_partition_keys()).difference(set(other.get_partition_keys())) - ) + return self.partitions_def.empty_subset() + return self.partitions_def.empty_subset().with_partition_keys( + set(self.get_partition_keys()).difference(set(other.get_partition_keys())) ) def __and__(self, other: "PartitionsSubset") -> "PartitionsSubset[T_str]": if self is other: return self - return ( - self.get_partitions_def() - .empty_subset() - .with_partition_keys(set(self.get_partition_keys()) & set(other.get_partition_keys())) + return self.partitions_def.empty_subset().with_partition_keys( + set(self.get_partition_keys()) & set(other.get_partition_keys()) ) @abstractmethod @@ -1029,8 +1023,8 @@ def can_deserialize( ) -> bool: ... - @abstractmethod - def get_partitions_def(self) -> PartitionsDefinition[T_str]: + @abstractproperty + def partitions_def(self) -> PartitionsDefinition[T_str]: ... @abstractmethod @@ -1191,7 +1185,8 @@ def can_deserialize( data.get("subset") is not None and data.get("version") == cls.SERIALIZATION_VERSION ) - def get_partitions_def(self) -> PartitionsDefinition[T_str]: + @property + def partitions_def(self) -> PartitionsDefinition[T_str]: return self._partitions_def def __eq__(self, other: object) -> bool: diff --git a/python_modules/dagster/dagster/_core/definitions/partition_mapping.py b/python_modules/dagster/dagster/_core/definitions/partition_mapping.py index f317c11fb3eb0..5a5dddbbc6017 100644 --- a/python_modules/dagster/dagster/_core/definitions/partition_mapping.py +++ b/python_modules/dagster/dagster/_core/definitions/partition_mapping.py @@ -114,7 +114,7 @@ def get_upstream_mapped_partitions_result_for_partitions( if downstream_partitions_subset is None: check.failed("downstream asset is not partitioned") - if downstream_partitions_subset.get_partitions_def() == upstream_partitions_def: + if downstream_partitions_subset.partitions_def == upstream_partitions_def: return UpstreamPartitionsResult(downstream_partitions_subset, []) upstream_partition_keys = set( @@ -141,7 +141,7 @@ def get_downstream_partitions_for_partitions( if upstream_partitions_subset is None: check.failed("upstream asset is not partitioned") - if upstream_partitions_subset.get_partitions_def() == downstream_partitions_def: + if upstream_partitions_subset.partitions_def == downstream_partitions_def: return upstream_partitions_subset upstream_partition_keys = set(upstream_partitions_subset.get_partition_keys()) @@ -218,10 +218,8 @@ def get_downstream_partitions_for_partitions( current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> PartitionsSubset: - last_upstream_partition = ( - upstream_partitions_subset.get_partitions_def().get_last_partition_key( - current_time=current_time, dynamic_partitions_store=dynamic_partitions_store - ) + last_upstream_partition = upstream_partitions_subset.partitions_def.get_last_partition_key( + current_time=current_time, dynamic_partitions_store=dynamic_partitions_store ) if last_upstream_partition and last_upstream_partition in upstream_partitions_subset: return downstream_partitions_def.subset_with_all_partitions( @@ -497,7 +495,7 @@ def get_upstream_mapped_partitions_result_for_partitions( check.failed("downstream asset is not partitioned") result = self._get_dependency_partitions_subset( - cast(MultiPartitionsDefinition, downstream_partitions_subset.get_partitions_def()), + cast(MultiPartitionsDefinition, downstream_partitions_subset.partitions_def), downstream_partitions_subset, cast(MultiPartitionsDefinition, upstream_partitions_def), a_upstream_of_b=False, @@ -521,7 +519,7 @@ def get_downstream_partitions_for_partitions( check.failed("upstream asset is not partitioned") result = self._get_dependency_partitions_subset( - cast(MultiPartitionsDefinition, upstream_partitions_subset.get_partitions_def()), + cast(MultiPartitionsDefinition, upstream_partitions_subset.partitions_def), upstream_partitions_subset, cast(MultiPartitionsDefinition, downstream_partitions_def), a_upstream_of_b=True, diff --git a/python_modules/dagster/dagster/_core/definitions/time_window_partition_mapping.py b/python_modules/dagster/dagster/_core/definitions/time_window_partition_mapping.py index 5fb1c33fc45ed..9d4f0d7ef1dd8 100644 --- a/python_modules/dagster/dagster/_core/definitions/time_window_partition_mapping.py +++ b/python_modules/dagster/dagster/_core/definitions/time_window_partition_mapping.py @@ -116,7 +116,7 @@ def get_upstream_mapped_partitions_result_for_partitions( check.failed("downstream_partitions_subset must be a BaseTimeWindowPartitionsSubset") return self._map_partitions( - downstream_partitions_subset.get_partitions_def(), + downstream_partitions_subset.partitions_def, upstream_partitions_def, downstream_partitions_subset, start_offset=self.start_offset, @@ -137,7 +137,7 @@ def get_downstream_partitions_for_partitions( if not provided. """ return self._map_partitions( - upstream_partitions_subset.get_partitions_def(), + upstream_partitions_subset.partitions_def, downstream_partitions_def, upstream_partitions_subset, end_offset=-self.start_offset, @@ -200,7 +200,7 @@ def _map_partitions( last_window = to_partitions_def.get_last_partition_window(current_time=current_time) time_windows = [] - for from_partition_time_window in from_partitions_subset.get_included_time_windows(): + for from_partition_time_window in from_partitions_subset.included_time_windows: from_start_dt, from_end_dt = from_partition_time_window offsetted_start_dt = _offsetted_datetime( @@ -363,7 +363,7 @@ def _do_cheap_partition_mapping_if_possible( TimeWindowPartitionsSubset( partitions_def=to_partitions_def, num_partitions=None, - included_time_windows=from_partitions_subset.get_included_time_windows(), + included_time_windows=from_partitions_subset.included_time_windows, ), [], ) @@ -377,7 +377,7 @@ def _do_cheap_partition_mapping_if_possible( else: required_but_nonexistent_partition_keys = [ pk - for time_window in from_partitions_subset.get_included_time_windows() + for time_window in from_partitions_subset.included_time_windows for pk in to_partitions_def.get_partition_keys_in_time_window( time_window=time_window ) diff --git a/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py b/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py index c1b2c39d64e83..9b20127068da4 100644 --- a/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py +++ b/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py @@ -1469,16 +1469,16 @@ class BaseTimeWindowPartitionsSubset(PartitionsSubset): # This will ensure that we can gracefully degrade when deserializing old data. SERIALIZATION_VERSION = 1 - @abstractmethod - def get_included_time_windows(self) -> Sequence[TimeWindow]: + @abstractproperty + def included_time_windows(self) -> Sequence[TimeWindow]: ... - @abstractmethod - def get_num_partitions(self) -> int: + @abstractproperty + def num_partitions(self) -> int: ... - @abstractmethod - def get_partitions_def(self) -> TimeWindowPartitionsDefinition: + @abstractproperty + def partitions_def(self) -> TimeWindowPartitionsDefinition: ... def _get_partition_time_windows_not_in_subset( @@ -1489,45 +1489,43 @@ def _get_partition_time_windows_not_in_subset( Each time window is a single partition. """ first_tw = cast( - TimeWindowPartitionsDefinition, self.get_partitions_def() + TimeWindowPartitionsDefinition, self.partitions_def ).get_first_partition_window(current_time=current_time) last_tw = cast( - TimeWindowPartitionsDefinition, self.get_partitions_def() + TimeWindowPartitionsDefinition, self.partitions_def ).get_last_partition_window(current_time=current_time) if not first_tw or not last_tw: check.failed("No partitions found") - if len(self.get_included_time_windows()) == 0: + if len(self.included_time_windows) == 0: return [TimeWindow(first_tw.start, last_tw.end)] time_windows = [] - if first_tw.start < self.get_included_time_windows()[0].start: - time_windows.append( - TimeWindow(first_tw.start, self.get_included_time_windows()[0].start) - ) + if first_tw.start < self.included_time_windows[0].start: + time_windows.append(TimeWindow(first_tw.start, self.included_time_windows[0].start)) - for i in range(len(self.get_included_time_windows()) - 1): - if self.get_included_time_windows()[i].start >= last_tw.end: + for i in range(len(self.included_time_windows) - 1): + if self.included_time_windows[i].start >= last_tw.end: break - if self.get_included_time_windows()[i].end < last_tw.end: - if self.get_included_time_windows()[i + 1].start <= last_tw.end: + if self.included_time_windows[i].end < last_tw.end: + if self.included_time_windows[i + 1].start <= last_tw.end: time_windows.append( TimeWindow( - self.get_included_time_windows()[i].end, - self.get_included_time_windows()[i + 1].start, + self.included_time_windows[i].end, + self.included_time_windows[i + 1].start, ) ) else: time_windows.append( TimeWindow( - self.get_included_time_windows()[i].end, + self.included_time_windows[i].end, last_tw.end, ) ) - if last_tw.end > self.get_included_time_windows()[-1].end: - time_windows.append(TimeWindow(self.get_included_time_windows()[-1].end, last_tw.end)) + if last_tw.end > self.included_time_windows[-1].end: + time_windows.append(TimeWindow(self.included_time_windows[-1].end, last_tw.end)) return time_windows @@ -1540,7 +1538,7 @@ def get_partition_keys_not_in_subset( for tw in self._get_partition_time_windows_not_in_subset(current_time): partition_keys.extend( cast( - TimeWindowPartitionsDefinition, self.get_partitions_def() + TimeWindowPartitionsDefinition, self.partitions_def ).get_partition_keys_in_time_window(tw) ) return partition_keys @@ -1570,9 +1568,9 @@ def get_partition_key_ranges( ) -> Sequence[PartitionKeyRange]: return [ cast( - TimeWindowPartitionsDefinition, self.get_partitions_def() + TimeWindowPartitionsDefinition, self.partitions_def ).get_partition_key_range_for_time_window(window) - for window in self.get_included_time_windows() + for window in self.included_time_windows ] def _add_partitions_to_time_windows( @@ -1586,7 +1584,7 @@ def _add_partitions_to_time_windows( """ result_windows = [*initial_windows] time_windows = cast( - TimeWindowPartitionsDefinition, self.get_partitions_def() + TimeWindowPartitionsDefinition, self.partitions_def ).time_windows_for_partition_keys(frozenset(partition_keys), validate=validate) num_added_partitions = 0 for window in sorted(time_windows): @@ -1638,9 +1636,9 @@ def serialize(self) -> str: # stable serialization between identical subsets "time_windows": [ (window.start.timestamp(), window.end.timestamp()) - for window in self.get_included_time_windows() + for window in self.included_time_windows ], - "num_partitions": self.get_num_partitions(), + "num_partitions": self.num_partitions, } ) @@ -1716,24 +1714,24 @@ def can_deserialize( ) def __len__(self) -> int: - return self.get_num_partitions() + return self.num_partitions def __contains__(self, partition_key: str) -> bool: time_window = cast( - TimeWindowPartitionsDefinition, self.get_partitions_def() + TimeWindowPartitionsDefinition, self.partitions_def ).time_window_for_partition_key(partition_key) return any( time_window.start >= included_time_window.start and time_window.start < included_time_window.end - for included_time_window in self.get_included_time_windows() + for included_time_window in self.included_time_windows ) def __eq__(self, other): return ( isinstance(other, BaseTimeWindowPartitionsSubset) - and self.get_partitions_def() == other.get_partitions_def() - and self.get_included_time_windows() == other.get_included_time_windows() + and self.partitions_def == other.partitions_def + and self.included_time_windows == other.included_time_windows ) @@ -1750,7 +1748,8 @@ def __init__( included_partition_keys, "included_partition_keys", of_type=str ) - def get_partitions_def(self) -> TimeWindowPartitionsDefinition: + @property + def partitions_def(self) -> TimeWindowPartitionsDefinition: return self._partitions_def def with_partition_keys( @@ -1788,11 +1787,11 @@ def first_start(self) -> datetime: next(iter(self._included_partition_keys)) ) else: - if len(self.get_included_time_windows()) == 0: + if len(self.included_time_windows) == 0: check.failed( f"Empty subset. self._included_partition_keys: {self._included_partition_keys}" ) - return self.get_included_time_windows()[0].start + return self.included_time_windows[0].start @property def is_empty(self) -> bool: @@ -1877,7 +1876,6 @@ def unpack( field_serializers={"partitions_def": TimeWindowPartitionsDefinitionSerializer} ) class TimeWindowPartitionsSubset( - BaseTimeWindowPartitionsSubset, NamedTuple( "_TimeWindowPartitionsSubset", [ @@ -1886,6 +1884,7 @@ class TimeWindowPartitionsSubset( ("included_time_windows", Sequence[TimeWindow]), ], ), + BaseTimeWindowPartitionsSubset, ): def __new__( cls, @@ -1912,11 +1911,13 @@ def __new__( included_time_windows=time_windows_with_timezone, ) - def get_included_time_windows(self) -> Sequence[TimeWindow]: - return self.included_time_windows + @property + def included_time_windows(self) -> Sequence[TimeWindow]: + return self._asdict()["included_time_windows"] - def get_partitions_def(self) -> TimeWindowPartitionsDefinition: - return self.partitions_def + @property + def partitions_def(self) -> TimeWindowPartitionsDefinition: + return self._asdict()["partitions_def"] @property def first_start(self) -> datetime: @@ -1940,8 +1941,8 @@ def cheap_ends_before(self, dt: datetime, dt_cron_schedule: str) -> bool: """ return self.included_time_windows[-1].end <= dt - @cached_method - def get_num_partitions(self) -> int: + @cached_property + def num_partitions(self) -> int: if self.num_partitions is None: return sum( len(self.partitions_def.get_partition_keys_in_time_window(time_window)) @@ -1973,7 +1974,7 @@ def with_partition_keys(self, partition_keys: Iterable[str]) -> "TimeWindowParti return TimeWindowPartitionsSubset( self.partitions_def, - num_partitions=self.get_num_partitions() + added_partitions, + num_partitions=self.num_partitions + added_partitions, included_time_windows=result_windows, ) @@ -2127,7 +2128,7 @@ def fetch_flattened_time_window_ranges( flattened_time_window_statuses = [] for status, subset in prioritized_subsets: subset_time_window_statuses = [ - PartitionTimeWindowStatus(tw, status) for tw in subset.get_included_time_windows() + PartitionTimeWindowStatus(tw, status) for tw in subset.included_time_windows ] flattened_time_window_statuses = _flatten( flattened_time_window_statuses, subset_time_window_statuses diff --git a/python_modules/dagster/dagster/_core/execution/context/input.py b/python_modules/dagster/dagster/_core/execution/context/input.py index 42d5318cb3faf..e821e95fdb194 100644 --- a/python_modules/dagster/dagster/_core/execution/context/input.py +++ b/python_modules/dagster/dagster/_core/execution/context/input.py @@ -411,7 +411,7 @@ def asset_partitions_time_window(self) -> TimeWindow: " with time windows.", ) - time_windows = subset.get_included_time_windows() + time_windows = subset.included_time_windows if len(time_windows) != 1: check.failed( "Tried to access asset_partitions_time_window, but there are " @@ -674,7 +674,8 @@ def with_partition_key_range( def serialize(self) -> str: raise NotImplementedError() - def get_partitions_def(self) -> "PartitionsDefinition": + @property + def partitions_def(self) -> "PartitionsDefinition": raise NotImplementedError() def __len__(self) -> int: diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_time_window_partitions.py b/python_modules/dagster/dagster_tests/definitions_tests/test_time_window_partitions.py index a4355b391e013..e7282b381b0b9 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_time_window_partitions.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_time_window_partitions.py @@ -787,19 +787,19 @@ def test_partition_subset_get_partition_keys_not_in_subset( assert partition_key in subset assert ( subset.get_partition_keys_not_in_subset( - current_time=partitions_def.end_time_for_partition_key(full_set_keys[-1]), + current_time=partitions_def.end_time_for_partition_key(full_set_keys[-1]) ) == expected_keys_not_in_subset ) assert ( cast( TimeWindowPartitionsSubset, partitions_def.deserialize_subset(subset.serialize()) - ).get_included_time_windows() - == subset.get_included_time_windows() + ).included_time_windows + == subset.included_time_windows ) expected_range_count = case_str.count("-+") + (1 if case_str[0] == "+" else 0) - assert len(subset.get_included_time_windows()) == expected_range_count, case_str + assert len(subset.included_time_windows) == expected_range_count, case_str assert len(subset) == case_str.count("+") @@ -936,9 +936,7 @@ def test_partition_subset_with_partition_keys( expected_range_count = updated_subset_str.count("-+") + ( 1 if updated_subset_str[0] == "+" else 0 ) - assert ( - len(updated_subset.get_included_time_windows()) == expected_range_count - ), updated_subset_str + assert len(updated_subset.included_time_windows) == expected_range_count, updated_subset_str assert len(updated_subset) == updated_subset_str.count("+")