Skip to content

Commit

Permalink
first stab
Browse files Browse the repository at this point in the history
continue

time window partitions subset changes

asset backfill serialization

partition mapping update

continue refactor

fix more tests

more test fixes

fix partition mapping tests

adjust test

fix more tests

add tests
  • Loading branch information
clairelin135 committed Nov 6, 2023
1 parent 3e7006e commit 1f9247e
Show file tree
Hide file tree
Showing 11 changed files with 191 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -519,14 +519,18 @@ def get_2d_run_length_encoded_partitions(
)

if (
not isinstance(materialized_partitions_subset.partitions_def, MultiPartitionsDefinition)
or not isinstance(failed_partitions_subset.partitions_def, MultiPartitionsDefinition)
or not isinstance(in_progress_partitions_subset.partitions_def, MultiPartitionsDefinition)
not isinstance(
materialized_partitions_subset.get_partitions_def(), MultiPartitionsDefinition
)
or not isinstance(failed_partitions_subset.get_partitions_def(), MultiPartitionsDefinition)
or not isinstance(
in_progress_partitions_subset.get_partitions_def(), MultiPartitionsDefinition
)
):
check.failed("Can only fetch 2D run length encoded partitions for multipartitioned assets")

primary_dim = materialized_partitions_subset.partitions_def.primary_dimension
secondary_dim = materialized_partitions_subset.partitions_def.secondary_dimension
primary_dim = materialized_partitions_subset.get_partitions_def().primary_dimension
secondary_dim = materialized_partitions_subset.get_partitions_def().secondary_dimension

