From c4e4110f9e4e6341a3dd0c38aa94988321daba2c Mon Sep 17 00:00:00 2001 From: Claire Lin Date: Fri, 3 Nov 2023 16:49:58 -0700 Subject: [PATCH] first stab continue time window partitions subset changes asset backfill serialization partition mapping update continue refactor fix more tests more test fixes fix partition mapping tests adjust test fix more tests add tests --- .../implementation/fetch_assets.py | 60 ++++----- .../dagster_graphql/schema/asset_graph.py | 13 +- .../dagster_graphql/schema/backfill.py | 4 +- .../_core/definitions/asset_daemon_context.py | 2 +- .../_core/definitions/asset_daemon_cursor.py | 1 + .../dagster/_core/definitions/asset_graph.py | 9 +- .../_core/definitions/asset_graph_subset.py | 12 +- .../multi_asset_sensor_definition.py | 1 + .../multi_dimensional_partitions.py | 30 +---- .../dagster/_core/definitions/partition.py | 119 +++++++++--------- .../_core/definitions/partition_mapping.py | 37 ++++-- .../time_window_partition_mapping.py | 8 +- .../definitions/time_window_partitions.py | 13 +- .../dagster/_core/execution/context/input.py | 10 +- .../dagster/_core/execution/context/system.py | 18 ++- .../_utils/caching_instance_queryer.py | 1 + .../test_asset_partition_mappings.py | 23 ++-- .../test_multipartition_partition_mapping.py | 54 ++++++-- .../test_static_partition_mapping.py | 14 ++- .../test_time_window_partition_mapping.py | 77 ++++++++++-- .../asset_defs_tests/test_asset_graph.py | 8 +- .../test_partitioned_assets.py | 10 +- .../execution_tests/test_asset_backfill.py | 6 +- .../partition_tests/test_partition.py | 4 +- .../auto_materialize_tests/base_scenario.py | 26 ++-- .../scenarios/partition_scenarios.py | 10 +- .../test_flatten_time_window_ranges.py | 98 +++++++++------ .../test_multi_partitions.py | 6 +- .../test_time_window_partitions.py | 3 +- .../storage_tests/test_fs_io_manager.py | 2 + 30 files changed, 413 insertions(+), 266 deletions(-) diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py index cab8902d1a677..dac8e85222dee 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py @@ -20,18 +20,13 @@ DagsterEventType, DagsterInstance, EventRecordsFilter, - MultiPartitionKey, MultiPartitionsDefinition, _check as check, ) from dagster._core.definitions.data_time import CachingDataTimeResolver from dagster._core.definitions.external_asset_graph import ExternalAssetGraph -from dagster._core.definitions.multi_dimensional_partitions import ( - MultiPartitionsSubset, -) from dagster._core.definitions.partition import ( CachingDynamicPartitionsLoader, - DefaultPartitionsSubset, PartitionsDefinition, PartitionsSubset, ) @@ -420,6 +415,7 @@ def build_partition_statuses( materialized_partitions_subset: Optional[PartitionsSubset], failed_partitions_subset: Optional[PartitionsSubset], in_progress_partitions_subset: Optional[PartitionsSubset], + partitions_def: Optional[PartitionsDefinition], ) -> Union[ "GrapheneTimePartitionStatuses", "GrapheneDefaultPartitionStatuses", @@ -469,7 +465,7 @@ def build_partition_statuses( graphene_ranges = [] for r in ranges: partition_key_range = cast( - TimeWindowPartitionsDefinition, materialized_partitions_subset.partitions_def + TimeWindowPartitionsDefinition, partitions_def ).get_partition_key_range_for_time_window(r.time_window) graphene_ranges.append( GrapheneTimePartitionRangeStatus( @@ -481,14 +477,15 @@ def build_partition_statuses( ) ) return GrapheneTimePartitionStatuses(ranges=graphene_ranges) - elif isinstance(materialized_partitions_subset, MultiPartitionsSubset): + elif isinstance(partitions_def, MultiPartitionsDefinition): return get_2d_run_length_encoded_partitions( dynamic_partitions_store, materialized_partitions_subset, failed_partitions_subset, in_progress_partitions_subset, + partitions_def, ) - elif isinstance(materialized_partitions_subset, DefaultPartitionsSubset): + elif partitions_def: materialized_keys = materialized_partitions_subset.get_partition_keys() failed_keys = failed_partitions_subset.get_partition_keys() in_progress_keys = in_progress_partitions_subset.get_partition_keys() @@ -499,7 +496,7 @@ def build_partition_statuses( - set(in_progress_keys), failedPartitions=failed_keys, unmaterializedPartitions=materialized_partitions_subset.get_partition_keys_not_in_subset( - dynamic_partitions_store=dynamic_partitions_store + partitions_def=partitions_def, dynamic_partitions_store=dynamic_partitions_store ), materializingPartitions=in_progress_keys, ) @@ -512,57 +509,51 @@ def get_2d_run_length_encoded_partitions( materialized_partitions_subset: PartitionsSubset, failed_partitions_subset: PartitionsSubset, in_progress_partitions_subset: PartitionsSubset, + partitions_def: MultiPartitionsDefinition, ) -> "GrapheneMultiPartitionStatuses": from ..schema.pipelines.pipeline import ( GrapheneMultiPartitionRangeStatuses, GrapheneMultiPartitionStatuses, ) - if ( - not isinstance(materialized_partitions_subset.partitions_def, MultiPartitionsDefinition) - or not isinstance(failed_partitions_subset.partitions_def, MultiPartitionsDefinition) - or not isinstance(in_progress_partitions_subset.partitions_def, MultiPartitionsDefinition) - ): + if not isinstance(partitions_def, MultiPartitionsDefinition): check.failed("Can only fetch 2D run length encoded partitions for multipartitioned assets") - primary_dim = materialized_partitions_subset.partitions_def.primary_dimension - secondary_dim = materialized_partitions_subset.partitions_def.secondary_dimension + primary_dim = partitions_def.primary_dimension + secondary_dim = partitions_def.secondary_dimension dim2_materialized_partition_subset_by_dim1: Dict[str, PartitionsSubset] = defaultdict( lambda: secondary_dim.partitions_def.empty_subset() ) - for partition_key in cast( - Sequence[MultiPartitionKey], materialized_partitions_subset.get_partition_keys() - ): + for partition_key in materialized_partitions_subset.get_partition_keys(): + multipartition_key = partitions_def.get_partition_key_from_str(partition_key) dim2_materialized_partition_subset_by_dim1[ - partition_key.keys_by_dimension[primary_dim.name] + multipartition_key.keys_by_dimension[primary_dim.name] ] = dim2_materialized_partition_subset_by_dim1[ - partition_key.keys_by_dimension[primary_dim.name] - ].with_partition_keys([partition_key.keys_by_dimension[secondary_dim.name]]) + multipartition_key.keys_by_dimension[primary_dim.name] + ].with_partition_keys([multipartition_key.keys_by_dimension[secondary_dim.name]]) dim2_failed_partition_subset_by_dim1: Dict[str, PartitionsSubset] = defaultdict( lambda: secondary_dim.partitions_def.empty_subset() ) - for partition_key in cast( - Sequence[MultiPartitionKey], failed_partitions_subset.get_partition_keys() - ): + for partition_key in failed_partitions_subset.get_partition_keys(): + multipartition_key = partitions_def.get_partition_key_from_str(partition_key) dim2_failed_partition_subset_by_dim1[ - partition_key.keys_by_dimension[primary_dim.name] + multipartition_key.keys_by_dimension[primary_dim.name] ] = dim2_failed_partition_subset_by_dim1[ - partition_key.keys_by_dimension[primary_dim.name] - ].with_partition_keys([partition_key.keys_by_dimension[secondary_dim.name]]) + multipartition_key.keys_by_dimension[primary_dim.name] + ].with_partition_keys([multipartition_key.keys_by_dimension[secondary_dim.name]]) dim2_in_progress_partition_subset_by_dim1: Dict[str, PartitionsSubset] = defaultdict( lambda: secondary_dim.partitions_def.empty_subset() ) - for partition_key in cast( - Sequence[MultiPartitionKey], in_progress_partitions_subset.get_partition_keys() - ): + for partition_key in in_progress_partitions_subset.get_partition_keys(): + multipartition_key = partitions_def.get_partition_key_from_str(partition_key) dim2_in_progress_partition_subset_by_dim1[ - partition_key.keys_by_dimension[primary_dim.name] + multipartition_key.keys_by_dimension[primary_dim.name] ] = dim2_in_progress_partition_subset_by_dim1[ - partition_key.keys_by_dimension[primary_dim.name] - ].with_partition_keys([partition_key.keys_by_dimension[secondary_dim.name]]) + multipartition_key.keys_by_dimension[primary_dim.name] + ].with_partition_keys([multipartition_key.keys_by_dimension[secondary_dim.name]]) materialized_2d_ranges = [] @@ -626,6 +617,7 @@ def get_2d_run_length_encoded_partitions( dim2_materialized_partition_subset_by_dim1[start_key], dim2_failed_partition_subset_by_dim1[start_key], dim2_in_progress_partition_subset_by_dim1[start_key], + secondary_dim.partitions_def, ), ) ) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py index 1715f0553e12d..1ea46d2542968 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py @@ -934,6 +934,12 @@ def resolve_assetPartitionStatuses( if not self._dynamic_partitions_loader: check.failed("dynamic_partitions_loader must be provided to get partition keys") + partitions_def = ( + self._external_asset_node.partitions_def_data.get_partitions_definition() + if self._external_asset_node.partitions_def_data + else None + ) + ( materialized_partition_subset, failed_partition_subset, @@ -942,11 +948,7 @@ def resolve_assetPartitionStatuses( graphene_info.context.instance, asset_key, self._dynamic_partitions_loader, - ( - self._external_asset_node.partitions_def_data.get_partitions_definition() - if self._external_asset_node.partitions_def_data - else None - ), + partitions_def, ) return build_partition_statuses( @@ -954,6 +956,7 @@ def resolve_assetPartitionStatuses( materialized_partition_subset, failed_partition_subset, in_progress_subset, + partitions_def, ) def resolve_partitionStats( diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py b/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py index c66eb3fe482c0..edc30a4510120 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py @@ -133,7 +133,9 @@ def __init__(self, partition_subset: PartitionsSubset): if isinstance(partition_subset, BaseTimeWindowPartitionsSubset): ranges = [ GraphenePartitionKeyRange(start, end) - for start, end in partition_subset.get_partition_key_ranges() + for start, end in partition_subset.get_partition_key_ranges( + partition_subset.get_partitions_def() + ) ] partition_keys = None else: # Default partitions subset diff --git a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py index de6e3366eac67..3c96475d0e0fa 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py @@ -714,7 +714,7 @@ def _build_run_requests_with_backfill_policy( ) -> Sequence[RunRequest]: run_requests = [] partition_subset = partitions_def.subset_with_partition_keys(partition_keys) - partition_key_ranges = partition_subset.get_partition_key_ranges() + partition_key_ranges = partition_subset.get_partition_key_ranges(partitions_def) for partition_key_range in partition_key_ranges: # We might resolve more than one partition key range for the given partition keys. # We can only apply chunking on individual partition key ranges. diff --git a/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py b/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py index 10c90b6569440..8a19d55e282fa 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py @@ -77,6 +77,7 @@ def get_unhandled_partitions( ) return handled_subset.get_partition_keys_not_in_subset( + partitions_def=partitions_def, current_time=current_time, dynamic_partitions_store=dynamic_partitions_store, ) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph.py b/python_modules/dagster/dagster/_core/definitions/asset_graph.py index 572285e188142..3f1524a0d88b9 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_graph.py @@ -364,6 +364,7 @@ def get_child_partition_keys_of_parent( partition_mapping = self.get_partition_mapping(child_asset_key, parent_asset_key) child_partitions_subset = partition_mapping.get_downstream_partitions_for_partitions( parent_partitions_def.empty_subset().with_partition_keys([parent_partition_key]), + parent_partitions_def, downstream_partitions_def=child_partitions_def, dynamic_partitions_store=dynamic_partitions_store, current_time=current_time, @@ -437,7 +438,7 @@ def get_parent_partition_keys_for_child( """ partition_key = check.opt_str_param(partition_key, "partition_key") - child_partitions_def = self.get_partitions_def(child_asset_key) + child_partitions_def = cast(PartitionsDefinition, self.get_partitions_def(child_asset_key)) parent_partitions_def = self.get_partitions_def(parent_asset_key) if parent_partitions_def is None: @@ -449,12 +450,11 @@ def get_parent_partition_keys_for_child( return partition_mapping.get_upstream_mapped_partitions_result_for_partitions( ( - cast(PartitionsDefinition, child_partitions_def).subset_with_partition_keys( - [partition_key] - ) + child_partitions_def.subset_with_partition_keys([partition_key]) if partition_key else None ), + downstream_partitions_def=child_partitions_def, upstream_partitions_def=parent_partitions_def, dynamic_partitions_store=dynamic_partitions_store, current_time=current_time, @@ -611,6 +611,7 @@ def bfs_filter_subsets( child_partitions_subset = ( partition_mapping.get_downstream_partitions_for_partitions( partitions_subset, + check.not_none(self.get_partitions_def(asset_key)), downstream_partitions_def=child_partitions_def, dynamic_partitions_store=dynamic_partitions_store, current_time=current_time, diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py b/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py index 773ac4b159361..efe084745b58f 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py @@ -94,14 +94,18 @@ def to_storage_dict( for key, value in self.partitions_subsets_by_asset_key.items() }, "serializable_partitions_def_ids_by_asset_key": { - key.to_user_string(): value.partitions_def.get_serializable_unique_identifier( + key.to_user_string(): check.not_none( + self._asset_graph.get_partitions_def(key) + ).get_serializable_unique_identifier( dynamic_partitions_store=dynamic_partitions_store ) - for key, value in self.partitions_subsets_by_asset_key.items() + for key, _ in self.partitions_subsets_by_asset_key.items() }, "partitions_def_class_names_by_asset_key": { - key.to_user_string(): value.partitions_def.__class__.__name__ - for key, value in self.partitions_subsets_by_asset_key.items() + key.to_user_string(): check.not_none( + self._asset_graph.get_partitions_def(key) + ).__class__.__name__ + for key, _ in self.partitions_subsets_by_asset_key.items() }, "non_partitioned_asset_keys": [ key.to_user_string() for key in self._non_partitioned_asset_keys diff --git a/python_modules/dagster/dagster/_core/definitions/multi_asset_sensor_definition.py b/python_modules/dagster/dagster/_core/definitions/multi_asset_sensor_definition.py index 0fcd25945dc05..d4cdd574a4806 100644 --- a/python_modules/dagster/dagster/_core/definitions/multi_asset_sensor_definition.py +++ b/python_modules/dagster/dagster/_core/definitions/multi_asset_sensor_definition.py @@ -718,6 +718,7 @@ def get_downstream_partition_keys( downstream_partition_key_subset = ( partition_mapping.get_downstream_partitions_for_partitions( from_asset.partitions_def.empty_subset().with_partition_keys([partition_key]), + from_asset.partitions_def, downstream_partitions_def=to_partitions_def, dynamic_partitions_store=self.instance, ) diff --git a/python_modules/dagster/dagster/_core/definitions/multi_dimensional_partitions.py b/python_modules/dagster/dagster/_core/definitions/multi_dimensional_partitions.py index 9acffb0e1aff2..1e4d95760dc10 100644 --- a/python_modules/dagster/dagster/_core/definitions/multi_dimensional_partitions.py +++ b/python_modules/dagster/dagster/_core/definitions/multi_dimensional_partitions.py @@ -4,7 +4,6 @@ from functools import lru_cache, reduce from typing import ( Dict, - Iterable, List, Mapping, NamedTuple, @@ -216,7 +215,7 @@ def __init__(self, partitions_defs: Mapping[str, PartitionsDefinition]): @property def partitions_subset_class(self) -> Type["PartitionsSubset"]: - return MultiPartitionsSubset + return DefaultPartitionsSubset def get_serializable_unique_identifier( self, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None @@ -509,33 +508,6 @@ def get_num_partitions( return reduce(lambda x, y: x * y, dimension_counts, 1) -class MultiPartitionsSubset(DefaultPartitionsSubset): - def __init__( - self, - partitions_def: MultiPartitionsDefinition, - subset: Optional[Set[str]] = None, - ): - check.inst_param(partitions_def, "partitions_def", MultiPartitionsDefinition) - subset = ( - set( - [ - partitions_def.get_partition_key_from_str(key) - for key in subset - if MULTIPARTITION_KEY_DELIMITER in key - ] - ) - if subset - else set() - ) - super(MultiPartitionsSubset, self).__init__(partitions_def, subset) - - def with_partition_keys(self, partition_keys: Iterable[str]) -> "MultiPartitionsSubset": - return MultiPartitionsSubset( - cast(MultiPartitionsDefinition, self._partitions_def), - self._subset | set(partition_keys), - ) - - def get_tags_from_multi_partition_key(multi_partition_key: MultiPartitionKey) -> Mapping[str, str]: check.inst_param(multi_partition_key, "multi_partition_key", MultiPartitionKey) diff --git a/python_modules/dagster/dagster/_core/definitions/partition.py b/python_modules/dagster/dagster/_core/definitions/partition.py index 1d8e181408335..4ed465792d503 100644 --- a/python_modules/dagster/dagster/_core/definitions/partition.py +++ b/python_modules/dagster/dagster/_core/definitions/partition.py @@ -9,6 +9,7 @@ ) from enum import Enum from typing import ( + AbstractSet, Any, Callable, Dict, @@ -18,7 +19,6 @@ NamedTuple, Optional, Sequence, - Set, Type, Union, cast, @@ -139,8 +139,8 @@ class PartitionsDefinition(ABC, Generic[T_str]): """ @property - def partitions_subset_class(self) -> Type["PartitionsSubset[T_str]"]: - return DefaultPartitionsSubset[T_str] + def partitions_subset_class(self) -> Type["PartitionsSubset"]: + return DefaultPartitionsSubset @abstractmethod @public @@ -217,26 +217,24 @@ def get_partition_keys_in_range( + 1 ] - def empty_subset(self) -> "PartitionsSubset[T_str]": + def empty_subset(self) -> "PartitionsSubset": return self.partitions_subset_class.empty_subset(self) - def subset_with_partition_keys( - self, partition_keys: Iterable[str] - ) -> "PartitionsSubset[T_str]": + def subset_with_partition_keys(self, partition_keys: Iterable[str]) -> "PartitionsSubset": return self.empty_subset().with_partition_keys(partition_keys) def subset_with_all_partitions( self, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, - ) -> "PartitionsSubset[T_str]": + ) -> "PartitionsSubset": return self.subset_with_partition_keys( self.get_partition_keys( current_time=current_time, dynamic_partitions_store=dynamic_partitions_store ) ) - def deserialize_subset(self, serialized: str) -> "PartitionsSubset[T_str]": + def deserialize_subset(self, serialized: str) -> "PartitionsSubset": return self.partitions_subset_class.from_serialized(self, serialized) def can_deserialize_subset( @@ -949,6 +947,7 @@ class PartitionsSubset(ABC, Generic[T_str]): @abstractmethod def get_partition_keys_not_in_subset( self, + partitions_def: PartitionsDefinition[T_str], current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> Iterable[T_str]: @@ -962,6 +961,7 @@ def get_partition_keys(self, current_time: Optional[datetime] = None) -> Iterabl @abstractmethod def get_partition_key_ranges( self, + partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> Sequence[PartitionKeyRange]: @@ -973,11 +973,12 @@ def with_partition_keys(self, partition_keys: Iterable[str]) -> "PartitionsSubse def with_partition_key_range( self, + partitions_def: PartitionsDefinition[T_str], partition_key_range: PartitionKeyRange, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> "PartitionsSubset[T_str]": return self.with_partition_keys( - self.partitions_def.get_partition_keys_in_range( + partitions_def.get_partition_keys_in_range( partition_key_range, dynamic_partitions_store=dynamic_partitions_store ) ) @@ -989,15 +990,15 @@ def __or__(self, other: "PartitionsSubset") -> "PartitionsSubset[T_str]": def __sub__(self, other: "PartitionsSubset") -> "PartitionsSubset[T_str]": if self is other: - return self.partitions_def.empty_subset() - return self.partitions_def.empty_subset().with_partition_keys( + return self.empty_subset(self.get_partitions_def()) + return self.empty_subset(self.get_partitions_def()).with_partition_keys( set(self.get_partition_keys()).difference(set(other.get_partition_keys())) ) def __and__(self, other: "PartitionsSubset") -> "PartitionsSubset[T_str]": if self is other: return self - return self.partitions_def.empty_subset().with_partition_keys( + return self.empty_subset(self.get_partitions_def()).with_partition_keys( set(self.get_partition_keys()) & set(other.get_partition_keys()) ) @@ -1023,11 +1024,6 @@ def can_deserialize( ) -> bool: ... - @property - @abstractmethod - def partitions_def(self) -> PartitionsDefinition[T_str]: - ... - @abstractmethod def __len__(self) -> int: ... @@ -1038,7 +1034,13 @@ def __contains__(self, value) -> bool: @classmethod @abstractmethod - def empty_subset(cls, partitions_def: PartitionsDefinition[T_str]) -> "PartitionsSubset[T_str]": + def empty_subset( + cls, partitions_def: Optional[PartitionsDefinition] = None + ) -> "PartitionsSubset[T_str]": + ... + + @abstractmethod + def get_partitions_def(self) -> Optional[PartitionsDefinition]: ... @@ -1078,48 +1080,51 @@ def deserialize(self, partitions_def: PartitionsDefinition) -> PartitionsSubset: return partitions_def.deserialize_subset(self.serialized_subset) -class DefaultPartitionsSubset(PartitionsSubset[T_str]): +@whitelist_for_serdes +class DefaultPartitionsSubset( + PartitionsSubset, + NamedTuple("_DefaultPartitionsSubset", [("subset", AbstractSet[str])]), +): # Every time we change the serialization format, we should increment the version number. # This will ensure that we can gracefully degrade when deserializing old data. SERIALIZATION_VERSION = 1 - def __init__( - self, partitions_def: PartitionsDefinition[T_str], subset: Optional[Set[T_str]] = None + def __new__( + cls, + subset: Optional[AbstractSet[str]] = None, ): check.opt_set_param(subset, "subset") - self._partitions_def = partitions_def - self._subset = subset or set() + return super(DefaultPartitionsSubset, cls).__new__(cls, subset or set()) def get_partition_keys_not_in_subset( self, + partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> Iterable[str]: - return ( - set( - self._partitions_def.get_partition_keys( - current_time=current_time, dynamic_partitions_store=dynamic_partitions_store - ) + return set( + partitions_def.get_partition_keys( + current_time=current_time, dynamic_partitions_store=dynamic_partitions_store ) - - self._subset - ) + ) - set(self.subset) def get_partition_keys(self, current_time: Optional[datetime] = None) -> Iterable[str]: - return self._subset + return self.subset def get_partition_key_ranges( self, + partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> Sequence[PartitionKeyRange]: - partition_keys = self._partitions_def.get_partition_keys( + partition_keys = partitions_def.get_partition_keys( current_time, dynamic_partitions_store=dynamic_partitions_store ) cur_range_start = None cur_range_end = None result = [] for partition_key in partition_keys: - if partition_key in self._subset: + if partition_key in self.subset: if cur_range_start is None: cur_range_start = partition_key cur_range_end = partition_key @@ -1133,12 +1138,12 @@ def get_partition_key_ranges( return result - def with_partition_keys( - self, partition_keys: Iterable[T_str] - ) -> "DefaultPartitionsSubset[T_str]": + def get_partitions_def(self) -> Optional[PartitionsDefinition]: + return None + + def with_partition_keys(self, partition_keys: Iterable[str]) -> "DefaultPartitionsSubset": return DefaultPartitionsSubset( - self._partitions_def, - self._subset | set(partition_keys), + self.subset | set(partition_keys), ) def serialize(self) -> str: @@ -1148,32 +1153,32 @@ def serialize(self) -> str: { "version": self.SERIALIZATION_VERSION, # sort to ensure that equivalent partition subsets have identical serialized forms - "subset": sorted(list(self._subset)), + "subset": sorted(list(self.subset)), } ) @classmethod def from_serialized( - cls, partitions_def: PartitionsDefinition[T_str], serialized: str - ) -> "PartitionsSubset[T_str]": + cls, partitions_def: PartitionsDefinition, serialized: str + ) -> "PartitionsSubset": # Check the version number, so only valid versions can be deserialized. data = json.loads(serialized) if isinstance(data, list): # backwards compatibility - return cls(subset=set(data), partitions_def=partitions_def) + return cls(subset=set(data)) else: if data.get("version") != cls.SERIALIZATION_VERSION: raise DagsterInvalidDeserializationVersionError( f"Attempted to deserialize partition subset with version {data.get('version')}," f" but only version {cls.SERIALIZATION_VERSION} is supported." ) - return cls(subset=set(data.get("subset")), partitions_def=partitions_def) + return cls(subset=set(data.get("subset"))) @classmethod def can_deserialize( cls, - partitions_def: PartitionsDefinition[T_str], + partitions_def: PartitionsDefinition, serialized: str, serialized_partitions_def_unique_id: Optional[str], serialized_partitions_def_class_name: Optional[str], @@ -1186,28 +1191,20 @@ def can_deserialize( data.get("subset") is not None and data.get("version") == cls.SERIALIZATION_VERSION ) - @property - def partitions_def(self) -> PartitionsDefinition[T_str]: - return self._partitions_def - def __eq__(self, other: object) -> bool: - return ( - isinstance(other, DefaultPartitionsSubset) - and self._partitions_def == other._partitions_def - and self._subset == other._subset - ) + return isinstance(other, DefaultPartitionsSubset) and self.subset == other.subset def __len__(self) -> int: - return len(self._subset) + return len(self.subset) def __contains__(self, value) -> bool: - return value in self._subset + return value in self.subset def __repr__(self) -> str: - return ( - f"DefaultPartitionsSubset(subset={self._subset}, partitions_def={self._partitions_def})" - ) + return f"DefaultPartitionsSubset(subset={self.subset})" @classmethod - def empty_subset(cls, partitions_def: PartitionsDefinition[T_str]) -> "PartitionsSubset[T_str]": - return cls(partitions_def=partitions_def) + def empty_subset( + cls, partitions_def: Optional[PartitionsDefinition] = None + ) -> "DefaultPartitionsSubset": + return cls() diff --git a/python_modules/dagster/dagster/_core/definitions/partition_mapping.py b/python_modules/dagster/dagster/_core/definitions/partition_mapping.py index 5a5dddbbc6017..84263cccd5e20 100644 --- a/python_modules/dagster/dagster/_core/definitions/partition_mapping.py +++ b/python_modules/dagster/dagster/_core/definitions/partition_mapping.py @@ -62,6 +62,7 @@ class may change at any time. def get_downstream_partitions_for_partitions( self, upstream_partitions_subset: PartitionsSubset, + upstream_partitions_def: PartitionsDefinition, downstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -81,6 +82,7 @@ def get_downstream_partitions_for_partitions( def get_upstream_mapped_partitions_result_for_partitions( self, downstream_partitions_subset: Optional[PartitionsSubset], + downstream_partitions_def: Optional[PartitionsDefinition], upstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -107,6 +109,7 @@ class IdentityPartitionMapping(PartitionMapping, NamedTuple("_IdentityPartitionM def get_upstream_mapped_partitions_result_for_partitions( self, downstream_partitions_subset: Optional[PartitionsSubset], + downstream_partitions_def: Optional[PartitionsDefinition], upstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -114,7 +117,7 @@ def get_upstream_mapped_partitions_result_for_partitions( if downstream_partitions_subset is None: check.failed("downstream asset is not partitioned") - if downstream_partitions_subset.partitions_def == upstream_partitions_def: + if downstream_partitions_def == upstream_partitions_def: return UpstreamPartitionsResult(downstream_partitions_subset, []) upstream_partition_keys = set( @@ -134,6 +137,7 @@ def get_upstream_mapped_partitions_result_for_partitions( def get_downstream_partitions_for_partitions( self, upstream_partitions_subset: PartitionsSubset, + upstream_partitions_def: PartitionsDefinition, downstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -141,7 +145,7 @@ def get_downstream_partitions_for_partitions( if upstream_partitions_subset is None: check.failed("upstream asset is not partitioned") - if upstream_partitions_subset.partitions_def == downstream_partitions_def: + if upstream_partitions_def == downstream_partitions_def: return upstream_partitions_subset upstream_partition_keys = set(upstream_partitions_subset.get_partition_keys()) @@ -167,6 +171,7 @@ class AllPartitionMapping(PartitionMapping, NamedTuple("_AllPartitionMapping", [ def get_upstream_mapped_partitions_result_for_partitions( self, downstream_partitions_subset: Optional[PartitionsSubset], + downstream_partitions_def: Optional[PartitionsDefinition], upstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -179,6 +184,7 @@ def get_upstream_mapped_partitions_result_for_partitions( def get_downstream_partitions_for_partitions( self, upstream_partitions_subset: PartitionsSubset, + upstream_partitions_def: PartitionsDefinition, downstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -197,6 +203,7 @@ class LastPartitionMapping(PartitionMapping, NamedTuple("_LastPartitionMapping", def get_upstream_mapped_partitions_result_for_partitions( self, downstream_partitions_subset: Optional[PartitionsSubset], + downstream_partitions_def: Optional[PartitionsDefinition], upstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -214,11 +221,14 @@ def get_upstream_mapped_partitions_result_for_partitions( def get_downstream_partitions_for_partitions( self, upstream_partitions_subset: PartitionsSubset, + upstream_partitions_def: PartitionsDefinition, downstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> PartitionsSubset: - last_upstream_partition = upstream_partitions_subset.partitions_def.get_last_partition_key( + last_upstream_partition = check.not_none( + upstream_partitions_subset.get_partitions_def() + ).get_last_partition_key( current_time=current_time, dynamic_partitions_store=dynamic_partitions_store ) if last_upstream_partition and last_upstream_partition in upstream_partitions_subset: @@ -259,6 +269,7 @@ def a_downstream(upstream): def get_upstream_mapped_partitions_result_for_partitions( self, downstream_partitions_subset: Optional[PartitionsSubset], + downstream_partitions_def: Optional[PartitionsDefinition], upstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -270,6 +281,7 @@ def get_upstream_mapped_partitions_result_for_partitions( def get_downstream_partitions_for_partitions( self, upstream_partitions_subset: PartitionsSubset, + upstream_partitions_def: PartitionsDefinition, downstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -325,9 +337,8 @@ def _get_dependency_partitions_subset( a_partition_keys_by_dimension = defaultdict(set) if isinstance(a_partitions_def, MultiPartitionsDefinition): for partition_key in a_partitions_subset.get_partition_keys(): - for dimension_name, key in cast( - MultiPartitionKey, partition_key - ).keys_by_dimension.items(): + key = a_partitions_def.get_partition_key_from_str(partition_key) + for dimension_name, key in key.keys_by_dimension.items(): a_partition_keys_by_dimension[dimension_name].add(key) else: for partition_key in a_partitions_subset.get_partition_keys(): @@ -383,6 +394,7 @@ def _get_dependency_partitions_subset( a_dimension_partitions_def.empty_subset().with_partition_keys( [key] ), + a_dimension_partitions_def, b_dimension_partitions_def, current_time=current_time, dynamic_partitions_store=dynamic_partitions_store, @@ -421,6 +433,7 @@ def _get_dependency_partitions_subset( a_dimension_partitions_def.empty_subset().with_partition_keys( [key] ), + a_dimension_partitions_def, b_dimension_partitions_def, current_time=current_time, dynamic_partitions_store=dynamic_partitions_store, @@ -447,7 +460,9 @@ def _get_dependency_partitions_subset( [ dep_b_keys_by_a_dim_and_key[dim_name][ ( - cast(MultiPartitionKey, key).keys_by_dimension[dim_name] + cast(MultiPartitionsDefinition, a_partitions_def) + .get_partition_key_from_str(key) + .keys_by_dimension[dim_name] if dim_name else key ) @@ -487,6 +502,7 @@ def _get_dependency_partitions_subset( def get_upstream_mapped_partitions_result_for_partitions( self, downstream_partitions_subset: Optional[PartitionsSubset], + downstream_partitions_def: Optional[PartitionsDefinition], upstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -495,7 +511,7 @@ def get_upstream_mapped_partitions_result_for_partitions( check.failed("downstream asset is not partitioned") result = self._get_dependency_partitions_subset( - cast(MultiPartitionsDefinition, downstream_partitions_subset.partitions_def), + cast(MultiPartitionsDefinition, downstream_partitions_def), downstream_partitions_subset, cast(MultiPartitionsDefinition, upstream_partitions_def), a_upstream_of_b=False, @@ -511,6 +527,7 @@ def get_upstream_mapped_partitions_result_for_partitions( def get_downstream_partitions_for_partitions( self, upstream_partitions_subset: PartitionsSubset, + upstream_partitions_def: PartitionsDefinition, downstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -519,7 +536,7 @@ def get_downstream_partitions_for_partitions( check.failed("upstream asset is not partitioned") result = self._get_dependency_partitions_subset( - cast(MultiPartitionsDefinition, upstream_partitions_subset.partitions_def), + cast(MultiPartitionsDefinition, upstream_partitions_def), upstream_partitions_subset, cast(MultiPartitionsDefinition, downstream_partitions_def), a_upstream_of_b=True, @@ -855,6 +872,7 @@ def _check_downstream(self, *, downstream_partitions_def: PartitionsDefinition): def get_downstream_partitions_for_partitions( self, upstream_partitions_subset: PartitionsSubset, + upstream_partitions_def: PartitionsDefinition, downstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -870,6 +888,7 @@ def get_downstream_partitions_for_partitions( def get_upstream_mapped_partitions_result_for_partitions( self, downstream_partitions_subset: Optional[PartitionsSubset], + downstream_partitions_def: Optional[PartitionsDefinition], upstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, diff --git a/python_modules/dagster/dagster/_core/definitions/time_window_partition_mapping.py b/python_modules/dagster/dagster/_core/definitions/time_window_partition_mapping.py index c7d02fd595c7f..82c2b889ea128 100644 --- a/python_modules/dagster/dagster/_core/definitions/time_window_partition_mapping.py +++ b/python_modules/dagster/dagster/_core/definitions/time_window_partition_mapping.py @@ -108,6 +108,7 @@ def __new__( def get_upstream_mapped_partitions_result_for_partitions( self, downstream_partitions_subset: Optional[PartitionsSubset], + downstream_partitions_def: Optional[PartitionsDefinition], upstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -116,7 +117,7 @@ def get_upstream_mapped_partitions_result_for_partitions( check.failed("downstream_partitions_subset must be a BaseTimeWindowPartitionsSubset") return self._map_partitions( - downstream_partitions_subset.partitions_def, + downstream_partitions_subset.get_partitions_def(), upstream_partitions_def, downstream_partitions_subset, start_offset=self.start_offset, @@ -127,6 +128,7 @@ def get_upstream_mapped_partitions_result_for_partitions( def get_downstream_partitions_for_partitions( self, upstream_partitions_subset: PartitionsSubset, + upstream_partitions_def: PartitionsDefinition, downstream_partitions_def: Optional[PartitionsDefinition], current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -137,7 +139,7 @@ def get_downstream_partitions_for_partitions( if not provided. """ return self._map_partitions( - upstream_partitions_subset.partitions_def, + upstream_partitions_def, downstream_partitions_def, upstream_partitions_subset, end_offset=-self.start_offset, @@ -200,7 +202,7 @@ def _map_partitions( last_window = to_partitions_def.get_last_partition_window(current_time=current_time) time_windows = [] - for from_partition_time_window in from_partitions_subset.included_time_windows: + for from_partition_time_window in from_partitions_subset.get_included_time_windows(): from_start_dt, from_end_dt = from_partition_time_window offsetted_start_dt = _offsetted_datetime( diff --git a/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py b/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py index c82e622bb7a17..c4762a2ea6b59 100644 --- a/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py +++ b/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py @@ -1482,6 +1482,7 @@ def _get_partition_time_windows_not_in_subset( def get_partition_keys_not_in_subset( self, + partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> Iterable[str]: @@ -1791,7 +1792,9 @@ def __eq__(self, other): ) or super(TimePartitionKeyPartitionsSubset, self).__eq__(other) @classmethod - def empty_subset(cls, partitions_def: PartitionsDefinition) -> "PartitionsSubset": + def empty_subset( + cls, partitions_def: Optional[PartitionsDefinition] = None + ) -> "PartitionsSubset": if not isinstance(partitions_def, TimeWindowPartitionsDefinition): check.failed("Partitions definition must be a TimeWindowPartitionsDefinition") partitions_def = cast(TimeWindowPartitionsDefinition, partitions_def) @@ -1925,18 +1928,18 @@ def with_partitions_def( self, partitions_def: TimeWindowPartitionsDefinition ) -> "BaseTimeWindowPartitionsSubset": check.invariant( - partitions_def.cron_schedule == self._partitions_def.cron_schedule, + partitions_def.cron_schedule == self.partitions_def.cron_schedule, "num_partitions would become inaccurate if the partitions_defs had different cron" " schedules", ) return TimeWindowPartitionsSubset( partitions_def=partitions_def, - num_partitions=self._num_partitions, - included_time_windows=self._included_time_windows, + num_partitions=self.num_partitions, + included_time_windows=self.included_time_windows, ) def __repr__(self) -> str: - return f"TimeWindowPartitionsSubset({self.get_partition_key_ranges()})" + return f"TimeWindowPartitionsSubset({self.get_partition_key_ranges(self.partitions_def)})" class PartitionRangeStatus(Enum): diff --git a/python_modules/dagster/dagster/_core/execution/context/input.py b/python_modules/dagster/dagster/_core/execution/context/input.py index 42d5318cb3faf..5df13b74efe2f 100644 --- a/python_modules/dagster/dagster/_core/execution/context/input.py +++ b/python_modules/dagster/dagster/_core/execution/context/input.py @@ -365,7 +365,7 @@ def asset_partition_key_range(self) -> PartitionKeyRange: ) partition_key_ranges = subset.get_partition_key_ranges( - dynamic_partitions_store=self.instance + self.asset_partitions_def, dynamic_partitions_store=self.instance ) if len(partition_key_ranges) != 1: check.failed( @@ -608,7 +608,7 @@ def build_input_context( ) if asset_partitions_def and asset_partition_key_range: asset_partitions_subset = asset_partitions_def.empty_subset().with_partition_key_range( - asset_partition_key_range, dynamic_partitions_store=instance + asset_partitions_def, asset_partition_key_range, dynamic_partitions_store=instance ) elif asset_partition_key_range: asset_partitions_subset = KeyRangeNoPartitionsDefPartitionsSubset(asset_partition_key_range) @@ -643,6 +643,7 @@ def __init__(self, key_range: PartitionKeyRange): def get_partition_keys_not_in_subset( self, + partitions_def: "PartitionsDefinition", current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> Iterable[str]: @@ -656,6 +657,7 @@ def get_partition_keys(self, current_time: Optional[datetime] = None) -> Iterabl def get_partition_key_ranges( self, + partitions_def: "PartitionsDefinition", current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> Sequence[PartitionKeyRange]: @@ -700,5 +702,7 @@ def can_deserialize( raise NotImplementedError() @classmethod - def empty_subset(cls, partitions_def: "PartitionsDefinition") -> "PartitionsSubset": + def empty_subset( + cls, partitions_def: Optional["PartitionsDefinition"] = None + ) -> "PartitionsSubset": raise NotImplementedError() diff --git a/python_modules/dagster/dagster/_core/execution/context/system.py b/python_modules/dagster/dagster/_core/execution/context/system.py index a5486bee1eca7..2dafef2e5c9d9 100644 --- a/python_modules/dagster/dagster/_core/execution/context/system.py +++ b/python_modules/dagster/dagster/_core/execution/context/system.py @@ -1101,8 +1101,19 @@ def has_asset_partitions_for_input(self, input_name: str) -> bool: def asset_partition_key_range_for_input(self, input_name: str) -> PartitionKeyRange: subset = self.asset_partitions_subset_for_input(input_name) + + # TODO refactor upstream_asset_partitions_def as method + asset_layer = self.job_def.asset_layer + upstream_asset_key = asset_layer.asset_key_for_input(self.node_handle, input_name) + check.not_none(upstream_asset_key) + upstream_asset_partitions_def = asset_layer.partitions_def_for_asset( + cast(AssetKey, upstream_asset_key) + ) + check.not_none(upstream_asset_partitions_def) + partition_key_ranges = subset.get_partition_key_ranges( - dynamic_partitions_store=self.instance + partitions_def=cast(PartitionsDefinition, upstream_asset_partitions_def), + dynamic_partitions_store=self.instance, ) if len(partition_key_ranges) != 1: @@ -1127,7 +1138,9 @@ def asset_partitions_subset_for_input( partitions_def = assets_def.partitions_def if assets_def else None partitions_subset = ( partitions_def.empty_subset().with_partition_key_range( - self.asset_partition_key_range, dynamic_partitions_store=self.instance + partitions_def, + self.asset_partition_key_range, + dynamic_partitions_store=self.instance, ) if partitions_def else None @@ -1142,6 +1155,7 @@ def asset_partitions_subset_for_input( mapped_partitions_result = ( partition_mapping.get_upstream_mapped_partitions_result_for_partitions( partitions_subset, + partitions_def, upstream_asset_partitions_def, dynamic_partitions_store=self.instance, ) diff --git a/python_modules/dagster/dagster/_utils/caching_instance_queryer.py b/python_modules/dagster/dagster/_utils/caching_instance_queryer.py index 36eda647b5b96..eb6a5b49146f3 100644 --- a/python_modules/dagster/dagster/_utils/caching_instance_queryer.py +++ b/python_modules/dagster/dagster/_utils/caching_instance_queryer.py @@ -613,6 +613,7 @@ def asset_partitions_with_newly_updated_parents_and_new_latest_storage_id( child_partitions_subset = ( partition_mapping.get_downstream_partitions_for_partitions( partitions_subset, + partitions_def, downstream_partitions_def=child_partitions_def, dynamic_partitions_store=self, current_time=self.evaluation_time, diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_asset_partition_mappings.py b/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_asset_partition_mappings.py index e1c8a0654b79f..9ac4dfd9e9886 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_asset_partition_mappings.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_asset_partition_mappings.py @@ -64,6 +64,7 @@ class TrailingWindowPartitionMapping(PartitionMapping): def get_upstream_mapped_partitions_result_for_partitions( self, downstream_partitions_subset: Optional[PartitionsSubset], + downstream_partitions_def: Optional[PartitionsDefinition], upstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -74,7 +75,8 @@ def get_upstream_mapped_partitions_result_for_partitions( partition_keys = list(downstream_partitions_subset.get_partition_keys()) return UpstreamPartitionsResult( upstream_partitions_def.empty_subset().with_partition_key_range( - PartitionKeyRange(str(max(1, int(partition_keys[0]) - 1)), partition_keys[-1]) + upstream_partitions_def, + PartitionKeyRange(str(max(1, int(partition_keys[0]) - 1)), partition_keys[-1]), ), [], ) @@ -82,6 +84,7 @@ def get_upstream_mapped_partitions_result_for_partitions( def get_downstream_partitions_for_partitions( self, upstream_partitions_subset: PartitionsSubset, + upstream_partitions_def: PartitionsDefinition, downstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -260,24 +263,24 @@ def test_specific_partitions_partition_mapping_downstream_partitions(): # cases where at least one of the specific partitions is in the upstream partitions subset for partition_subset in [ - DefaultPartitionsSubset(upstream_partitions_def, {"a"}), - DefaultPartitionsSubset(upstream_partitions_def, {"a", "b"}), - DefaultPartitionsSubset(upstream_partitions_def, {"a", "b", "c", "d"}), + DefaultPartitionsSubset({"a"}), + DefaultPartitionsSubset({"a", "b"}), + DefaultPartitionsSubset({"a", "b", "c", "d"}), ]: assert ( partition_mapping.get_downstream_partitions_for_partitions( - partition_subset, downstream_partitions_def + partition_subset, upstream_partitions_def, downstream_partitions_def ) == downstream_partitions_def.subset_with_all_partitions() ) for partition_subset in [ - DefaultPartitionsSubset(upstream_partitions_def, {"c"}), - DefaultPartitionsSubset(upstream_partitions_def, {"c", "d"}), + DefaultPartitionsSubset({"c"}), + DefaultPartitionsSubset({"c", "d"}), ]: assert ( partition_mapping.get_downstream_partitions_for_partitions( - partition_subset, downstream_partitions_def + partition_subset, upstream_partitions_def, downstream_partitions_def ) == downstream_partitions_def.empty_subset() ) @@ -564,13 +567,13 @@ def test_identity_partition_mapping(): zx = StaticPartitionsDefinition(["z", "x"]) result = IdentityPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( - zx.empty_subset().with_partition_keys(["z", "x"]), xy + zx.empty_subset().with_partition_keys(["z", "x"]), zx, xy ) assert result.partitions_subset.get_partition_keys() == set(["x"]) assert result.required_but_nonexistent_partition_keys == ["z"] result = IdentityPartitionMapping().get_downstream_partitions_for_partitions( - zx.empty_subset().with_partition_keys(["z", "x"]), xy + zx.empty_subset().with_partition_keys(["z", "x"]), zx, xy ) assert result.get_partition_keys() == set(["x"]) diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_multipartition_partition_mapping.py b/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_multipartition_partition_mapping.py index beeb5ab3b4163..791aeb4d61fc2 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_multipartition_partition_mapping.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_multipartition_partition_mapping.py @@ -40,10 +40,11 @@ def test_get_downstream_partitions_single_key_in_range(): ) single_dimension_subset = single_dimension_def.empty_subset().with_partition_key_range( - PartitionKeyRange("a", "a") + single_dimension_def, PartitionKeyRange("a", "a") ) result = MultiToSingleDimensionPartitionMapping().get_downstream_partitions_for_partitions( upstream_partitions_subset=single_dimension_subset, + upstream_partitions_def=single_dimension_def, downstream_partitions_def=multipartitions_def, ) multipartitions_subset = multipartitions_def.empty_subset().with_partition_keys( @@ -57,6 +58,7 @@ def test_get_downstream_partitions_single_key_in_range(): result = MultiToSingleDimensionPartitionMapping().get_downstream_partitions_for_partitions( upstream_partitions_subset=multipartitions_subset, + upstream_partitions_def=multipartitions_def, downstream_partitions_def=single_dimension_def, ) assert result == single_dimension_subset @@ -67,12 +69,12 @@ def test_get_downstream_partitions_single_key_in_range(): result = MultiToSingleDimensionPartitionMapping().get_downstream_partitions_for_partitions( upstream_partitions_subset=single_dimension_def.empty_subset().with_partition_key_range( - PartitionKeyRange("b", "b") + partitions_def=single_dimension_def, partition_key_range=PartitionKeyRange("b", "b") ), + upstream_partitions_def=single_dimension_def, downstream_partitions_def=multipartitions_def, ) assert result == DefaultPartitionsSubset( - multipartitions_def, { MultiPartitionKey({"abc": "b", "xyz": "x"}), MultiPartitionKey({"abc": "b", "xyz": "y"}), @@ -87,7 +89,7 @@ def test_get_downstream_partitions_multiple_keys_in_range(): {"abc": single_dimension_def, "123": StaticPartitionsDefinition(["1", "2", "3"])} ) single_dimension_subset = single_dimension_def.empty_subset().with_partition_key_range( - PartitionKeyRange("a", "b") + single_dimension_def, PartitionKeyRange("a", "b") ) multipartitions_subset = multipartitions_def.empty_subset().with_partition_keys( { @@ -102,12 +104,14 @@ def test_get_downstream_partitions_multiple_keys_in_range(): result = MultiToSingleDimensionPartitionMapping().get_downstream_partitions_for_partitions( upstream_partitions_subset=single_dimension_subset, + upstream_partitions_def=single_dimension_def, downstream_partitions_def=multipartitions_def, ) assert result == multipartitions_subset result = MultiToSingleDimensionPartitionMapping().get_downstream_partitions_for_partitions( upstream_partitions_subset=multipartitions_subset, + upstream_partitions_def=multipartitions_def, downstream_partitions_def=single_dimension_def, ) assert result == single_dimension_subset @@ -127,17 +131,19 @@ def test_get_downstream_partitions_multiple_keys_in_range(): @pytest.mark.parametrize( - "upstream_partitions_def,upstream_partitions_subset,downstream_partitions_subset", + "upstream_partitions_def,upstream_partitions_subset,downstream_partitions_subset,downstream_partitions_def", [ ( static_partitions_def, static_partitions_def.empty_subset().with_partition_keys({"a"}), static_multipartitions_def.empty_subset().with_partition_key_range( + static_multipartitions_def, PartitionKeyRange( MultiPartitionKey({"abc": "a", "123": "1"}), MultiPartitionKey({"abc": "a", "123": "1"}), - ) + ), ), + static_multipartitions_def, ), ( static_partitions_def, @@ -148,6 +154,7 @@ def test_get_downstream_partitions_multiple_keys_in_range(): MultiPartitionKey({"abc": "a", "123": "2"}), } ), + static_multipartitions_def, ), ( static_multipartitions_def, @@ -159,6 +166,7 @@ def test_get_downstream_partitions_multiple_keys_in_range(): } ), static_partitions_def.empty_subset().with_partition_keys({"a"}), + static_partitions_def, ), ( static_multipartitions_def, @@ -173,12 +181,17 @@ def test_get_downstream_partitions_multiple_keys_in_range(): } ), static_partitions_def.empty_subset().with_partition_keys({"a", "b"}), + static_partitions_def, ), ( daily_partitions_def, daily_partitions_def.empty_subset() - .with_partition_key_range(PartitionKeyRange(start="2023-01-08", end="2023-01-14")) - .with_partition_key_range(PartitionKeyRange(start="2023-01-29", end="2023-02-04")), + .with_partition_key_range( + daily_partitions_def, PartitionKeyRange(start="2023-01-08", end="2023-01-14") + ) + .with_partition_key_range( + daily_partitions_def, PartitionKeyRange(start="2023-01-29", end="2023-02-04") + ), weekly_multipartitions_def.empty_subset().with_partition_keys( { MultiPartitionKey({"ab": "a", "week": "2023-01-08"}), @@ -187,6 +200,7 @@ def test_get_downstream_partitions_multiple_keys_in_range(): MultiPartitionKey({"ab": "b", "week": "2023-01-29"}), } ), + weekly_multipartitions_def, ), ], ) @@ -194,11 +208,13 @@ def test_get_upstream_single_dimension_to_multi_partition_mapping( upstream_partitions_def, upstream_partitions_subset, downstream_partitions_subset, + downstream_partitions_def, ): assert ( MultiToSingleDimensionPartitionMapping() .get_upstream_mapped_partitions_result_for_partitions( downstream_partitions_subset, + downstream_partitions_def, upstream_partitions_def, ) .partitions_subset @@ -219,38 +235,44 @@ def test_error_thrown_when_no_partition_dimension_name_provided(): with pytest.raises(CheckError, match="dimension name must be specified"): MultiToSingleDimensionPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( multipartitions_def.empty_subset().with_partition_key_range( + multipartitions_def, PartitionKeyRange( MultiPartitionKey({"a": "1", "b": "1"}), MultiPartitionKey({"a": "1", "b": "1"}), - ) + ), ), + multipartitions_def, single_dimension_def, ) with pytest.raises(CheckError, match="dimension name must be specified"): MultiToSingleDimensionPartitionMapping().get_downstream_partitions_for_partitions( multipartitions_def.empty_subset().with_partition_key_range( + multipartitions_def, PartitionKeyRange( MultiPartitionKey({"a": "1", "b": "1"}), MultiPartitionKey({"a": "1", "b": "1"}), - ) + ), ), + multipartitions_def, single_dimension_def, ) with pytest.raises(CheckError, match="dimension name must be specified"): MultiToSingleDimensionPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( single_dimension_def.empty_subset().with_partition_key_range( - PartitionKeyRange("1", "1") + single_dimension_def, PartitionKeyRange("1", "1") ), + single_dimension_def, multipartitions_def, ) with pytest.raises(CheckError, match="dimension name must be specified"): MultiToSingleDimensionPartitionMapping().get_downstream_partitions_for_partitions( single_dimension_def.empty_subset().with_partition_key_range( - PartitionKeyRange("1", "1") + single_dimension_def, PartitionKeyRange("1", "1") ), + single_dimension_def, multipartitions_def, ) @@ -629,6 +651,7 @@ def test_multipartitions_mapping_get_upstream_partitions( ): result = partitions_mapping.get_upstream_mapped_partitions_result_for_partitions( downstream_partitions_def.empty_subset().with_partition_keys(downstream_partition_keys), + downstream_partitions_def, upstream_partitions_def, ) assert result.partitions_subset.get_partition_keys() == set(upstream_partition_keys) @@ -665,6 +688,7 @@ def test_multipartitions_required_but_invalid_upstream_partitions(): MultiPartitionKey({"time": "2023-06-01", "123": "1"}), ] ), + may_multipartitions_def, june_multipartitions_def, ) assert result.partitions_subset.get_partition_keys() == set( @@ -689,6 +713,7 @@ def test_multipartitions_mapping_get_downstream_partitions( ): assert partitions_mapping.get_downstream_partitions_for_partitions( upstream_partitions_def.empty_subset().with_partition_keys(upstream_partition_keys), + upstream_partitions_def, downstream_partitions_def, ).get_partition_keys() == set(downstream_partition_keys) @@ -715,6 +740,7 @@ def test_multipartitions_mapping_dynamic(): downstream_partitions_def.empty_subset().with_partition_keys( [MultiPartitionKey({"dynamic": "a", "123": "1"})] ), + downstream_partitions_def, upstream_partitions_def, dynamic_partitions_store=instance, ) @@ -749,7 +775,9 @@ def test_error_multipartitions_mapping(): "other nonexistent dimension", SpecificPartitionsPartitionMapping(["c"]) ) } - ).get_upstream_mapped_partitions_result_for_partitions(weekly_abc.empty_subset(), daily_123) + ).get_upstream_mapped_partitions_result_for_partitions( + weekly_abc.empty_subset(), weekly_abc, daily_123 + ) def test_multi_partition_mapping_with_asset_deps(): diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_static_partition_mapping.py b/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_static_partition_mapping.py index 6f4b6f11fe233..00598139b9091 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_static_partition_mapping.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_static_partition_mapping.py @@ -16,19 +16,21 @@ def test_single_valued_static_mapping(): upstream_partitions_subset=upstream_parts.empty_subset().with_partition_keys( ["p1", "p3", "q2", "r1"] ), + upstream_partitions_def=upstream_parts, downstream_partitions_def=downstream_parts, ) - assert result == DefaultPartitionsSubset(downstream_parts, {"p", "r"}) + assert result == DefaultPartitionsSubset({"p", "r"}) result = mapping.get_upstream_mapped_partitions_result_for_partitions( downstream_partitions_subset=downstream_parts.empty_subset().with_partition_keys( ["p", "q"] ), + downstream_partitions_def=downstream_parts, upstream_partitions_def=upstream_parts, ) - assert result.partitions_subset == DefaultPartitionsSubset(upstream_parts, {"p1", "p2", "p3"}) + assert result.partitions_subset == DefaultPartitionsSubset({"p1", "p2", "p3"}) def test_multi_valued_static_mapping(): @@ -39,19 +41,21 @@ def test_multi_valued_static_mapping(): result = mapping.get_downstream_partitions_for_partitions( upstream_partitions_subset=upstream_parts.empty_subset().with_partition_keys(["p", "r"]), + upstream_partitions_def=upstream_parts, downstream_partitions_def=downstream_parts, ) - assert result == DefaultPartitionsSubset(downstream_parts, {"p1", "p2", "p3"}) + assert result == DefaultPartitionsSubset({"p1", "p2", "p3"}) result = mapping.get_upstream_mapped_partitions_result_for_partitions( downstream_partitions_subset=downstream_parts.empty_subset().with_partition_keys( ["p2", "p3", "q"] ), + downstream_partitions_def=downstream_parts, upstream_partitions_def=upstream_parts, ) - assert result.partitions_subset == DefaultPartitionsSubset(upstream_parts, {"p", "q1", "q2"}) + assert result.partitions_subset == DefaultPartitionsSubset({"p", "q1", "q2"}) def test_error_on_extra_keys_in_mapping(): @@ -63,6 +67,7 @@ def test_error_on_extra_keys_in_mapping(): {"p": "p", "q": {"q", "OTHER"}} ).get_downstream_partitions_for_partitions( upstream_partitions_subset=upstream_parts.empty_subset(), + upstream_partitions_def=upstream_parts, downstream_partitions_def=downstream_parts, ) @@ -71,6 +76,7 @@ def test_error_on_extra_keys_in_mapping(): {"p": "p", "q": "q", "OTHER": "q"} ).get_upstream_mapped_partitions_result_for_partitions( downstream_partitions_subset=downstream_parts.empty_subset(), + downstream_partitions_def=downstream_parts, upstream_partitions_def=upstream_parts, ) diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_time_window_partition_mapping.py b/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_time_window_partition_mapping.py index 03202ece6c007..33d9b1d05016a 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_time_window_partition_mapping.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_time_window_partition_mapping.py @@ -30,6 +30,7 @@ def test_get_upstream_partitions_for_partition_range_same_partitioning(): # single partition key result = TimeWindowPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( subset_with_keys(downstream_partitions_def, ["2021-05-07"]), + downstream_partitions_def, upstream_partitions_def, ) assert result.partitions_subset == upstream_partitions_def.empty_subset().with_partition_keys( @@ -39,6 +40,7 @@ def test_get_upstream_partitions_for_partition_range_same_partitioning(): # range of partition keys result = TimeWindowPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( subset_with_key_range(downstream_partitions_def, "2021-05-07", "2021-05-09"), + downstream_partitions_def, upstream_partitions_def, ) assert result.partitions_subset == subset_with_key_range( @@ -52,6 +54,7 @@ def test_get_upstream_partitions_for_partition_range_same_partitioning_different result = TimeWindowPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( subset_with_key_range(downstream_partitions_def, "2021-05-07", "2021-05-09"), + downstream_partitions_def, upstream_partitions_def, ) assert result.partitions_subset == subset_with_key_range( @@ -70,6 +73,7 @@ def test_get_upstream_partitions_for_partition_range_hourly_downstream_daily_ups upstream_partitions_def = DailyPartitionsDefinition(start_date="2021-05-05") result = TimeWindowPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( subset_with_keys(downstream_partitions_def, ["2021-05-07-05:00"]), + downstream_partitions_def, upstream_partitions_def, ) assert result.partitions_subset == upstream_partitions_def.empty_subset().with_partition_keys( @@ -78,6 +82,7 @@ def test_get_upstream_partitions_for_partition_range_hourly_downstream_daily_ups result = TimeWindowPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( subset_with_key_range(downstream_partitions_def, "2021-05-07-05:00", "2021-05-09-09:00"), + downstream_partitions_def, upstream_partitions_def, ) assert ( @@ -93,6 +98,7 @@ def test_get_upstream_partitions_for_partition_range_daily_downstream_hourly_ups upstream_partitions_def = HourlyPartitionsDefinition(start_date="2021-05-05-00:00") result = TimeWindowPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( subset_with_keys(downstream_partitions_def, ["2021-05-07"]), + downstream_partitions_def, upstream_partitions_def, ) assert ( @@ -104,6 +110,7 @@ def test_get_upstream_partitions_for_partition_range_daily_downstream_hourly_ups result = TimeWindowPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( subset_with_key_range(downstream_partitions_def, "2021-05-07", "2021-05-09"), + downstream_partitions_def, upstream_partitions_def, ) assert ( @@ -119,6 +126,7 @@ def test_get_upstream_partitions_for_partition_range_monthly_downstream_daily_up upstream_partitions_def = DailyPartitionsDefinition(start_date="2021-05-01") result = TimeWindowPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( subset_with_key_range(downstream_partitions_def, "2021-05-01", "2021-07-01"), + downstream_partitions_def, upstream_partitions_def, ) assert ( @@ -139,6 +147,7 @@ def test_get_upstream_partitions_for_partition_range_twice_daily_downstream_dail ) result = TimeWindowPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( subset_with_key_range(downstream_partitions_def, "2021-05-01", "2021-05-03"), + downstream_partitions_def, upstream_partitions_def, ) assert ( @@ -159,6 +168,7 @@ def test_get_upstream_partitions_for_partition_range_daily_downstream_twice_dail ) result = TimeWindowPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( subset_with_key_range(downstream_partitions_def, "2021-05-01 00:00", "2021-05-03 00:00"), + downstream_partitions_def, upstream_partitions_def, ) assert ( @@ -179,6 +189,7 @@ def test_get_upstream_partitions_for_partition_range_daily_non_aligned(): ) result = TimeWindowPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( subset_with_key_range(downstream_partitions_def, "2021-05-02", "2021-05-04"), + downstream_partitions_def, upstream_partitions_def, ) assert ( @@ -197,6 +208,7 @@ def test_get_upstream_partitions_for_partition_range_weekly_with_offset(): result = TimeWindowPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( subset_with_key_range(partitions_def, "2022-09-11", "2022-09-11"), partitions_def, + partitions_def, ) assert result.partitions_subset.get_partition_keys() == ( partitions_def.get_partition_keys_in_range(PartitionKeyRange("2022-09-11", "2022-09-11")) @@ -211,17 +223,23 @@ def test_daily_to_daily_lag(): # single partition key assert mapping.get_upstream_mapped_partitions_result_for_partitions( - subset_with_keys(downstream_partitions_def, ["2021-05-07"]), upstream_partitions_def + subset_with_keys(downstream_partitions_def, ["2021-05-07"]), + downstream_partitions_def, + upstream_partitions_def, ).partitions_subset.get_partition_keys() == ["2021-05-06"] assert mapping.get_downstream_partitions_for_partitions( - subset_with_keys(upstream_partitions_def, ["2021-05-06"]), downstream_partitions_def + subset_with_keys(upstream_partitions_def, ["2021-05-06"]), + upstream_partitions_def, + downstream_partitions_def, ).get_partition_keys() == ["2021-05-07"] # first partition key assert ( mapping.get_upstream_mapped_partitions_result_for_partitions( - subset_with_keys(downstream_partitions_def, ["2021-05-05"]), upstream_partitions_def + subset_with_keys(downstream_partitions_def, ["2021-05-05"]), + downstream_partitions_def, + upstream_partitions_def, ).partitions_subset.get_partition_keys() == [] ) @@ -229,17 +247,20 @@ def test_daily_to_daily_lag(): # range of partition keys assert mapping.get_upstream_mapped_partitions_result_for_partitions( subset_with_key_range(downstream_partitions_def, "2021-05-07", "2021-05-09"), + downstream_partitions_def, upstream_partitions_def, ).partitions_subset.get_partition_keys() == ["2021-05-06", "2021-05-07", "2021-05-08"] assert mapping.get_downstream_partitions_for_partitions( - subset_with_key_range(downstream_partitions_def, "2021-05-06", "2021-05-08"), + subset_with_key_range(upstream_partitions_def, "2021-05-06", "2021-05-08"), + upstream_partitions_def, downstream_partitions_def, ).get_partition_keys() == ["2021-05-07", "2021-05-08", "2021-05-09"] # range overlaps start assert mapping.get_upstream_mapped_partitions_result_for_partitions( subset_with_key_range(downstream_partitions_def, "2021-05-05", "2021-05-07"), + downstream_partitions_def, upstream_partitions_def, ).partitions_subset.get_partition_keys() == ["2021-05-05", "2021-05-06"] @@ -252,17 +273,23 @@ def test_exotic_cron_schedule_lag(): mapping = TimeWindowPartitionMapping(start_offset=-1, end_offset=-1) # single partition key assert mapping.get_upstream_mapped_partitions_result_for_partitions( - subset_with_keys(downstream_partitions_def, ["2021-05-06_04"]), upstream_partitions_def + subset_with_keys(downstream_partitions_def, ["2021-05-06_04"]), + downstream_partitions_def, + upstream_partitions_def, ).partitions_subset.get_partition_keys() == ["2021-05-06_00"] assert mapping.get_downstream_partitions_for_partitions( - subset_with_keys(upstream_partitions_def, ["2021-05-06_00"]), downstream_partitions_def + subset_with_keys(upstream_partitions_def, ["2021-05-06_00"]), + upstream_partitions_def, + downstream_partitions_def, ).get_partition_keys() == ["2021-05-06_04"] # first partition key assert ( mapping.get_upstream_mapped_partitions_result_for_partitions( - subset_with_keys(downstream_partitions_def, ["2021-05-05_00"]), upstream_partitions_def + subset_with_keys(downstream_partitions_def, ["2021-05-05_00"]), + downstream_partitions_def, + upstream_partitions_def, ).partitions_subset.get_partition_keys() == [] ) @@ -270,17 +297,20 @@ def test_exotic_cron_schedule_lag(): # range of partition keys assert mapping.get_upstream_mapped_partitions_result_for_partitions( subset_with_key_range(downstream_partitions_def, "2021-05-07_04", "2021-05-07_12"), + downstream_partitions_def, upstream_partitions_def, ).partitions_subset.get_partition_keys() == ["2021-05-07_00", "2021-05-07_04", "2021-05-07_08"] assert mapping.get_downstream_partitions_for_partitions( - subset_with_key_range(downstream_partitions_def, "2021-05-07_04", "2021-05-07_12"), + subset_with_key_range(upstream_partitions_def, "2021-05-07_04", "2021-05-07_12"), + upstream_partitions_def, downstream_partitions_def, ).get_partition_keys() == ["2021-05-07_08", "2021-05-07_12", "2021-05-07_16"] # range overlaps start assert mapping.get_upstream_mapped_partitions_result_for_partitions( subset_with_key_range(downstream_partitions_def, "2021-05-05_00", "2021-05-05_08"), + downstream_partitions_def, upstream_partitions_def, ).partitions_subset.get_partition_keys() == ["2021-05-05_00", "2021-05-05_04"] @@ -291,11 +321,15 @@ def test_daily_to_daily_lag_different_start_date(): mapping = TimeWindowPartitionMapping(start_offset=-1, end_offset=-1) assert mapping.get_upstream_mapped_partitions_result_for_partitions( - subset_with_keys(downstream_partitions_def, ["2021-05-06"]), upstream_partitions_def + subset_with_keys(downstream_partitions_def, ["2021-05-06"]), + downstream_partitions_def, + upstream_partitions_def, ).partitions_subset.get_partition_keys() == ["2021-05-05"] assert mapping.get_downstream_partitions_for_partitions( - subset_with_keys(upstream_partitions_def, ["2021-05-05"]), downstream_partitions_def + subset_with_keys(upstream_partitions_def, ["2021-05-05"]), + upstream_partitions_def, + downstream_partitions_def, ).get_partition_keys() == ["2021-05-06"] @@ -305,25 +339,33 @@ def test_daily_to_daily_many_to_one(): mapping = TimeWindowPartitionMapping(start_offset=-1) assert mapping.get_upstream_mapped_partitions_result_for_partitions( - subset_with_keys(downstream_partitions_def, ["2022-07-04"]), upstream_partitions_def + subset_with_keys(downstream_partitions_def, ["2022-07-04"]), + downstream_partitions_def, + upstream_partitions_def, ).partitions_subset.get_partition_keys() == ["2022-07-03", "2022-07-04"] assert mapping.get_upstream_mapped_partitions_result_for_partitions( subset_with_keys(downstream_partitions_def, ["2022-07-04", "2022-07-05"]), + downstream_partitions_def, upstream_partitions_def, ).partitions_subset.get_partition_keys() == ["2022-07-03", "2022-07-04", "2022-07-05"] assert mapping.get_downstream_partitions_for_partitions( subset_with_keys(upstream_partitions_def, ["2022-07-03", "2022-07-04"]), + upstream_partitions_def, downstream_partitions_def, ).get_partition_keys() == ["2022-07-03", "2022-07-04", "2022-07-05"] assert mapping.get_downstream_partitions_for_partitions( - subset_with_keys(upstream_partitions_def, ["2022-07-03"]), downstream_partitions_def + subset_with_keys(upstream_partitions_def, ["2022-07-03"]), + upstream_partitions_def, + downstream_partitions_def, ).get_partition_keys() == ["2022-07-03", "2022-07-04"] assert mapping.get_downstream_partitions_for_partitions( - subset_with_keys(upstream_partitions_def, ["2022-07-04"]), downstream_partitions_def + subset_with_keys(upstream_partitions_def, ["2022-07-04"]), + upstream_partitions_def, + downstream_partitions_def, ).get_partition_keys() == ["2022-07-04", "2022-07-05"] @@ -385,6 +427,7 @@ def test_get_downstream_with_current_time( assert ( mapping.get_downstream_partitions_for_partitions( subset_with_keys(upstream_partitions_def, upstream_keys), + upstream_partitions_def, downstream_partitions_def, current_time=current_time, ).get_partition_keys() @@ -489,6 +532,7 @@ def test_get_upstream_with_current_time( upstream_partitions_result = mapping.get_upstream_mapped_partitions_result_for_partitions( subset_with_keys(downstream_partitions_def, downstream_keys), + downstream_partitions_def, upstream_partitions_def, current_time=current_time, ) @@ -508,6 +552,7 @@ def test_different_start_time_partitions_defs(): TimeWindowPartitionMapping() .get_downstream_partitions_for_partitions( upstream_partitions_subset=subset_with_keys(jan_start, ["2023-01-15"]), + upstream_partitions_def=jan_start, downstream_partitions_def=feb_start, ) .get_partition_keys() @@ -517,6 +562,7 @@ def test_different_start_time_partitions_defs(): upstream_partitions_result = ( TimeWindowPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( downstream_partitions_subset=subset_with_keys(jan_start, ["2023-01-15"]), + downstream_partitions_def=jan_start, upstream_partitions_def=feb_start, ) ) @@ -530,6 +576,7 @@ def test_different_end_time_partitions_defs(): assert TimeWindowPartitionMapping().get_downstream_partitions_for_partitions( upstream_partitions_subset=subset_with_keys(jan_partitions_def, ["2023-01-15"]), + upstream_partitions_def=jan_partitions_def, downstream_partitions_def=jan_feb_partitions_def, ).get_partition_keys() == ["2023-01-15"] @@ -537,6 +584,7 @@ def test_different_end_time_partitions_defs(): TimeWindowPartitionMapping() .get_downstream_partitions_for_partitions( upstream_partitions_subset=subset_with_keys(jan_feb_partitions_def, ["2023-02-15"]), + upstream_partitions_def=jan_feb_partitions_def, downstream_partitions_def=jan_partitions_def, ) .get_partition_keys() @@ -546,6 +594,7 @@ def test_different_end_time_partitions_defs(): upstream_partitions_result = ( TimeWindowPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( downstream_partitions_subset=subset_with_keys(jan_feb_partitions_def, ["2023-02-15"]), + downstream_partitions_def=jan_feb_partitions_def, upstream_partitions_def=jan_partitions_def, ) ) @@ -566,6 +615,7 @@ def test_daily_upstream_of_yearly(): allow_nonexistent_upstream_partitions=True ).get_upstream_mapped_partitions_result_for_partitions( downstream_partitions_subset=subset_with_keys(yearly, ["2023-01-01"]), + downstream_partitions_def=yearly, upstream_partitions_def=daily, current_time=datetime(2023, 1, 5, 0), ).partitions_subset.get_partition_keys() == [ @@ -633,6 +683,7 @@ def test_downstream_partition_has_valid_upstream_partitions( allow_nonexistent_upstream_partitions=allow_nonexistent_upstream_partitions ).get_upstream_mapped_partitions_result_for_partitions( downstream_partitions_subset=downstream_partitions_subset, + downstream_partitions_def=downstream_partitions_subset.get_partitions_def(), upstream_partitions_def=upstream_partitions_def, current_time=current_time, ) diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py index 8c4756cc93c53..77667b0616a11 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py @@ -213,6 +213,7 @@ class TrailingWindowPartitionMapping(PartitionMapping): def get_upstream_mapped_partitions_result_for_partitions( self, downstream_partitions_subset: Optional[PartitionsSubset], + downstream_partitions_def: Optional[PartitionsDefinition], upstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -223,7 +224,8 @@ def get_upstream_mapped_partitions_result_for_partitions( partition_keys = list(downstream_partitions_subset.get_partition_keys()) return UpstreamPartitionsResult( upstream_partitions_def.empty_subset().with_partition_key_range( - PartitionKeyRange(str(max(1, int(partition_keys[0]) - 1)), partition_keys[-1]) + upstream_partitions_def, + PartitionKeyRange(str(max(1, int(partition_keys[0]) - 1)), partition_keys[-1]), ), [], ) @@ -231,6 +233,7 @@ def get_upstream_mapped_partitions_result_for_partitions( def get_downstream_partitions_for_partitions( self, upstream_partitions_subset: PartitionsSubset, + upstream_partitions_def: PartitionsDefinition, downstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -405,7 +408,8 @@ def include_all(asset_key, partitions_subset): asset_graph, partitions_subsets_by_asset_key={ asset3.key: asset3.partitions_def.empty_subset().with_partition_key_range( - PartitionKeyRange("2022-01-02-00:00", "2022-01-03-23:00") + asset3.partitions_def, + PartitionKeyRange("2022-01-02-00:00", "2022-01-03-23:00"), ), }, ) diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py index 0be1074d919a5..756e86eb377be 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py @@ -71,10 +71,13 @@ def get_upstream_partitions_for_partition_range( upstream_partitions_subset = ( downstream_partition_mapping.get_upstream_mapped_partitions_result_for_partitions( downstream_partitions_subset, + downstream_partitions_def, upstream_partitions_def, ).partitions_subset ) - upstream_key_ranges = upstream_partitions_subset.get_partition_key_ranges() + upstream_key_ranges = upstream_partitions_subset.get_partition_key_ranges( + upstream_partitions_def + ) check.invariant(len(upstream_key_ranges) == 1) return upstream_key_ranges[0] @@ -101,10 +104,13 @@ def get_downstream_partitions_for_partition_range( downstream_partitions_subset = ( downstream_partition_mapping.get_downstream_partitions_for_partitions( upstream_partitions_subset, + upstream_partitions_def, downstream_assets_def.partitions_def, ) ) - downstream_key_ranges = downstream_partitions_subset.get_partition_key_ranges() + downstream_key_ranges = downstream_partitions_subset.get_partition_key_ranges( + downstream_assets_def.partitions_def + ) check.invariant(len(downstream_key_ranges) == 1) return downstream_key_ranges[0] diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py index 8be4569414b0c..44764135259a3 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py @@ -1389,9 +1389,11 @@ def foo_child(): ) assert asset_backfill_data.target_subset.partitions_subsets_by_asset_key == { - foo.key: foo_partitions_def.empty_subset().with_partition_key_range(partition_key_range), + foo.key: foo_partitions_def.empty_subset().with_partition_key_range( + foo_partitions_def, partition_key_range + ), foo_child.key: foo_partitions_def.empty_subset().with_partition_key_range( - partition_key_range + foo_partitions_def, partition_key_range ), } diff --git a/python_modules/dagster/dagster_tests/core_tests/partition_tests/test_partition.py b/python_modules/dagster/dagster_tests/core_tests/partition_tests/test_partition.py index 1399ed7a1d84d..d341aa18d2b53 100644 --- a/python_modules/dagster/dagster_tests/core_tests/partition_tests/test_partition.py +++ b/python_modules/dagster/dagster_tests/core_tests/partition_tests/test_partition.py @@ -142,10 +142,10 @@ def test_static_partitions_subset(): assert len(subset) == 0 assert "bar" not in subset with_some_partitions = subset.with_partition_keys(["foo", "bar"]) - assert with_some_partitions.get_partition_keys_not_in_subset() == {"baz", "qux"} + assert with_some_partitions.get_partition_keys_not_in_subset(partitions) == {"baz", "qux"} serialized = with_some_partitions.serialize() deserialized = partitions.deserialize_subset(serialized) - assert deserialized.get_partition_keys_not_in_subset() == {"baz", "qux"} + assert deserialized.get_partition_keys_not_in_subset(partitions) == {"baz", "qux"} assert len(with_some_partitions) == 2 assert len(deserialized) == 2 assert "bar" in with_some_partitions diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py index b2ba65afb62f9..3807e8be2f8ff 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py @@ -165,7 +165,10 @@ class AssetReconciliationScenario( ("cursor_from", Optional["AssetReconciliationScenario"]), ("current_time", Optional[datetime.datetime]), ("asset_selection", Optional[AssetSelection]), - ("active_backfill_targets", Optional[Sequence[Mapping[AssetKey, PartitionsSubset]]]), + ( + "active_backfill_targets", + Optional[Sequence[Union[Mapping[AssetKey, PartitionsSubset], Sequence[AssetKey]]]], + ), ("dagster_runs", Optional[Sequence[DagsterRun]]), ("event_log_entries", Optional[Sequence[EventLogEntry]]), ("expected_run_requests", Optional[Sequence[RunRequest]]), @@ -189,7 +192,9 @@ def __new__( cursor_from: Optional["AssetReconciliationScenario"] = None, current_time: Optional[datetime.datetime] = None, asset_selection: Optional[AssetSelection] = None, - active_backfill_targets: Optional[Sequence[Mapping[AssetKey, PartitionsSubset]]] = None, + active_backfill_targets: Optional[ + Sequence[Union[Mapping[AssetKey, PartitionsSubset], Sequence[AssetKey]]] + ] = None, dagster_runs: Optional[Sequence[DagsterRun]] = None, event_log_entries: Optional[Sequence[EventLogEntry]] = None, expected_run_requests: Optional[Sequence[RunRequest]] = None, @@ -299,11 +304,18 @@ def repo(): # add any backfills to the instance for i, target in enumerate(self.active_backfill_targets or []): - target_subset = AssetGraphSubset( - asset_graph=repo.asset_graph, - partitions_subsets_by_asset_key=target, - non_partitioned_asset_keys=set(), - ) + if isinstance(target, Mapping): + target_subset = AssetGraphSubset( + asset_graph=repo.asset_graph, + partitions_subsets_by_asset_key=target, + non_partitioned_asset_keys=set(), + ) + else: + target_subset = AssetGraphSubset( + asset_graph=repo.asset_graph, + partitions_subsets_by_asset_key={}, + non_partitioned_asset_keys=target, + ) empty_subset = AssetGraphSubset( asset_graph=repo.asset_graph, partitions_subsets_by_asset_key={}, diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/scenarios/partition_scenarios.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/scenarios/partition_scenarios.py index 14e02d5efee14..20384626628dd 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/scenarios/partition_scenarios.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/scenarios/partition_scenarios.py @@ -169,15 +169,7 @@ ], ), }, - { - AssetKey( - "non_existant_asset" # ignored since can't be loaded - ): TimeWindowPartitionsSubset(hourly_partitions_def).with_partition_keys( - [ - "2013-01-05-00:00", - ], - ), - }, + [AssetKey("non_existant_asset")], # ignored since can't be loaded ], current_time=create_pendulum_time(year=2013, month=1, day=5, hour=17), expected_run_requests=[ diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_flatten_time_window_ranges.py b/python_modules/dagster/dagster_tests/definitions_tests/test_flatten_time_window_ranges.py index 0cd5a9c0057e2..5042929338d4b 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_flatten_time_window_ranges.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_flatten_time_window_ranges.py @@ -38,10 +38,10 @@ def test_no_overlap() -> None: _check_flatten_time_window_ranges( { PartitionRangeStatus.MATERIALIZED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-02", "2022-01-05") + partitions_def, PartitionKeyRange("2022-01-02", "2022-01-05") ), PartitionRangeStatus.FAILED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-06", "2022-01-06") + partitions_def, PartitionKeyRange("2022-01-06", "2022-01-06") ), }, [ @@ -57,13 +57,13 @@ def test_no_overlap() -> None: _check_flatten_time_window_ranges( { PartitionRangeStatus.MATERIALIZED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-02", "2022-01-05") + partitions_def, PartitionKeyRange("2022-01-02", "2022-01-05") ), PartitionRangeStatus.FAILED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-06", "2022-01-06") + partitions_def, PartitionKeyRange("2022-01-06", "2022-01-06") ), PartitionRangeStatus.MATERIALIZING: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-8", "2022-01-09") + partitions_def, PartitionKeyRange("2022-01-8", "2022-01-09") ), }, [ @@ -86,10 +86,10 @@ def test_overlapped() -> None: _check_flatten_time_window_ranges( { PartitionRangeStatus.MATERIALIZED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-02", "2022-01-05") + partitions_def, PartitionKeyRange("2022-01-02", "2022-01-05") ), PartitionRangeStatus.FAILED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-05", "2022-01-06") + partitions_def, PartitionKeyRange("2022-01-05", "2022-01-06") ), PartitionRangeStatus.MATERIALIZING: empty_subset, }, @@ -106,10 +106,10 @@ def test_overlapped() -> None: _check_flatten_time_window_ranges( { PartitionRangeStatus.MATERIALIZED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-05", "2022-01-06") + partitions_def, PartitionKeyRange("2022-01-05", "2022-01-06") ), PartitionRangeStatus.FAILED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-02", "2022-01-05") + partitions_def, PartitionKeyRange("2022-01-02", "2022-01-05") ), PartitionRangeStatus.MATERIALIZING: empty_subset, }, @@ -132,10 +132,10 @@ def test_materialized_spans_failed() -> None: _check_flatten_time_window_ranges( { PartitionRangeStatus.MATERIALIZED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-02", "2022-01-10") + partitions_def, PartitionKeyRange("2022-01-02", "2022-01-10") ), PartitionRangeStatus.FAILED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-05", "2022-01-06") + partitions_def, PartitionKeyRange("2022-01-05", "2022-01-06") ), PartitionRangeStatus.MATERIALIZING: empty_subset, }, @@ -159,12 +159,18 @@ def test_materialized_spans_many_failed() -> None: _check_flatten_time_window_ranges( { PartitionRangeStatus.MATERIALIZED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-01", "2022-12-10") + partitions_def, PartitionKeyRange("2022-01-01", "2022-12-10") ), PartitionRangeStatus.FAILED: ( - empty_subset.with_partition_key_range(PartitionKeyRange("2022-01-05", "2022-01-06")) - .with_partition_key_range(PartitionKeyRange("2022-03-01", "2022-03-10")) - .with_partition_key_range(PartitionKeyRange("2022-09-01", "2022-10-01")) + empty_subset.with_partition_key_range( + partitions_def, PartitionKeyRange("2022-01-05", "2022-01-06") + ) + .with_partition_key_range( + partitions_def, PartitionKeyRange("2022-03-01", "2022-03-10") + ) + .with_partition_key_range( + partitions_def, PartitionKeyRange("2022-09-01", "2022-10-01") + ) ), PartitionRangeStatus.MATERIALIZING: empty_subset, }, @@ -221,8 +227,10 @@ def test_empty() -> None: _check_flatten_time_window_ranges( { PartitionRangeStatus.MATERIALIZED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-02", "2022-01-10") - ).with_partition_key_range(PartitionKeyRange("2022-01-20", "2022-02-10")), + partitions_def, PartitionKeyRange("2022-01-02", "2022-01-10") + ).with_partition_key_range( + partitions_def, PartitionKeyRange("2022-01-20", "2022-02-10") + ), PartitionRangeStatus.FAILED: empty_subset, PartitionRangeStatus.MATERIALIZING: empty_subset, }, @@ -244,8 +252,10 @@ def test_empty() -> None: { PartitionRangeStatus.MATERIALIZED: empty_subset, PartitionRangeStatus.FAILED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-02", "2022-01-10") - ).with_partition_key_range(PartitionKeyRange("2022-01-20", "2022-02-10")), + partitions_def, PartitionKeyRange("2022-01-02", "2022-01-10") + ).with_partition_key_range( + partitions_def, PartitionKeyRange("2022-01-20", "2022-02-10") + ), PartitionRangeStatus.MATERIALIZING: empty_subset, }, [ @@ -267,8 +277,10 @@ def test_empty() -> None: PartitionRangeStatus.MATERIALIZED: empty_subset, PartitionRangeStatus.FAILED: empty_subset, PartitionRangeStatus.MATERIALIZING: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-02", "2022-01-10") - ).with_partition_key_range(PartitionKeyRange("2022-01-20", "2022-02-10")), + partitions_def, PartitionKeyRange("2022-01-02", "2022-01-10") + ).with_partition_key_range( + partitions_def, PartitionKeyRange("2022-01-20", "2022-02-10") + ), }, [ { @@ -289,10 +301,12 @@ def test_cancels_out() -> None: _check_flatten_time_window_ranges( { PartitionRangeStatus.MATERIALIZED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-02", "2022-01-05") - ).with_partition_key_range(PartitionKeyRange("2022-01-12", "2022-01-13")), + partitions_def, PartitionKeyRange("2022-01-02", "2022-01-05") + ).with_partition_key_range( + partitions_def, PartitionKeyRange("2022-01-12", "2022-01-13") + ), PartitionRangeStatus.FAILED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-02", "2022-01-05") + partitions_def, PartitionKeyRange("2022-01-02", "2022-01-05") ), PartitionRangeStatus.MATERIALIZING: empty_subset, }, @@ -315,27 +329,37 @@ def test_lots() -> None: _check_flatten_time_window_ranges( { PartitionRangeStatus.MATERIALIZED: ( - empty_subset.with_partition_key_range(PartitionKeyRange("2022-01-02", "2022-01-05")) - .with_partition_key_range(PartitionKeyRange("2022-01-12", "2022-01-13")) - .with_partition_key_range(PartitionKeyRange("2022-01-15", "2022-01-17")) - .with_partition_key_range(PartitionKeyRange("2022-01-19", "2022-01-20")) - .with_partition_key_range(PartitionKeyRange("2022-01-22", "2022-01-24")) + empty_subset.with_partition_key_range( + partitions_def, PartitionKeyRange("2022-01-02", "2022-01-05") + ) + .with_partition_key_range( + partitions_def, PartitionKeyRange("2022-01-12", "2022-01-13") + ) + .with_partition_key_range( + partitions_def, PartitionKeyRange("2022-01-15", "2022-01-17") + ) + .with_partition_key_range( + partitions_def, PartitionKeyRange("2022-01-19", "2022-01-20") + ) + .with_partition_key_range( + partitions_def, PartitionKeyRange("2022-01-22", "2022-01-24") + ) ), PartitionRangeStatus.FAILED: ( empty_subset.with_partition_key_range( - PartitionKeyRange("2021-12-30", "2021-12-31") + partitions_def, PartitionKeyRange("2021-12-30", "2021-12-31") ) # before materialized subset .with_partition_key_range( - PartitionKeyRange("2022-01-02", "2022-01-03") + partitions_def, PartitionKeyRange("2022-01-02", "2022-01-03") ) # within materialized subset .with_partition_key_range( - PartitionKeyRange("2022-01-05", "2022-01-06") + partitions_def, PartitionKeyRange("2022-01-05", "2022-01-06") ) # directly after materialized subset .with_partition_key_range( - PartitionKeyRange("2022-01-08", "2022-01-09") + partitions_def, PartitionKeyRange("2022-01-08", "2022-01-09") ) # between materialized subsets .with_partition_key_range( - PartitionKeyRange("2022-01-11", "2022-01-14") + partitions_def, PartitionKeyRange("2022-01-11", "2022-01-14") ) # encompasses materialized subset .with_partition_keys(["2022-01-20"]) # at end materialized subset .with_partition_keys( @@ -381,13 +405,13 @@ def test_multiple_overlap_types(): _check_flatten_time_window_ranges( { PartitionRangeStatus.MATERIALIZED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-01", "2022-01-06") + partitions_def, PartitionKeyRange("2022-01-01", "2022-01-06") ), PartitionRangeStatus.FAILED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-02", "2022-01-05") + partitions_def, PartitionKeyRange("2022-01-02", "2022-01-05") ), PartitionRangeStatus.MATERIALIZING: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-03", "2022-01-04") + partitions_def, PartitionKeyRange("2022-01-03", "2022-01-04") ), }, [ diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_multi_partitions.py b/python_modules/dagster/dagster_tests/definitions_tests/test_multi_partitions.py index e36c500ec4a43..0ddbb8e329f6c 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_multi_partitions.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_multi_partitions.py @@ -320,9 +320,9 @@ def test_multipartitions_subset_addition(initial, added): assert added_subset.get_partition_keys(current_time=current_day) == set( added_subset_keys + initial_subset_keys ) - assert added_subset.get_partition_keys_not_in_subset(current_time=current_day) == set( - expected_keys_not_in_updated_subset - ) + assert added_subset.get_partition_keys_not_in_subset( + multipartitions_def, current_time=current_day + ) == set(expected_keys_not_in_updated_subset) def test_asset_partition_key_is_multipartition_key(): diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_time_window_partitions.py b/python_modules/dagster/dagster_tests/definitions_tests/test_time_window_partitions.py index 1876a80eea8dd..0b151c58927d6 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_time_window_partitions.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_time_window_partitions.py @@ -926,7 +926,8 @@ def test_partition_subset_with_partition_keys( assert all(partition_key in updated_subset for partition_key in added_subset_keys) assert ( updated_subset.get_partition_keys_not_in_subset( - current_time=partitions_def.end_time_for_partition_key(full_set_keys[-1]) + partitions_def=partitions_def, + current_time=partitions_def.end_time_for_partition_key(full_set_keys[-1]), ) == expected_keys_not_in_updated_subset ) diff --git a/python_modules/dagster/dagster_tests/storage_tests/test_fs_io_manager.py b/python_modules/dagster/dagster_tests/storage_tests/test_fs_io_manager.py index a0356e6a03eee..173c50405345b 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/test_fs_io_manager.py +++ b/python_modules/dagster/dagster_tests/storage_tests/test_fs_io_manager.py @@ -295,6 +295,7 @@ class NoPartitionsPartitionMapping(PartitionMapping): def get_upstream_mapped_partitions_result_for_partitions( self, downstream_partitions_subset: Optional[PartitionsSubset], + downstream_partitions_def: Optional[PartitionsDefinition], upstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -304,6 +305,7 @@ def get_upstream_mapped_partitions_result_for_partitions( def get_downstream_partitions_for_partitions( self, upstream_partitions_subset, + upstream_partitions_def, downstream_partitions_def, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None,