Skip to content

Commit

Permalink
switch to using asdict
Browse files Browse the repository at this point in the history
  • Loading branch information
clairelin135 committed Nov 8, 2023
1 parent fd0613a commit ebb715c
Show file tree
Hide file tree
Showing 10 changed files with 89 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ def build_partition_statuses(
graphene_ranges = []
for r in ranges:
partition_key_range = cast(
TimeWindowPartitionsDefinition, materialized_partitions_subset.get_partitions_def()
TimeWindowPartitionsDefinition, materialized_partitions_subset.partitions_def
).get_partition_key_range_for_time_window(r.time_window)
graphene_ranges.append(
GrapheneTimePartitionRangeStatus(
Expand Down Expand Up @@ -520,7 +520,7 @@ def get_2d_run_length_encoded_partitions(

partitions_defs = set(
[
subset.get_partitions_def()
subset.partitions_def
for subset in [
materialized_partitions_subset,
failed_partitions_subset,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ def from_serialized(cls, cursor: str, asset_graph: AssetGraph) -> "AssetDaemonCu
and isinstance(partitions_def, TimeWindowPartitionsDefinition)
and any(
time_window.start < partitions_def.start
for time_window in subset.get_included_time_windows()
for time_window in subset.included_time_windows
)
):
subset = partitions_def.empty_subset()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,13 @@ def to_storage_dict(
for key, value in self.partitions_subsets_by_asset_key.items()
},
"serializable_partitions_def_ids_by_asset_key": {
key.to_user_string(): value.get_partitions_def().get_serializable_unique_identifier(
key.to_user_string(): value.partitions_def.get_serializable_unique_identifier(
dynamic_partitions_store=dynamic_partitions_store
)
for key, value in self.partitions_subsets_by_asset_key.items()
},
"partitions_def_class_names_by_asset_key": {
key.to_user_string(): value.get_partitions_def().__class__.__name__
key.to_user_string(): value.partitions_def.__class__.__name__
for key, value in self.partitions_subsets_by_asset_key.items()
},
"non_partitioned_asset_keys": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def _calculate_data_time_partitioned(
if not isinstance(partition_subset, BaseTimeWindowPartitionsSubset):
check.failed(f"Invalid partition subset {type(partition_subset)}")

sorted_time_windows = sorted(partition_subset.get_included_time_windows())
sorted_time_windows = sorted(partition_subset.included_time_windows)
# no time windows, no data
if len(sorted_time_windows) == 0:
return None
Expand Down
27 changes: 11 additions & 16 deletions python_modules/dagster/dagster/_core/definitions/partition.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import copy
import hashlib
import json
from abc import ABC, abstractmethod
from abc import ABC, abstractmethod, abstractproperty
from collections import defaultdict
from datetime import (
datetime,
Expand Down Expand Up @@ -977,7 +977,7 @@ def with_partition_key_range(
dynamic_partitions_store: Optional[DynamicPartitionsStore] = None,
) -> "PartitionsSubset[T_str]":
return self.with_partition_keys(
self.get_partitions_def().get_partition_keys_in_range(
self.partitions_def.get_partition_keys_in_range(
partition_key_range, dynamic_partitions_store=dynamic_partitions_store
)
)
Expand All @@ -989,22 +989,16 @@ def __or__(self, other: "PartitionsSubset") -> "PartitionsSubset[T_str]":

def __sub__(self, other: "PartitionsSubset") -> "PartitionsSubset[T_str]":
if self is other:
return self.get_partitions_def().empty_subset()
return (
self.get_partitions_def()
.empty_subset()
.with_partition_keys(
set(self.get_partition_keys()).difference(set(other.get_partition_keys()))
)
return self.partitions_def.empty_subset()
return self.partitions_def.empty_subset().with_partition_keys(
set(self.get_partition_keys()).difference(set(other.get_partition_keys()))
)

def __and__(self, other: "PartitionsSubset") -> "PartitionsSubset[T_str]":
if self is other:
return self
return (
self.get_partitions_def()
.empty_subset()
.with_partition_keys(set(self.get_partition_keys()) & set(other.get_partition_keys()))
return self.partitions_def.empty_subset().with_partition_keys(
set(self.get_partition_keys()) & set(other.get_partition_keys())
)

@abstractmethod
Expand All @@ -1029,8 +1023,8 @@ def can_deserialize(
) -> bool:
...

@abstractmethod
def get_partitions_def(self) -> PartitionsDefinition[T_str]:
@abstractproperty
def partitions_def(self) -> PartitionsDefinition[T_str]:
...

@abstractmethod
Expand Down Expand Up @@ -1191,7 +1185,8 @@ def can_deserialize(
data.get("subset") is not None and data.get("version") == cls.SERIALIZATION_VERSION
)

def get_partitions_def(self) -> PartitionsDefinition[T_str]:
@property
def partitions_def(self) -> PartitionsDefinition[T_str]:
return self._partitions_def

def __eq__(self, other: object) -> bool:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def get_upstream_mapped_partitions_result_for_partitions(
if downstream_partitions_subset is None:
check.failed("downstream asset is not partitioned")

if downstream_partitions_subset.get_partitions_def() == upstream_partitions_def:
if downstream_partitions_subset.partitions_def == upstream_partitions_def:
return UpstreamPartitionsResult(downstream_partitions_subset, [])

upstream_partition_keys = set(
Expand All @@ -141,7 +141,7 @@ def get_downstream_partitions_for_partitions(
if upstream_partitions_subset is None:
check.failed("upstream asset is not partitioned")

if upstream_partitions_subset.get_partitions_def() == downstream_partitions_def:
if upstream_partitions_subset.partitions_def == downstream_partitions_def:
return upstream_partitions_subset

upstream_partition_keys = set(upstream_partitions_subset.get_partition_keys())
Expand Down Expand Up @@ -218,10 +218,8 @@ def get_downstream_partitions_for_partitions(
current_time: Optional[datetime] = None,
dynamic_partitions_store: Optional[DynamicPartitionsStore] = None,
) -> PartitionsSubset:
last_upstream_partition = (
upstream_partitions_subset.get_partitions_def().get_last_partition_key(
current_time=current_time, dynamic_partitions_store=dynamic_partitions_store
)
last_upstream_partition = upstream_partitions_subset.partitions_def.get_last_partition_key(
current_time=current_time, dynamic_partitions_store=dynamic_partitions_store
)
if last_upstream_partition and last_upstream_partition in upstream_partitions_subset:
return downstream_partitions_def.subset_with_all_partitions(
Expand Down Expand Up @@ -497,7 +495,7 @@ def get_upstream_mapped_partitions_result_for_partitions(
check.failed("downstream asset is not partitioned")

result = self._get_dependency_partitions_subset(
cast(MultiPartitionsDefinition, downstream_partitions_subset.get_partitions_def()),
cast(MultiPartitionsDefinition, downstream_partitions_subset.partitions_def),
downstream_partitions_subset,
cast(MultiPartitionsDefinition, upstream_partitions_def),
a_upstream_of_b=False,
Expand All @@ -521,7 +519,7 @@ def get_downstream_partitions_for_partitions(
check.failed("upstream asset is not partitioned")

result = self._get_dependency_partitions_subset(
cast(MultiPartitionsDefinition, upstream_partitions_subset.get_partitions_def()),
cast(MultiPartitionsDefinition, upstream_partitions_subset.partitions_def),
upstream_partitions_subset,
cast(MultiPartitionsDefinition, downstream_partitions_def),
a_upstream_of_b=True,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def get_upstream_mapped_partitions_result_for_partitions(
check.failed("downstream_partitions_subset must be a BaseTimeWindowPartitionsSubset")

return self._map_partitions(
downstream_partitions_subset.get_partitions_def(),
downstream_partitions_subset.partitions_def,
upstream_partitions_def,
downstream_partitions_subset,
start_offset=self.start_offset,
Expand All @@ -137,7 +137,7 @@ def get_downstream_partitions_for_partitions(
if not provided.
"""
return self._map_partitions(
upstream_partitions_subset.get_partitions_def(),
upstream_partitions_subset.partitions_def,
downstream_partitions_def,
upstream_partitions_subset,
end_offset=-self.start_offset,
Expand Down Expand Up @@ -200,7 +200,7 @@ def _map_partitions(
last_window = to_partitions_def.get_last_partition_window(current_time=current_time)

time_windows = []
for from_partition_time_window in from_partitions_subset.get_included_time_windows():
for from_partition_time_window in from_partitions_subset.included_time_windows:
from_start_dt, from_end_dt = from_partition_time_window

offsetted_start_dt = _offsetted_datetime(
Expand Down Expand Up @@ -363,7 +363,7 @@ def _do_cheap_partition_mapping_if_possible(
TimeWindowPartitionsSubset(
partitions_def=to_partitions_def,
num_partitions=None,
included_time_windows=from_partitions_subset.get_included_time_windows(),
included_time_windows=from_partitions_subset.included_time_windows,
),
[],
)
Expand All @@ -377,7 +377,7 @@ def _do_cheap_partition_mapping_if_possible(
else:
required_but_nonexistent_partition_keys = [
pk
for time_window in from_partitions_subset.get_included_time_windows()
for time_window in from_partitions_subset.included_time_windows
for pk in to_partitions_def.get_partition_keys_in_time_window(
time_window=time_window
)
Expand Down
Loading

0 comments on commit ebb715c

Please sign in to comment.