dim2_materialized_partition_subset_by_dim1: Dict[str, PartitionsSubset] = defaultdict(
lambda: secondary_dim.partitions_def.empty_subset()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ def from_serialized(cls, cursor: str, asset_graph: AssetGraph) -> "AssetDaemonCu
and isinstance(partitions_def, TimeWindowPartitionsDefinition)
and any(
time_window.start < partitions_def.start
for time_window in subset.included_time_windows
for time_window in subset.get_included_time_windows()
)
):
subset = partitions_def.empty_subset()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,13 @@ def to_storage_dict(
for key, value in self.partitions_subsets_by_asset_key.items()
},
"serializable_partitions_def_ids_by_asset_key": {
key.to_user_string(): value.partitions_def.get_serializable_unique_identifier(
key.to_user_string(): value.get_partitions_def().get_serializable_unique_identifier(
dynamic_partitions_store=dynamic_partitions_store
)
for key, value in self.partitions_subsets_by_asset_key.items()
},
"partitions_def_class_names_by_asset_key": {
key.to_user_string(): value.partitions_def.__class__.__name__
key.to_user_string(): value.get_partitions_def().__class__.__name__
for key, value in self.partitions_subsets_by_asset_key.items()
},
"non_partitioned_asset_keys": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def _calculate_data_time_partitioned(
if not isinstance(partition_subset, BaseTimeWindowPartitionsSubset):
check.failed(f"Invalid partition subset {type(partition_subset)}")

sorted_time_windows = sorted(partition_subset.included_time_windows)
sorted_time_windows = sorted(partition_subset.get_included_time_windows())
# no time windows, no data
if len(sorted_time_windows) == 0:
return None
Expand Down
24 changes: 14 additions & 10 deletions python_modules/dagster/dagster/_core/definitions/partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -977,7 +977,7 @@ def with_partition_key_range(
dynamic_partitions_store: Optional[DynamicPartitionsStore] = None,
) -> "PartitionsSubset[T_str]":
return self.with_partition_keys(
self.partitions_def.get_partition_keys_in_range(
self.get_partitions_def().get_partition_keys_in_range(
partition_key_range, dynamic_partitions_store=dynamic_partitions_store
)
)
Expand All @@ -989,16 +989,22 @@ def __or__(self, other: "PartitionsSubset") -> "PartitionsSubset[T_str]":

def __sub__(self, other: "PartitionsSubset") -> "PartitionsSubset[T_str]":
if self is other:
return self.partitions_def.empty_subset()
return self.partitions_def.empty_subset().with_partition_keys(
set(self.get_partition_keys()).difference(set(other.get_partition_keys()))
return self.get_partitions_def().empty_subset()
return (
self.get_partitions_def()
.empty_subset()
.with_partition_keys(
set(self.get_partition_keys()).difference(set(other.get_partition_keys()))
)
)

def __and__(self, other: "PartitionsSubset") -> "PartitionsSubset[T_str]":
if self is other:
return self
return self.partitions_def.empty_subset().with_partition_keys(
set(self.get_partition_keys()) & set(other.get_partition_keys())
return (
self.get_partitions_def()
.empty_subset()
.with_partition_keys(set(self.get_partition_keys()) & set(other.get_partition_keys()))
)

@abstractmethod
Expand All @@ -1023,9 +1029,8 @@ def can_deserialize(
) -> bool:
...

@property
@abstractmethod
def partitions_def(self) -> PartitionsDefinition[T_str]:
def get_partitions_def(self) -> PartitionsDefinition[T_str]:
...

@abstractmethod
Expand Down Expand Up @@ -1186,8 +1191,7 @@ def can_deserialize(
data.get("subset") is not None and data.get("version") == cls.SERIALIZATION_VERSION
)

@property
def partitions_def(self) -> PartitionsDefinition[T_str]:
def get_partitions_def(self) -> PartitionsDefinition[T_str]:
return self._partitions_def

def __eq__(self, other: object) -> bool:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def get_upstream_mapped_partitions_result_for_partitions(
if downstream_partitions_subset is None:
check.failed("downstream asset is not partitioned")

if downstream_partitions_subset.partitions_def == upstream_partitions_def:
if downstream_partitions_subset.get_partitions_def() == upstream_partitions_def:
return UpstreamPartitionsResult(downstream_partitions_subset, [])

upstream_partition_keys = set(
Expand All @@ -141,7 +141,7 @@ def get_downstream_partitions_for_partitions(
if upstream_partitions_subset is None:
check.failed("upstream asset is not partitioned")

if upstream_partitions_subset.partitions_def == downstream_partitions_def:
if upstream_partitions_subset.get_partitions_def() == downstream_partitions_def:
return upstream_partitions_subset

upstream_partition_keys = set(upstream_partitions_subset.get_partition_keys())
Expand Down Expand Up @@ -218,8 +218,10 @@ def get_downstream_partitions_for_partitions(
current_time: Optional[datetime] = None,
dynamic_partitions_store: Optional[DynamicPartitionsStore] = None,
) -> PartitionsSubset:
last_upstream_partition = upstream_partitions_subset.partitions_def.get_last_partition_key(
current_time=current_time, dynamic_partitions_store=dynamic_partitions_store
last_upstream_partition = (
upstream_partitions_subset.get_partitions_def().get_last_partition_key(
current_time=current_time, dynamic_partitions_store=dynamic_partitions_store
)
)
if last_upstream_partition and last_upstream_partition in upstream_partitions_subset:
return downstream_partitions_def.subset_with_all_partitions(
Expand Down Expand Up @@ -495,7 +497,7 @@ def get_upstream_mapped_partitions_result_for_partitions(
check.failed("downstream asset is not partitioned")

result = self._get_dependency_partitions_subset(
cast(MultiPartitionsDefinition, downstream_partitions_subset.partitions_def),
cast(MultiPartitionsDefinition, downstream_partitions_subset.get_partitions_def()),
downstream_partitions_subset,
cast(MultiPartitionsDefinition, upstream_partitions_def),
a_upstream_of_b=False,
Expand All @@ -519,7 +521,7 @@ def get_downstream_partitions_for_partitions(
check.failed("upstream asset is not partitioned")

result = self._get_dependency_partitions_subset(
cast(MultiPartitionsDefinition, upstream_partitions_subset.partitions_def),
cast(MultiPartitionsDefinition, upstream_partitions_subset.get_partitions_def()),
upstream_partitions_subset,
cast(MultiPartitionsDefinition, downstream_partitions_def),
a_upstream_of_b=True,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def get_upstream_mapped_partitions_result_for_partitions(
check.failed("downstream_partitions_subset must be a BaseTimeWindowPartitionsSubset")

return self._map_partitions(
downstream_partitions_subset.partitions_def,
downstream_partitions_subset.get_partitions_def(),
upstream_partitions_def,
downstream_partitions_subset,
start_offset=self.start_offset,
Expand All @@ -137,7 +137,7 @@ def get_downstream_partitions_for_partitions(
if not provided.
"""
return self._map_partitions(
upstream_partitions_subset.partitions_def,
upstream_partitions_subset.get_partitions_def(),
downstream_partitions_def,
upstream_partitions_subset,
end_offset=-self.start_offset,
Expand Down Expand Up @@ -200,7 +200,7 @@ def _map_partitions(
last_window = to_partitions_def.get_last_partition_window(current_time=current_time)

time_windows = []
for from_partition_time_window in from_partitions_subset.included_time_windows:
for from_partition_time_window in from_partitions_subset.get_included_time_windows():
from_start_dt, from_end_dt = from_partition_time_window

offsetted_start_dt = _offsetted_datetime(
Expand Down Expand Up @@ -363,7 +363,7 @@ def _do_cheap_partition_mapping_if_possible(
TimeWindowPartitionsSubset(
partitions_def=to_partitions_def,
num_partitions=None,
included_time_windows=from_partitions_subset.included_time_windows,
included_time_windows=from_partitions_subset.get_included_time_windows(),
),
[],
)
Expand All @@ -377,7 +377,7 @@ def _do_cheap_partition_mapping_if_possible(
else:
required_but_nonexistent_partition_keys = [
pk
for time_window in from_partitions_subset.included_time_windows
for time_window in from_partitions_subset.get_included_time_windows()
for pk in to_partitions_def.get_partition_keys_in_time_window(
time_window=time_window
)
Expand Down
Loading

0 comments on commit 1f9247e

Please sign in to comment.