From 71ba29ebb2fd0e45a5811a304454c0e6d541bc98 Mon Sep 17 00:00:00 2001 From: Claire Lin Date: Fri, 3 Nov 2023 16:49:58 -0700 Subject: [PATCH] first stab continue time window partitions subset changes asset backfill serialization partition mapping update continue refactor fix more tests more test fixes fix partition mapping tests adjust test fix more tests add tests --- .../implementation/fetch_assets.py | 62 +++++----- .../dagster_graphql/schema/asset_graph.py | 13 ++- .../dagster_graphql/schema/backfill.py | 4 +- .../_core/definitions/asset_daemon_context.py | 2 +- .../_core/definitions/asset_daemon_cursor.py | 1 + .../dagster/_core/definitions/asset_graph.py | 9 +- .../_core/definitions/asset_graph_subset.py | 12 +- .../multi_asset_sensor_definition.py | 1 + .../multi_dimensional_partitions.py | 30 +---- .../dagster/_core/definitions/partition.py | 109 +++++++++--------- .../_core/definitions/partition_mapping.py | 31 +++-- .../time_window_partition_mapping.py | 4 +- .../definitions/time_window_partitions.py | 8 +- .../dagster/_core/execution/context/input.py | 12 +- .../dagster/_core/execution/context/system.py | 17 ++- .../_utils/caching_instance_queryer.py | 1 + .../test_asset_partition_mappings.py | 26 +++-- .../test_multipartition_partition_mapping.py | 55 ++++++--- .../test_static_partition_mapping.py | 14 ++- .../test_time_window_partition_mapping.py | 77 ++++++++++--- .../asset_defs_tests/test_asset_graph.py | 8 +- .../test_partitioned_assets.py | 10 +- .../execution_tests/test_asset_backfill.py | 6 +- .../partition_tests/test_partition.py | 4 +- .../auto_materialize_tests/base_scenario.py | 26 +++-- .../scenarios/partition_scenarios.py | 12 +- .../test_flatten_time_window_ranges.py | 98 ++++++++++------ .../test_multi_partitions.py | 6 +- .../test_time_window_partitions.py | 6 +- .../storage_tests/test_fs_io_manager.py | 2 + 30 files changed, 409 insertions(+), 257 deletions(-) diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py index a0c34b17a4ce5..733aa51f8e727 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py @@ -20,18 +20,13 @@ DagsterEventType, DagsterInstance, EventRecordsFilter, - MultiPartitionKey, MultiPartitionsDefinition, _check as check, ) from dagster._core.definitions.data_time import CachingDataTimeResolver from dagster._core.definitions.external_asset_graph import ExternalAssetGraph -from dagster._core.definitions.multi_dimensional_partitions import ( - MultiPartitionsSubset, -) from dagster._core.definitions.partition import ( CachingDynamicPartitionsLoader, - DefaultPartitionsSubset, PartitionsDefinition, PartitionsSubset, ) @@ -420,6 +415,7 @@ def build_partition_statuses( materialized_partitions_subset: Optional[PartitionsSubset], failed_partitions_subset: Optional[PartitionsSubset], in_progress_partitions_subset: Optional[PartitionsSubset], + partitions_def: Optional[PartitionsDefinition], ) -> Union[ "GrapheneTimePartitionStatuses", "GrapheneDefaultPartitionStatuses", @@ -481,14 +477,15 @@ def build_partition_statuses( ) ) return GrapheneTimePartitionStatuses(ranges=graphene_ranges) - elif isinstance(materialized_partitions_subset, MultiPartitionsSubset): + elif isinstance(partitions_def, MultiPartitionsDefinition): return get_2d_run_length_encoded_partitions( dynamic_partitions_store, materialized_partitions_subset, failed_partitions_subset, in_progress_partitions_subset, + partitions_def, ) - elif isinstance(materialized_partitions_subset, DefaultPartitionsSubset): + elif partitions_def: materialized_keys = materialized_partitions_subset.get_partition_keys() failed_keys = failed_partitions_subset.get_partition_keys() in_progress_keys = in_progress_partitions_subset.get_partition_keys() @@ -499,7 +496,7 @@ def build_partition_statuses( - set(in_progress_keys), failedPartitions=failed_keys, unmaterializedPartitions=materialized_partitions_subset.get_partition_keys_not_in_subset( - dynamic_partitions_store=dynamic_partitions_store + partitions_def=partitions_def, dynamic_partitions_store=dynamic_partitions_store ), materializingPartitions=in_progress_keys, ) @@ -512,57 +509,53 @@ def get_2d_run_length_encoded_partitions( materialized_partitions_subset: PartitionsSubset, failed_partitions_subset: PartitionsSubset, in_progress_partitions_subset: PartitionsSubset, + partitions_def: MultiPartitionsDefinition, ) -> "GrapheneMultiPartitionStatuses": from ..schema.pipelines.pipeline import ( GrapheneMultiPartitionRangeStatuses, GrapheneMultiPartitionStatuses, ) - if ( - not isinstance(materialized_partitions_subset.partitions_def, MultiPartitionsDefinition) - or not isinstance(failed_partitions_subset.partitions_def, MultiPartitionsDefinition) - or not isinstance(in_progress_partitions_subset.partitions_def, MultiPartitionsDefinition) - ): - check.failed("Can only fetch 2D run length encoded partitions for multipartitioned assets") + check.invariant( + isinstance(partitions_def, MultiPartitionsDefinition), + "Partitions definition should be multipartitioned", + ) - primary_dim = materialized_partitions_subset.partitions_def.primary_dimension - secondary_dim = materialized_partitions_subset.partitions_def.secondary_dimension + primary_dim = partitions_def.primary_dimension + secondary_dim = partitions_def.secondary_dimension dim2_materialized_partition_subset_by_dim1: Dict[str, PartitionsSubset] = defaultdict( lambda: secondary_dim.partitions_def.empty_subset() ) - for partition_key in cast( - Sequence[MultiPartitionKey], materialized_partitions_subset.get_partition_keys() - ): + for partition_key in materialized_partitions_subset.get_partition_keys(): + multipartition_key = partitions_def.get_partition_key_from_str(partition_key) dim2_materialized_partition_subset_by_dim1[ - partition_key.keys_by_dimension[primary_dim.name] + multipartition_key.keys_by_dimension[primary_dim.name] ] = dim2_materialized_partition_subset_by_dim1[ - partition_key.keys_by_dimension[primary_dim.name] - ].with_partition_keys([partition_key.keys_by_dimension[secondary_dim.name]]) + multipartition_key.keys_by_dimension[primary_dim.name] + ].with_partition_keys([multipartition_key.keys_by_dimension[secondary_dim.name]]) dim2_failed_partition_subset_by_dim1: Dict[str, PartitionsSubset] = defaultdict( lambda: secondary_dim.partitions_def.empty_subset() ) - for partition_key in cast( - Sequence[MultiPartitionKey], failed_partitions_subset.get_partition_keys() - ): + for partition_key in failed_partitions_subset.get_partition_keys(): + multipartition_key = partitions_def.get_partition_key_from_str(partition_key) dim2_failed_partition_subset_by_dim1[ - partition_key.keys_by_dimension[primary_dim.name] + multipartition_key.keys_by_dimension[primary_dim.name] ] = dim2_failed_partition_subset_by_dim1[ - partition_key.keys_by_dimension[primary_dim.name] - ].with_partition_keys([partition_key.keys_by_dimension[secondary_dim.name]]) + multipartition_key.keys_by_dimension[primary_dim.name] + ].with_partition_keys([multipartition_key.keys_by_dimension[secondary_dim.name]]) dim2_in_progress_partition_subset_by_dim1: Dict[str, PartitionsSubset] = defaultdict( lambda: secondary_dim.partitions_def.empty_subset() ) - for partition_key in cast( - Sequence[MultiPartitionKey], in_progress_partitions_subset.get_partition_keys() - ): + for partition_key in in_progress_partitions_subset.get_partition_keys(): + multipartition_key = partitions_def.get_partition_key_from_str(partition_key) dim2_in_progress_partition_subset_by_dim1[ - partition_key.keys_by_dimension[primary_dim.name] + multipartition_key.keys_by_dimension[primary_dim.name] ] = dim2_in_progress_partition_subset_by_dim1[ - partition_key.keys_by_dimension[primary_dim.name] - ].with_partition_keys([partition_key.keys_by_dimension[secondary_dim.name]]) + multipartition_key.keys_by_dimension[primary_dim.name] + ].with_partition_keys([multipartition_key.keys_by_dimension[secondary_dim.name]]) materialized_2d_ranges = [] @@ -626,6 +619,7 @@ def get_2d_run_length_encoded_partitions( dim2_materialized_partition_subset_by_dim1[start_key], dim2_failed_partition_subset_by_dim1[start_key], dim2_in_progress_partition_subset_by_dim1[start_key], + secondary_dim.partitions_def, ), ) ) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py index 5b9658ed82316..cc88f368949a0 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py @@ -930,6 +930,12 @@ def resolve_assetPartitionStatuses( if not self._dynamic_partitions_loader: check.failed("dynamic_partitions_loader must be provided to get partition keys") + partitions_def = ( + self._external_asset_node.partitions_def_data.get_partitions_definition() + if self._external_asset_node.partitions_def_data + else None + ) + ( materialized_partition_subset, failed_partition_subset, @@ -938,11 +944,7 @@ def resolve_assetPartitionStatuses( graphene_info.context.instance, asset_key, self._dynamic_partitions_loader, - ( - self._external_asset_node.partitions_def_data.get_partitions_definition() - if self._external_asset_node.partitions_def_data - else None - ), + partitions_def, ) return build_partition_statuses( @@ -950,6 +952,7 @@ def resolve_assetPartitionStatuses( materialized_partition_subset, failed_partition_subset, in_progress_subset, + partitions_def, ) def resolve_partitionStats( diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py b/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py index c66eb3fe482c0..f46badb6b3958 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py @@ -133,7 +133,9 @@ def __init__(self, partition_subset: PartitionsSubset): if isinstance(partition_subset, BaseTimeWindowPartitionsSubset): ranges = [ GraphenePartitionKeyRange(start, end) - for start, end in partition_subset.get_partition_key_ranges() + for start, end in partition_subset.get_partition_key_ranges( + partition_subset.partitions_def + ) ] partition_keys = None else: # Default partitions subset diff --git a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py index 0d06219a3f9c3..d3e20748ac964 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py @@ -723,7 +723,7 @@ def _build_run_requests_with_backfill_policy( run_requests = [] partition_subset = partitions_def.subset_with_partition_keys(partition_keys) partition_key_ranges = partition_subset.get_partition_key_ranges( - dynamic_partitions_store=dynamic_partitions_store + partitions_def, dynamic_partitions_store=dynamic_partitions_store ) for partition_key_range in partition_key_ranges: # We might resolve more than one partition key range for the given partition keys. diff --git a/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py b/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py index 7d21972594584..ad47b6c9b4afb 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py @@ -80,6 +80,7 @@ def get_unhandled_partitions( ) return handled_subset.get_partition_keys_not_in_subset( + partitions_def=partitions_def, current_time=current_time, dynamic_partitions_store=dynamic_partitions_store, ) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph.py b/python_modules/dagster/dagster/_core/definitions/asset_graph.py index 572285e188142..3f1524a0d88b9 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_graph.py @@ -364,6 +364,7 @@ def get_child_partition_keys_of_parent( partition_mapping = self.get_partition_mapping(child_asset_key, parent_asset_key) child_partitions_subset = partition_mapping.get_downstream_partitions_for_partitions( parent_partitions_def.empty_subset().with_partition_keys([parent_partition_key]), + parent_partitions_def, downstream_partitions_def=child_partitions_def, dynamic_partitions_store=dynamic_partitions_store, current_time=current_time, @@ -437,7 +438,7 @@ def get_parent_partition_keys_for_child( """ partition_key = check.opt_str_param(partition_key, "partition_key") - child_partitions_def = self.get_partitions_def(child_asset_key) + child_partitions_def = cast(PartitionsDefinition, self.get_partitions_def(child_asset_key)) parent_partitions_def = self.get_partitions_def(parent_asset_key) if parent_partitions_def is None: @@ -449,12 +450,11 @@ def get_parent_partition_keys_for_child( return partition_mapping.get_upstream_mapped_partitions_result_for_partitions( ( - cast(PartitionsDefinition, child_partitions_def).subset_with_partition_keys( - [partition_key] - ) + child_partitions_def.subset_with_partition_keys([partition_key]) if partition_key else None ), + downstream_partitions_def=child_partitions_def, upstream_partitions_def=parent_partitions_def, dynamic_partitions_store=dynamic_partitions_store, current_time=current_time, @@ -611,6 +611,7 @@ def bfs_filter_subsets( child_partitions_subset = ( partition_mapping.get_downstream_partitions_for_partitions( partitions_subset, + check.not_none(self.get_partitions_def(asset_key)), downstream_partitions_def=child_partitions_def, dynamic_partitions_store=dynamic_partitions_store, current_time=current_time, diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py b/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py index 773ac4b159361..efe084745b58f 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py @@ -94,14 +94,18 @@ def to_storage_dict( for key, value in self.partitions_subsets_by_asset_key.items() }, "serializable_partitions_def_ids_by_asset_key": { - key.to_user_string(): value.partitions_def.get_serializable_unique_identifier( + key.to_user_string(): check.not_none( + self._asset_graph.get_partitions_def(key) + ).get_serializable_unique_identifier( dynamic_partitions_store=dynamic_partitions_store ) - for key, value in self.partitions_subsets_by_asset_key.items() + for key, _ in self.partitions_subsets_by_asset_key.items() }, "partitions_def_class_names_by_asset_key": { - key.to_user_string(): value.partitions_def.__class__.__name__ - for key, value in self.partitions_subsets_by_asset_key.items() + key.to_user_string(): check.not_none( + self._asset_graph.get_partitions_def(key) + ).__class__.__name__ + for key, _ in self.partitions_subsets_by_asset_key.items() }, "non_partitioned_asset_keys": [ key.to_user_string() for key in self._non_partitioned_asset_keys diff --git a/python_modules/dagster/dagster/_core/definitions/multi_asset_sensor_definition.py b/python_modules/dagster/dagster/_core/definitions/multi_asset_sensor_definition.py index 0fcd25945dc05..d4cdd574a4806 100644 --- a/python_modules/dagster/dagster/_core/definitions/multi_asset_sensor_definition.py +++ b/python_modules/dagster/dagster/_core/definitions/multi_asset_sensor_definition.py @@ -718,6 +718,7 @@ def get_downstream_partition_keys( downstream_partition_key_subset = ( partition_mapping.get_downstream_partitions_for_partitions( from_asset.partitions_def.empty_subset().with_partition_keys([partition_key]), + from_asset.partitions_def, downstream_partitions_def=to_partitions_def, dynamic_partitions_store=self.instance, ) diff --git a/python_modules/dagster/dagster/_core/definitions/multi_dimensional_partitions.py b/python_modules/dagster/dagster/_core/definitions/multi_dimensional_partitions.py index 9acffb0e1aff2..1e4d95760dc10 100644 --- a/python_modules/dagster/dagster/_core/definitions/multi_dimensional_partitions.py +++ b/python_modules/dagster/dagster/_core/definitions/multi_dimensional_partitions.py @@ -4,7 +4,6 @@ from functools import lru_cache, reduce from typing import ( Dict, - Iterable, List, Mapping, NamedTuple, @@ -216,7 +215,7 @@ def __init__(self, partitions_defs: Mapping[str, PartitionsDefinition]): @property def partitions_subset_class(self) -> Type["PartitionsSubset"]: - return MultiPartitionsSubset + return DefaultPartitionsSubset def get_serializable_unique_identifier( self, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None @@ -509,33 +508,6 @@ def get_num_partitions( return reduce(lambda x, y: x * y, dimension_counts, 1) -class MultiPartitionsSubset(DefaultPartitionsSubset): - def __init__( - self, - partitions_def: MultiPartitionsDefinition, - subset: Optional[Set[str]] = None, - ): - check.inst_param(partitions_def, "partitions_def", MultiPartitionsDefinition) - subset = ( - set( - [ - partitions_def.get_partition_key_from_str(key) - for key in subset - if MULTIPARTITION_KEY_DELIMITER in key - ] - ) - if subset - else set() - ) - super(MultiPartitionsSubset, self).__init__(partitions_def, subset) - - def with_partition_keys(self, partition_keys: Iterable[str]) -> "MultiPartitionsSubset": - return MultiPartitionsSubset( - cast(MultiPartitionsDefinition, self._partitions_def), - self._subset | set(partition_keys), - ) - - def get_tags_from_multi_partition_key(multi_partition_key: MultiPartitionKey) -> Mapping[str, str]: check.inst_param(multi_partition_key, "multi_partition_key", MultiPartitionKey) diff --git a/python_modules/dagster/dagster/_core/definitions/partition.py b/python_modules/dagster/dagster/_core/definitions/partition.py index 542c6838339d2..1ec01f8a8a123 100644 --- a/python_modules/dagster/dagster/_core/definitions/partition.py +++ b/python_modules/dagster/dagster/_core/definitions/partition.py @@ -9,6 +9,7 @@ ) from enum import Enum from typing import ( + AbstractSet, Any, Callable, Dict, @@ -18,7 +19,6 @@ NamedTuple, Optional, Sequence, - Set, Type, Union, cast, @@ -139,8 +139,8 @@ class PartitionsDefinition(ABC, Generic[T_str]): """ @property - def partitions_subset_class(self) -> Type["PartitionsSubset[T_str]"]: - return DefaultPartitionsSubset[T_str] + def partitions_subset_class(self) -> Type["PartitionsSubset"]: + return DefaultPartitionsSubset @abstractmethod @public @@ -217,26 +217,24 @@ def get_partition_keys_in_range( + 1 ] - def empty_subset(self) -> "PartitionsSubset[T_str]": + def empty_subset(self) -> "PartitionsSubset": return self.partitions_subset_class.empty_subset(self) - def subset_with_partition_keys( - self, partition_keys: Iterable[str] - ) -> "PartitionsSubset[T_str]": + def subset_with_partition_keys(self, partition_keys: Iterable[str]) -> "PartitionsSubset": return self.empty_subset().with_partition_keys(partition_keys) def subset_with_all_partitions( self, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, - ) -> "PartitionsSubset[T_str]": + ) -> "PartitionsSubset": return self.subset_with_partition_keys( self.get_partition_keys( current_time=current_time, dynamic_partitions_store=dynamic_partitions_store ) ) - def deserialize_subset(self, serialized: str) -> "PartitionsSubset[T_str]": + def deserialize_subset(self, serialized: str) -> "PartitionsSubset": return self.partitions_subset_class.from_serialized(self, serialized) def can_deserialize_subset( @@ -949,6 +947,7 @@ class PartitionsSubset(ABC, Generic[T_str]): @abstractmethod def get_partition_keys_not_in_subset( self, + partitions_def: PartitionsDefinition[T_str], current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> Iterable[T_str]: @@ -962,6 +961,7 @@ def get_partition_keys(self, current_time: Optional[datetime] = None) -> Iterabl @abstractmethod def get_partition_key_ranges( self, + partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> Sequence[PartitionKeyRange]: @@ -973,11 +973,12 @@ def with_partition_keys(self, partition_keys: Iterable[str]) -> "PartitionsSubse def with_partition_key_range( self, + partitions_def: PartitionsDefinition[T_str], partition_key_range: PartitionKeyRange, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> "PartitionsSubset[T_str]": return self.with_partition_keys( - self.partitions_def.get_partition_keys_in_range( + partitions_def.get_partition_keys_in_range( partition_key_range, dynamic_partitions_store=dynamic_partitions_store ) ) @@ -989,15 +990,15 @@ def __or__(self, other: "PartitionsSubset") -> "PartitionsSubset[T_str]": def __sub__(self, other: "PartitionsSubset") -> "PartitionsSubset[T_str]": if self is other: - return self.partitions_def.empty_subset() - return self.partitions_def.empty_subset().with_partition_keys( + return self.empty_subset(self.partitions_def) + return self.empty_subset(self.partitions_def).with_partition_keys( set(self.get_partition_keys()).difference(set(other.get_partition_keys())) ) def __and__(self, other: "PartitionsSubset") -> "PartitionsSubset[T_str]": if self is other: return self - return self.partitions_def.empty_subset().with_partition_keys( + return self.empty_subset(self.partitions_def).with_partition_keys( set(self.get_partition_keys()) & set(other.get_partition_keys()) ) @@ -1024,7 +1025,7 @@ def can_deserialize( ... @abstractproperty - def partitions_def(self) -> PartitionsDefinition[T_str]: + def partitions_def(self) -> Optional[PartitionsDefinition[T_str]]: ... @abstractmethod @@ -1037,7 +1038,9 @@ def __contains__(self, value) -> bool: @classmethod @abstractmethod - def empty_subset(cls, partitions_def: PartitionsDefinition[T_str]) -> "PartitionsSubset[T_str]": + def empty_subset( + cls, partitions_def: Optional[PartitionsDefinition] = None + ) -> "PartitionsSubset[T_str]": ... @@ -1077,48 +1080,51 @@ def deserialize(self, partitions_def: PartitionsDefinition) -> PartitionsSubset: return partitions_def.deserialize_subset(self.serialized_subset) -class DefaultPartitionsSubset(PartitionsSubset[T_str]): +@whitelist_for_serdes +class DefaultPartitionsSubset( + PartitionsSubset, + NamedTuple("_DefaultPartitionsSubset", [("subset", AbstractSet[str])]), +): # Every time we change the serialization format, we should increment the version number. # This will ensure that we can gracefully degrade when deserializing old data. SERIALIZATION_VERSION = 1 - def __init__( - self, partitions_def: PartitionsDefinition[T_str], subset: Optional[Set[T_str]] = None + def __new__( + cls, + subset: Optional[AbstractSet[str]] = None, ): check.opt_set_param(subset, "subset") - self._partitions_def = partitions_def - self._subset = subset or set() + return super(DefaultPartitionsSubset, cls).__new__(cls, subset or set()) def get_partition_keys_not_in_subset( self, + partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> Iterable[str]: - return ( - set( - self._partitions_def.get_partition_keys( - current_time=current_time, dynamic_partitions_store=dynamic_partitions_store - ) + return set( + partitions_def.get_partition_keys( + current_time=current_time, dynamic_partitions_store=dynamic_partitions_store ) - - self._subset - ) + ) - set(self.subset) def get_partition_keys(self, current_time: Optional[datetime] = None) -> Iterable[str]: - return self._subset + return self.subset def get_partition_key_ranges( self, + partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> Sequence[PartitionKeyRange]: - partition_keys = self._partitions_def.get_partition_keys( + partition_keys = partitions_def.get_partition_keys( current_time, dynamic_partitions_store=dynamic_partitions_store ) cur_range_start = None cur_range_end = None result = [] for partition_key in partition_keys: - if partition_key in self._subset: + if partition_key in self.subset: if cur_range_start is None: cur_range_start = partition_key cur_range_end = partition_key @@ -1132,12 +1138,9 @@ def get_partition_key_ranges( return result - def with_partition_keys( - self, partition_keys: Iterable[T_str] - ) -> "DefaultPartitionsSubset[T_str]": + def with_partition_keys(self, partition_keys: Iterable[str]) -> "DefaultPartitionsSubset": return DefaultPartitionsSubset( - self._partitions_def, - self._subset | set(partition_keys), + self.subset | set(partition_keys), ) def serialize(self) -> str: @@ -1147,32 +1150,32 @@ def serialize(self) -> str: { "version": self.SERIALIZATION_VERSION, # sort to ensure that equivalent partition subsets have identical serialized forms - "subset": sorted(list(self._subset)), + "subset": sorted(list(self.subset)), } ) @classmethod def from_serialized( - cls, partitions_def: PartitionsDefinition[T_str], serialized: str - ) -> "PartitionsSubset[T_str]": + cls, partitions_def: PartitionsDefinition, serialized: str + ) -> "PartitionsSubset": # Check the version number, so only valid versions can be deserialized. data = json.loads(serialized) if isinstance(data, list): # backwards compatibility - return cls(subset=set(data), partitions_def=partitions_def) + return cls(subset=set(data)) else: if data.get("version") != cls.SERIALIZATION_VERSION: raise DagsterInvalidDeserializationVersionError( f"Attempted to deserialize partition subset with version {data.get('version')}," f" but only version {cls.SERIALIZATION_VERSION} is supported." ) - return cls(subset=set(data.get("subset")), partitions_def=partitions_def) + return cls(subset=set(data.get("subset"))) @classmethod def can_deserialize( cls, - partitions_def: PartitionsDefinition[T_str], + partitions_def: PartitionsDefinition, serialized: str, serialized_partitions_def_unique_id: Optional[str], serialized_partitions_def_class_name: Optional[str], @@ -1186,27 +1189,23 @@ def can_deserialize( ) @property - def partitions_def(self) -> PartitionsDefinition[T_str]: - return self._partitions_def + def partitions_def(self) -> Optional[PartitionsDefinition[T_str]]: + return None def __eq__(self, other: object) -> bool: - return ( - isinstance(other, DefaultPartitionsSubset) - and self._partitions_def == other._partitions_def - and self._subset == other._subset - ) + return isinstance(other, DefaultPartitionsSubset) and self.subset == other.subset def __len__(self) -> int: - return len(self._subset) + return len(self.subset) def __contains__(self, value) -> bool: - return value in self._subset + return value in self.subset def __repr__(self) -> str: - return ( - f"DefaultPartitionsSubset(subset={self._subset}, partitions_def={self._partitions_def})" - ) + return f"DefaultPartitionsSubset(subset={self.subset})" @classmethod - def empty_subset(cls, partitions_def: PartitionsDefinition[T_str]) -> "PartitionsSubset[T_str]": - return cls(partitions_def=partitions_def) + def empty_subset( + cls, partitions_def: Optional[PartitionsDefinition] = None + ) -> "DefaultPartitionsSubset": + return cls() diff --git a/python_modules/dagster/dagster/_core/definitions/partition_mapping.py b/python_modules/dagster/dagster/_core/definitions/partition_mapping.py index 5a5dddbbc6017..a5690fefc84be 100644 --- a/python_modules/dagster/dagster/_core/definitions/partition_mapping.py +++ b/python_modules/dagster/dagster/_core/definitions/partition_mapping.py @@ -62,6 +62,7 @@ class may change at any time. def get_downstream_partitions_for_partitions( self, upstream_partitions_subset: PartitionsSubset, + upstream_partitions_def: PartitionsDefinition, downstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -81,6 +82,7 @@ def get_downstream_partitions_for_partitions( def get_upstream_mapped_partitions_result_for_partitions( self, downstream_partitions_subset: Optional[PartitionsSubset], + downstream_partitions_def: Optional[PartitionsDefinition], upstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -107,6 +109,7 @@ class IdentityPartitionMapping(PartitionMapping, NamedTuple("_IdentityPartitionM def get_upstream_mapped_partitions_result_for_partitions( self, downstream_partitions_subset: Optional[PartitionsSubset], + downstream_partitions_def: Optional[PartitionsDefinition], upstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -134,6 +137,7 @@ def get_upstream_mapped_partitions_result_for_partitions( def get_downstream_partitions_for_partitions( self, upstream_partitions_subset: PartitionsSubset, + upstream_partitions_def: PartitionsDefinition, downstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -167,6 +171,7 @@ class AllPartitionMapping(PartitionMapping, NamedTuple("_AllPartitionMapping", [ def get_upstream_mapped_partitions_result_for_partitions( self, downstream_partitions_subset: Optional[PartitionsSubset], + downstream_partitions_def: Optional[PartitionsDefinition], upstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -179,6 +184,7 @@ def get_upstream_mapped_partitions_result_for_partitions( def get_downstream_partitions_for_partitions( self, upstream_partitions_subset: PartitionsSubset, + upstream_partitions_def: PartitionsDefinition, downstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -197,6 +203,7 @@ class LastPartitionMapping(PartitionMapping, NamedTuple("_LastPartitionMapping", def get_upstream_mapped_partitions_result_for_partitions( self, downstream_partitions_subset: Optional[PartitionsSubset], + downstream_partitions_def: Optional[PartitionsDefinition], upstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -214,11 +221,12 @@ def get_upstream_mapped_partitions_result_for_partitions( def get_downstream_partitions_for_partitions( self, upstream_partitions_subset: PartitionsSubset, + upstream_partitions_def: PartitionsDefinition, downstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> PartitionsSubset: - last_upstream_partition = upstream_partitions_subset.partitions_def.get_last_partition_key( + last_upstream_partition = upstream_partitions_def.get_last_partition_key( current_time=current_time, dynamic_partitions_store=dynamic_partitions_store ) if last_upstream_partition and last_upstream_partition in upstream_partitions_subset: @@ -259,6 +267,7 @@ def a_downstream(upstream): def get_upstream_mapped_partitions_result_for_partitions( self, downstream_partitions_subset: Optional[PartitionsSubset], + downstream_partitions_def: Optional[PartitionsDefinition], upstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -270,6 +279,7 @@ def get_upstream_mapped_partitions_result_for_partitions( def get_downstream_partitions_for_partitions( self, upstream_partitions_subset: PartitionsSubset, + upstream_partitions_def: PartitionsDefinition, downstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -325,9 +335,8 @@ def _get_dependency_partitions_subset( a_partition_keys_by_dimension = defaultdict(set) if isinstance(a_partitions_def, MultiPartitionsDefinition): for partition_key in a_partitions_subset.get_partition_keys(): - for dimension_name, key in cast( - MultiPartitionKey, partition_key - ).keys_by_dimension.items(): + key = a_partitions_def.get_partition_key_from_str(partition_key) + for dimension_name, key in key.keys_by_dimension.items(): a_partition_keys_by_dimension[dimension_name].add(key) else: for partition_key in a_partitions_subset.get_partition_keys(): @@ -383,6 +392,7 @@ def _get_dependency_partitions_subset( a_dimension_partitions_def.empty_subset().with_partition_keys( [key] ), + a_dimension_partitions_def, b_dimension_partitions_def, current_time=current_time, dynamic_partitions_store=dynamic_partitions_store, @@ -421,6 +431,7 @@ def _get_dependency_partitions_subset( a_dimension_partitions_def.empty_subset().with_partition_keys( [key] ), + a_dimension_partitions_def, b_dimension_partitions_def, current_time=current_time, dynamic_partitions_store=dynamic_partitions_store, @@ -447,7 +458,9 @@ def _get_dependency_partitions_subset( [ dep_b_keys_by_a_dim_and_key[dim_name][ ( - cast(MultiPartitionKey, key).keys_by_dimension[dim_name] + cast(MultiPartitionsDefinition, a_partitions_def) + .get_partition_key_from_str(key) + .keys_by_dimension[dim_name] if dim_name else key ) @@ -487,6 +500,7 @@ def _get_dependency_partitions_subset( def get_upstream_mapped_partitions_result_for_partitions( self, downstream_partitions_subset: Optional[PartitionsSubset], + downstream_partitions_def: Optional[PartitionsDefinition], upstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -495,7 +509,7 @@ def get_upstream_mapped_partitions_result_for_partitions( check.failed("downstream asset is not partitioned") result = self._get_dependency_partitions_subset( - cast(MultiPartitionsDefinition, downstream_partitions_subset.partitions_def), + check.not_none(downstream_partitions_def), downstream_partitions_subset, cast(MultiPartitionsDefinition, upstream_partitions_def), a_upstream_of_b=False, @@ -511,6 +525,7 @@ def get_upstream_mapped_partitions_result_for_partitions( def get_downstream_partitions_for_partitions( self, upstream_partitions_subset: PartitionsSubset, + upstream_partitions_def: PartitionsDefinition, downstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -519,7 +534,7 @@ def get_downstream_partitions_for_partitions( check.failed("upstream asset is not partitioned") result = self._get_dependency_partitions_subset( - cast(MultiPartitionsDefinition, upstream_partitions_subset.partitions_def), + upstream_partitions_def, upstream_partitions_subset, cast(MultiPartitionsDefinition, downstream_partitions_def), a_upstream_of_b=True, @@ -855,6 +870,7 @@ def _check_downstream(self, *, downstream_partitions_def: PartitionsDefinition): def get_downstream_partitions_for_partitions( self, upstream_partitions_subset: PartitionsSubset, + upstream_partitions_def: PartitionsDefinition, downstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -870,6 +886,7 @@ def get_downstream_partitions_for_partitions( def get_upstream_mapped_partitions_result_for_partitions( self, downstream_partitions_subset: Optional[PartitionsSubset], + downstream_partitions_def: Optional[PartitionsDefinition], upstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, diff --git a/python_modules/dagster/dagster/_core/definitions/time_window_partition_mapping.py b/python_modules/dagster/dagster/_core/definitions/time_window_partition_mapping.py index 9d4f0d7ef1dd8..8e1215b776713 100644 --- a/python_modules/dagster/dagster/_core/definitions/time_window_partition_mapping.py +++ b/python_modules/dagster/dagster/_core/definitions/time_window_partition_mapping.py @@ -108,6 +108,7 @@ def __new__( def get_upstream_mapped_partitions_result_for_partitions( self, downstream_partitions_subset: Optional[PartitionsSubset], + downstream_partitions_def: Optional[PartitionsDefinition], upstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -127,6 +128,7 @@ def get_upstream_mapped_partitions_result_for_partitions( def get_downstream_partitions_for_partitions( self, upstream_partitions_subset: PartitionsSubset, + upstream_partitions_def: PartitionsDefinition, downstream_partitions_def: Optional[PartitionsDefinition], current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -137,7 +139,7 @@ def get_downstream_partitions_for_partitions( if not provided. """ return self._map_partitions( - upstream_partitions_subset.partitions_def, + upstream_partitions_def, downstream_partitions_def, upstream_partitions_subset, end_offset=-self.start_offset, diff --git a/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py b/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py index 6c39df8d20ec7..8e0d49dca18bf 100644 --- a/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py +++ b/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py @@ -1497,6 +1497,7 @@ def _get_partition_time_windows_not_in_subset( def get_partition_keys_not_in_subset( self, + partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> Iterable[str]: @@ -1529,6 +1530,7 @@ def with_partitions_def( def get_partition_key_ranges( self, + partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> Sequence[PartitionKeyRange]: @@ -1806,7 +1808,9 @@ def __eq__(self, other): ) or super(PartitionKeysTimeWindowPartitionsSubset, self).__eq__(other) @classmethod - def empty_subset(cls, partitions_def: PartitionsDefinition) -> "PartitionsSubset": + def empty_subset( + cls, partitions_def: Optional[PartitionsDefinition] = None + ) -> "PartitionsSubset": if not isinstance(partitions_def, TimeWindowPartitionsDefinition): check.failed("Partitions definition must be a TimeWindowPartitionsDefinition") partitions_def = cast(TimeWindowPartitionsDefinition, partitions_def) @@ -1975,7 +1979,7 @@ def with_partitions_def( ) def __repr__(self) -> str: - return f"TimeWindowPartitionsSubset({self.get_partition_key_ranges()})" + return f"TimeWindowPartitionsSubset({self.get_partition_key_ranges(self.partitions_def)})" class PartitionRangeStatus(Enum): diff --git a/python_modules/dagster/dagster/_core/execution/context/input.py b/python_modules/dagster/dagster/_core/execution/context/input.py index e821e95fdb194..0ca32a4fd5d23 100644 --- a/python_modules/dagster/dagster/_core/execution/context/input.py +++ b/python_modules/dagster/dagster/_core/execution/context/input.py @@ -365,7 +365,7 @@ def asset_partition_key_range(self) -> PartitionKeyRange: ) partition_key_ranges = subset.get_partition_key_ranges( - dynamic_partitions_store=self.instance + self.asset_partitions_def, dynamic_partitions_store=self.instance ) if len(partition_key_ranges) != 1: check.failed( @@ -608,7 +608,7 @@ def build_input_context( ) if asset_partitions_def and asset_partition_key_range: asset_partitions_subset = asset_partitions_def.empty_subset().with_partition_key_range( - asset_partition_key_range, dynamic_partitions_store=instance + asset_partitions_def, asset_partition_key_range, dynamic_partitions_store=instance ) elif asset_partition_key_range: asset_partitions_subset = KeyRangeNoPartitionsDefPartitionsSubset(asset_partition_key_range) @@ -643,6 +643,7 @@ def __init__(self, key_range: PartitionKeyRange): def get_partition_keys_not_in_subset( self, + partitions_def: "PartitionsDefinition", current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> Iterable[str]: @@ -656,6 +657,7 @@ def get_partition_keys(self, current_time: Optional[datetime] = None) -> Iterabl def get_partition_key_ranges( self, + partitions_def: "PartitionsDefinition", current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> Sequence[PartitionKeyRange]: @@ -675,7 +677,7 @@ def serialize(self) -> str: raise NotImplementedError() @property - def partitions_def(self) -> "PartitionsDefinition": + def partitions_def(self) -> Optional["PartitionsDefinition"]: raise NotImplementedError() def __len__(self) -> int: @@ -701,5 +703,7 @@ def can_deserialize( raise NotImplementedError() @classmethod - def empty_subset(cls, partitions_def: "PartitionsDefinition") -> "PartitionsSubset": + def empty_subset( + cls, partitions_def: Optional["PartitionsDefinition"] = None + ) -> "PartitionsSubset": raise NotImplementedError() diff --git a/python_modules/dagster/dagster/_core/execution/context/system.py b/python_modules/dagster/dagster/_core/execution/context/system.py index a5486bee1eca7..60235f365eeb2 100644 --- a/python_modules/dagster/dagster/_core/execution/context/system.py +++ b/python_modules/dagster/dagster/_core/execution/context/system.py @@ -1101,8 +1101,18 @@ def has_asset_partitions_for_input(self, input_name: str) -> bool: def asset_partition_key_range_for_input(self, input_name: str) -> PartitionKeyRange: subset = self.asset_partitions_subset_for_input(input_name) + + asset_layer = self.job_def.asset_layer + upstream_asset_key = check.not_none( + asset_layer.asset_key_for_input(self.node_handle, input_name) + ) + upstream_asset_partitions_def = check.not_none( + asset_layer.partitions_def_for_asset(upstream_asset_key) + ) + partition_key_ranges = subset.get_partition_key_ranges( - dynamic_partitions_store=self.instance + partitions_def=cast(PartitionsDefinition, upstream_asset_partitions_def), + dynamic_partitions_store=self.instance, ) if len(partition_key_ranges) != 1: @@ -1127,7 +1137,9 @@ def asset_partitions_subset_for_input( partitions_def = assets_def.partitions_def if assets_def else None partitions_subset = ( partitions_def.empty_subset().with_partition_key_range( - self.asset_partition_key_range, dynamic_partitions_store=self.instance + partitions_def, + self.asset_partition_key_range, + dynamic_partitions_store=self.instance, ) if partitions_def else None @@ -1142,6 +1154,7 @@ def asset_partitions_subset_for_input( mapped_partitions_result = ( partition_mapping.get_upstream_mapped_partitions_result_for_partitions( partitions_subset, + partitions_def, upstream_asset_partitions_def, dynamic_partitions_store=self.instance, ) diff --git a/python_modules/dagster/dagster/_utils/caching_instance_queryer.py b/python_modules/dagster/dagster/_utils/caching_instance_queryer.py index fd85f7d35543e..004a817218dc3 100644 --- a/python_modules/dagster/dagster/_utils/caching_instance_queryer.py +++ b/python_modules/dagster/dagster/_utils/caching_instance_queryer.py @@ -589,6 +589,7 @@ def asset_partitions_with_newly_updated_parents_and_new_latest_storage_id( child_partitions_subset = ( partition_mapping.get_downstream_partitions_for_partitions( partitions_subset, + partitions_def, downstream_partitions_def=child_partitions_def, dynamic_partitions_store=self, current_time=self.evaluation_time, diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_asset_partition_mappings.py b/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_asset_partition_mappings.py index e1c8a0654b79f..9ae28328803c3 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_asset_partition_mappings.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_asset_partition_mappings.py @@ -64,6 +64,7 @@ class TrailingWindowPartitionMapping(PartitionMapping): def get_upstream_mapped_partitions_result_for_partitions( self, downstream_partitions_subset: Optional[PartitionsSubset], + downstream_partitions_def: Optional[PartitionsDefinition], upstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -74,7 +75,8 @@ def get_upstream_mapped_partitions_result_for_partitions( partition_keys = list(downstream_partitions_subset.get_partition_keys()) return UpstreamPartitionsResult( upstream_partitions_def.empty_subset().with_partition_key_range( - PartitionKeyRange(str(max(1, int(partition_keys[0]) - 1)), partition_keys[-1]) + upstream_partitions_def, + PartitionKeyRange(str(max(1, int(partition_keys[0]) - 1)), partition_keys[-1]), ), [], ) @@ -82,6 +84,7 @@ def get_upstream_mapped_partitions_result_for_partitions( def get_downstream_partitions_for_partitions( self, upstream_partitions_subset: PartitionsSubset, + upstream_partitions_def: PartitionsDefinition, downstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -260,24 +263,24 @@ def test_specific_partitions_partition_mapping_downstream_partitions(): # cases where at least one of the specific partitions is in the upstream partitions subset for partition_subset in [ - DefaultPartitionsSubset(upstream_partitions_def, {"a"}), - DefaultPartitionsSubset(upstream_partitions_def, {"a", "b"}), - DefaultPartitionsSubset(upstream_partitions_def, {"a", "b", "c", "d"}), + DefaultPartitionsSubset({"a"}), + DefaultPartitionsSubset({"a", "b"}), + DefaultPartitionsSubset({"a", "b", "c", "d"}), ]: assert ( partition_mapping.get_downstream_partitions_for_partitions( - partition_subset, downstream_partitions_def + partition_subset, upstream_partitions_def, downstream_partitions_def ) == downstream_partitions_def.subset_with_all_partitions() ) for partition_subset in [ - DefaultPartitionsSubset(upstream_partitions_def, {"c"}), - DefaultPartitionsSubset(upstream_partitions_def, {"c", "d"}), + DefaultPartitionsSubset({"c"}), + DefaultPartitionsSubset({"c", "d"}), ]: assert ( partition_mapping.get_downstream_partitions_for_partitions( - partition_subset, downstream_partitions_def + partition_subset, upstream_partitions_def, downstream_partitions_def ) == downstream_partitions_def.empty_subset() ) @@ -564,13 +567,13 @@ def test_identity_partition_mapping(): zx = StaticPartitionsDefinition(["z", "x"]) result = IdentityPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( - zx.empty_subset().with_partition_keys(["z", "x"]), xy + zx.empty_subset().with_partition_keys(["z", "x"]), zx, xy ) assert result.partitions_subset.get_partition_keys() == set(["x"]) assert result.required_but_nonexistent_partition_keys == ["z"] result = IdentityPartitionMapping().get_downstream_partitions_for_partitions( - zx.empty_subset().with_partition_keys(["z", "x"]), xy + zx.empty_subset().with_partition_keys(["z", "x"]), zx, xy ) assert result.get_partition_keys() == set(["x"]) @@ -854,6 +857,7 @@ def test_last_partition_mapping_get_downstream_partitions(): assert LastPartitionMapping().get_downstream_partitions_for_partitions( upstream_partitions_def.empty_subset().with_partition_keys(["2023-10-04"]), + upstream_partitions_def, downstream_partitions_def, current_time, ) == downstream_partitions_def.empty_subset().with_partition_keys( @@ -862,6 +866,7 @@ def test_last_partition_mapping_get_downstream_partitions(): assert LastPartitionMapping().get_downstream_partitions_for_partitions( upstream_partitions_def.empty_subset().with_partition_keys(["2023-10-03", "2023-10-04"]), + upstream_partitions_def, downstream_partitions_def, current_time, ) == downstream_partitions_def.empty_subset().with_partition_keys( @@ -871,6 +876,7 @@ def test_last_partition_mapping_get_downstream_partitions(): assert ( LastPartitionMapping().get_downstream_partitions_for_partitions( upstream_partitions_def.empty_subset().with_partition_keys(["2023-10-03"]), + upstream_partitions_def, downstream_partitions_def, current_time, ) diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_multipartition_partition_mapping.py b/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_multipartition_partition_mapping.py index beeb5ab3b4163..25226168cdb21 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_multipartition_partition_mapping.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_multipartition_partition_mapping.py @@ -40,10 +40,11 @@ def test_get_downstream_partitions_single_key_in_range(): ) single_dimension_subset = single_dimension_def.empty_subset().with_partition_key_range( - PartitionKeyRange("a", "a") + single_dimension_def, PartitionKeyRange("a", "a") ) result = MultiToSingleDimensionPartitionMapping().get_downstream_partitions_for_partitions( upstream_partitions_subset=single_dimension_subset, + upstream_partitions_def=single_dimension_def, downstream_partitions_def=multipartitions_def, ) multipartitions_subset = multipartitions_def.empty_subset().with_partition_keys( @@ -57,6 +58,7 @@ def test_get_downstream_partitions_single_key_in_range(): result = MultiToSingleDimensionPartitionMapping().get_downstream_partitions_for_partitions( upstream_partitions_subset=multipartitions_subset, + upstream_partitions_def=multipartitions_def, downstream_partitions_def=single_dimension_def, ) assert result == single_dimension_subset @@ -67,12 +69,12 @@ def test_get_downstream_partitions_single_key_in_range(): result = MultiToSingleDimensionPartitionMapping().get_downstream_partitions_for_partitions( upstream_partitions_subset=single_dimension_def.empty_subset().with_partition_key_range( - PartitionKeyRange("b", "b") + partitions_def=single_dimension_def, partition_key_range=PartitionKeyRange("b", "b") ), + upstream_partitions_def=single_dimension_def, downstream_partitions_def=multipartitions_def, ) assert result == DefaultPartitionsSubset( - multipartitions_def, { MultiPartitionKey({"abc": "b", "xyz": "x"}), MultiPartitionKey({"abc": "b", "xyz": "y"}), @@ -87,7 +89,7 @@ def test_get_downstream_partitions_multiple_keys_in_range(): {"abc": single_dimension_def, "123": StaticPartitionsDefinition(["1", "2", "3"])} ) single_dimension_subset = single_dimension_def.empty_subset().with_partition_key_range( - PartitionKeyRange("a", "b") + single_dimension_def, PartitionKeyRange("a", "b") ) multipartitions_subset = multipartitions_def.empty_subset().with_partition_keys( { @@ -102,12 +104,14 @@ def test_get_downstream_partitions_multiple_keys_in_range(): result = MultiToSingleDimensionPartitionMapping().get_downstream_partitions_for_partitions( upstream_partitions_subset=single_dimension_subset, + upstream_partitions_def=single_dimension_def, downstream_partitions_def=multipartitions_def, ) assert result == multipartitions_subset result = MultiToSingleDimensionPartitionMapping().get_downstream_partitions_for_partitions( upstream_partitions_subset=multipartitions_subset, + upstream_partitions_def=multipartitions_def, downstream_partitions_def=single_dimension_def, ) assert result == single_dimension_subset @@ -127,17 +131,19 @@ def test_get_downstream_partitions_multiple_keys_in_range(): @pytest.mark.parametrize( - "upstream_partitions_def,upstream_partitions_subset,downstream_partitions_subset", + "upstream_partitions_def,upstream_partitions_subset,downstream_partitions_subset,downstream_partitions_def", [ ( static_partitions_def, static_partitions_def.empty_subset().with_partition_keys({"a"}), static_multipartitions_def.empty_subset().with_partition_key_range( + static_multipartitions_def, PartitionKeyRange( MultiPartitionKey({"abc": "a", "123": "1"}), MultiPartitionKey({"abc": "a", "123": "1"}), - ) + ), ), + static_multipartitions_def, ), ( static_partitions_def, @@ -148,6 +154,7 @@ def test_get_downstream_partitions_multiple_keys_in_range(): MultiPartitionKey({"abc": "a", "123": "2"}), } ), + static_multipartitions_def, ), ( static_multipartitions_def, @@ -159,6 +166,7 @@ def test_get_downstream_partitions_multiple_keys_in_range(): } ), static_partitions_def.empty_subset().with_partition_keys({"a"}), + static_partitions_def, ), ( static_multipartitions_def, @@ -173,12 +181,17 @@ def test_get_downstream_partitions_multiple_keys_in_range(): } ), static_partitions_def.empty_subset().with_partition_keys({"a", "b"}), + static_partitions_def, ), ( daily_partitions_def, daily_partitions_def.empty_subset() - .with_partition_key_range(PartitionKeyRange(start="2023-01-08", end="2023-01-14")) - .with_partition_key_range(PartitionKeyRange(start="2023-01-29", end="2023-02-04")), + .with_partition_key_range( + daily_partitions_def, PartitionKeyRange(start="2023-01-08", end="2023-01-14") + ) + .with_partition_key_range( + daily_partitions_def, PartitionKeyRange(start="2023-01-29", end="2023-02-04") + ), weekly_multipartitions_def.empty_subset().with_partition_keys( { MultiPartitionKey({"ab": "a", "week": "2023-01-08"}), @@ -187,6 +200,7 @@ def test_get_downstream_partitions_multiple_keys_in_range(): MultiPartitionKey({"ab": "b", "week": "2023-01-29"}), } ), + weekly_multipartitions_def, ), ], ) @@ -194,11 +208,13 @@ def test_get_upstream_single_dimension_to_multi_partition_mapping( upstream_partitions_def, upstream_partitions_subset, downstream_partitions_subset, + downstream_partitions_def, ): assert ( MultiToSingleDimensionPartitionMapping() .get_upstream_mapped_partitions_result_for_partitions( downstream_partitions_subset, + downstream_partitions_def, upstream_partitions_def, ) .partitions_subset @@ -219,38 +235,44 @@ def test_error_thrown_when_no_partition_dimension_name_provided(): with pytest.raises(CheckError, match="dimension name must be specified"): MultiToSingleDimensionPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( multipartitions_def.empty_subset().with_partition_key_range( + multipartitions_def, PartitionKeyRange( MultiPartitionKey({"a": "1", "b": "1"}), MultiPartitionKey({"a": "1", "b": "1"}), - ) + ), ), + multipartitions_def, single_dimension_def, ) with pytest.raises(CheckError, match="dimension name must be specified"): MultiToSingleDimensionPartitionMapping().get_downstream_partitions_for_partitions( multipartitions_def.empty_subset().with_partition_key_range( + multipartitions_def, PartitionKeyRange( MultiPartitionKey({"a": "1", "b": "1"}), MultiPartitionKey({"a": "1", "b": "1"}), - ) + ), ), + multipartitions_def, single_dimension_def, ) with pytest.raises(CheckError, match="dimension name must be specified"): MultiToSingleDimensionPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( single_dimension_def.empty_subset().with_partition_key_range( - PartitionKeyRange("1", "1") + single_dimension_def, PartitionKeyRange("1", "1") ), + single_dimension_def, multipartitions_def, ) with pytest.raises(CheckError, match="dimension name must be specified"): MultiToSingleDimensionPartitionMapping().get_downstream_partitions_for_partitions( single_dimension_def.empty_subset().with_partition_key_range( - PartitionKeyRange("1", "1") + single_dimension_def, PartitionKeyRange("1", "1") ), + single_dimension_def, multipartitions_def, ) @@ -629,6 +651,7 @@ def test_multipartitions_mapping_get_upstream_partitions( ): result = partitions_mapping.get_upstream_mapped_partitions_result_for_partitions( downstream_partitions_def.empty_subset().with_partition_keys(downstream_partition_keys), + downstream_partitions_def, upstream_partitions_def, ) assert result.partitions_subset.get_partition_keys() == set(upstream_partition_keys) @@ -665,6 +688,7 @@ def test_multipartitions_required_but_invalid_upstream_partitions(): MultiPartitionKey({"time": "2023-06-01", "123": "1"}), ] ), + may_multipartitions_def, june_multipartitions_def, ) assert result.partitions_subset.get_partition_keys() == set( @@ -689,6 +713,7 @@ def test_multipartitions_mapping_get_downstream_partitions( ): assert partitions_mapping.get_downstream_partitions_for_partitions( upstream_partitions_def.empty_subset().with_partition_keys(upstream_partition_keys), + upstream_partitions_def, downstream_partitions_def, ).get_partition_keys() == set(downstream_partition_keys) @@ -715,6 +740,7 @@ def test_multipartitions_mapping_dynamic(): downstream_partitions_def.empty_subset().with_partition_keys( [MultiPartitionKey({"dynamic": "a", "123": "1"})] ), + downstream_partitions_def, upstream_partitions_def, dynamic_partitions_store=instance, ) @@ -749,7 +775,9 @@ def test_error_multipartitions_mapping(): "other nonexistent dimension", SpecificPartitionsPartitionMapping(["c"]) ) } - ).get_upstream_mapped_partitions_result_for_partitions(weekly_abc.empty_subset(), daily_123) + ).get_upstream_mapped_partitions_result_for_partitions( + weekly_abc.empty_subset(), weekly_abc, daily_123 + ) def test_multi_partition_mapping_with_asset_deps(): @@ -920,6 +948,7 @@ def test_dynamic_dimension_multipartition_mapping(): result = MultiToSingleDimensionPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( downstream_partitions_subset=foo.empty_subset().with_partition_keys(["a"]), + downstream_partitions_def=foo, upstream_partitions_def=foo_bar, dynamic_partitions_store=instance, ) diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_static_partition_mapping.py b/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_static_partition_mapping.py index 6f4b6f11fe233..00598139b9091 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_static_partition_mapping.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_static_partition_mapping.py @@ -16,19 +16,21 @@ def test_single_valued_static_mapping(): upstream_partitions_subset=upstream_parts.empty_subset().with_partition_keys( ["p1", "p3", "q2", "r1"] ), + upstream_partitions_def=upstream_parts, downstream_partitions_def=downstream_parts, ) - assert result == DefaultPartitionsSubset(downstream_parts, {"p", "r"}) + assert result == DefaultPartitionsSubset({"p", "r"}) result = mapping.get_upstream_mapped_partitions_result_for_partitions( downstream_partitions_subset=downstream_parts.empty_subset().with_partition_keys( ["p", "q"] ), + downstream_partitions_def=downstream_parts, upstream_partitions_def=upstream_parts, ) - assert result.partitions_subset == DefaultPartitionsSubset(upstream_parts, {"p1", "p2", "p3"}) + assert result.partitions_subset == DefaultPartitionsSubset({"p1", "p2", "p3"}) def test_multi_valued_static_mapping(): @@ -39,19 +41,21 @@ def test_multi_valued_static_mapping(): result = mapping.get_downstream_partitions_for_partitions( upstream_partitions_subset=upstream_parts.empty_subset().with_partition_keys(["p", "r"]), + upstream_partitions_def=upstream_parts, downstream_partitions_def=downstream_parts, ) - assert result == DefaultPartitionsSubset(downstream_parts, {"p1", "p2", "p3"}) + assert result == DefaultPartitionsSubset({"p1", "p2", "p3"}) result = mapping.get_upstream_mapped_partitions_result_for_partitions( downstream_partitions_subset=downstream_parts.empty_subset().with_partition_keys( ["p2", "p3", "q"] ), + downstream_partitions_def=downstream_parts, upstream_partitions_def=upstream_parts, ) - assert result.partitions_subset == DefaultPartitionsSubset(upstream_parts, {"p", "q1", "q2"}) + assert result.partitions_subset == DefaultPartitionsSubset({"p", "q1", "q2"}) def test_error_on_extra_keys_in_mapping(): @@ -63,6 +67,7 @@ def test_error_on_extra_keys_in_mapping(): {"p": "p", "q": {"q", "OTHER"}} ).get_downstream_partitions_for_partitions( upstream_partitions_subset=upstream_parts.empty_subset(), + upstream_partitions_def=upstream_parts, downstream_partitions_def=downstream_parts, ) @@ -71,6 +76,7 @@ def test_error_on_extra_keys_in_mapping(): {"p": "p", "q": "q", "OTHER": "q"} ).get_upstream_mapped_partitions_result_for_partitions( downstream_partitions_subset=downstream_parts.empty_subset(), + downstream_partitions_def=downstream_parts, upstream_partitions_def=upstream_parts, ) diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_time_window_partition_mapping.py b/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_time_window_partition_mapping.py index 03202ece6c007..77e78c18747ae 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_time_window_partition_mapping.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_time_window_partition_mapping.py @@ -30,6 +30,7 @@ def test_get_upstream_partitions_for_partition_range_same_partitioning(): # single partition key result = TimeWindowPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( subset_with_keys(downstream_partitions_def, ["2021-05-07"]), + downstream_partitions_def, upstream_partitions_def, ) assert result.partitions_subset == upstream_partitions_def.empty_subset().with_partition_keys( @@ -39,6 +40,7 @@ def test_get_upstream_partitions_for_partition_range_same_partitioning(): # range of partition keys result = TimeWindowPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( subset_with_key_range(downstream_partitions_def, "2021-05-07", "2021-05-09"), + downstream_partitions_def, upstream_partitions_def, ) assert result.partitions_subset == subset_with_key_range( @@ -52,6 +54,7 @@ def test_get_upstream_partitions_for_partition_range_same_partitioning_different result = TimeWindowPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( subset_with_key_range(downstream_partitions_def, "2021-05-07", "2021-05-09"), + downstream_partitions_def, upstream_partitions_def, ) assert result.partitions_subset == subset_with_key_range( @@ -70,6 +73,7 @@ def test_get_upstream_partitions_for_partition_range_hourly_downstream_daily_ups upstream_partitions_def = DailyPartitionsDefinition(start_date="2021-05-05") result = TimeWindowPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( subset_with_keys(downstream_partitions_def, ["2021-05-07-05:00"]), + downstream_partitions_def, upstream_partitions_def, ) assert result.partitions_subset == upstream_partitions_def.empty_subset().with_partition_keys( @@ -78,6 +82,7 @@ def test_get_upstream_partitions_for_partition_range_hourly_downstream_daily_ups result = TimeWindowPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( subset_with_key_range(downstream_partitions_def, "2021-05-07-05:00", "2021-05-09-09:00"), + downstream_partitions_def, upstream_partitions_def, ) assert ( @@ -93,6 +98,7 @@ def test_get_upstream_partitions_for_partition_range_daily_downstream_hourly_ups upstream_partitions_def = HourlyPartitionsDefinition(start_date="2021-05-05-00:00") result = TimeWindowPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( subset_with_keys(downstream_partitions_def, ["2021-05-07"]), + downstream_partitions_def, upstream_partitions_def, ) assert ( @@ -104,6 +110,7 @@ def test_get_upstream_partitions_for_partition_range_daily_downstream_hourly_ups result = TimeWindowPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( subset_with_key_range(downstream_partitions_def, "2021-05-07", "2021-05-09"), + downstream_partitions_def, upstream_partitions_def, ) assert ( @@ -119,6 +126,7 @@ def test_get_upstream_partitions_for_partition_range_monthly_downstream_daily_up upstream_partitions_def = DailyPartitionsDefinition(start_date="2021-05-01") result = TimeWindowPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( subset_with_key_range(downstream_partitions_def, "2021-05-01", "2021-07-01"), + downstream_partitions_def, upstream_partitions_def, ) assert ( @@ -139,6 +147,7 @@ def test_get_upstream_partitions_for_partition_range_twice_daily_downstream_dail ) result = TimeWindowPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( subset_with_key_range(downstream_partitions_def, "2021-05-01", "2021-05-03"), + downstream_partitions_def, upstream_partitions_def, ) assert ( @@ -159,6 +168,7 @@ def test_get_upstream_partitions_for_partition_range_daily_downstream_twice_dail ) result = TimeWindowPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( subset_with_key_range(downstream_partitions_def, "2021-05-01 00:00", "2021-05-03 00:00"), + downstream_partitions_def, upstream_partitions_def, ) assert ( @@ -179,6 +189,7 @@ def test_get_upstream_partitions_for_partition_range_daily_non_aligned(): ) result = TimeWindowPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( subset_with_key_range(downstream_partitions_def, "2021-05-02", "2021-05-04"), + downstream_partitions_def, upstream_partitions_def, ) assert ( @@ -197,6 +208,7 @@ def test_get_upstream_partitions_for_partition_range_weekly_with_offset(): result = TimeWindowPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( subset_with_key_range(partitions_def, "2022-09-11", "2022-09-11"), partitions_def, + partitions_def, ) assert result.partitions_subset.get_partition_keys() == ( partitions_def.get_partition_keys_in_range(PartitionKeyRange("2022-09-11", "2022-09-11")) @@ -211,17 +223,23 @@ def test_daily_to_daily_lag(): # single partition key assert mapping.get_upstream_mapped_partitions_result_for_partitions( - subset_with_keys(downstream_partitions_def, ["2021-05-07"]), upstream_partitions_def + subset_with_keys(downstream_partitions_def, ["2021-05-07"]), + downstream_partitions_def, + upstream_partitions_def, ).partitions_subset.get_partition_keys() == ["2021-05-06"] assert mapping.get_downstream_partitions_for_partitions( - subset_with_keys(upstream_partitions_def, ["2021-05-06"]), downstream_partitions_def + subset_with_keys(upstream_partitions_def, ["2021-05-06"]), + upstream_partitions_def, + downstream_partitions_def, ).get_partition_keys() == ["2021-05-07"] # first partition key assert ( mapping.get_upstream_mapped_partitions_result_for_partitions( - subset_with_keys(downstream_partitions_def, ["2021-05-05"]), upstream_partitions_def + subset_with_keys(downstream_partitions_def, ["2021-05-05"]), + downstream_partitions_def, + upstream_partitions_def, ).partitions_subset.get_partition_keys() == [] ) @@ -229,17 +247,20 @@ def test_daily_to_daily_lag(): # range of partition keys assert mapping.get_upstream_mapped_partitions_result_for_partitions( subset_with_key_range(downstream_partitions_def, "2021-05-07", "2021-05-09"), + downstream_partitions_def, upstream_partitions_def, ).partitions_subset.get_partition_keys() == ["2021-05-06", "2021-05-07", "2021-05-08"] assert mapping.get_downstream_partitions_for_partitions( - subset_with_key_range(downstream_partitions_def, "2021-05-06", "2021-05-08"), + subset_with_key_range(upstream_partitions_def, "2021-05-06", "2021-05-08"), + upstream_partitions_def, downstream_partitions_def, ).get_partition_keys() == ["2021-05-07", "2021-05-08", "2021-05-09"] # range overlaps start assert mapping.get_upstream_mapped_partitions_result_for_partitions( subset_with_key_range(downstream_partitions_def, "2021-05-05", "2021-05-07"), + downstream_partitions_def, upstream_partitions_def, ).partitions_subset.get_partition_keys() == ["2021-05-05", "2021-05-06"] @@ -252,17 +273,23 @@ def test_exotic_cron_schedule_lag(): mapping = TimeWindowPartitionMapping(start_offset=-1, end_offset=-1) # single partition key assert mapping.get_upstream_mapped_partitions_result_for_partitions( - subset_with_keys(downstream_partitions_def, ["2021-05-06_04"]), upstream_partitions_def + subset_with_keys(downstream_partitions_def, ["2021-05-06_04"]), + downstream_partitions_def, + upstream_partitions_def, ).partitions_subset.get_partition_keys() == ["2021-05-06_00"] assert mapping.get_downstream_partitions_for_partitions( - subset_with_keys(upstream_partitions_def, ["2021-05-06_00"]), downstream_partitions_def + subset_with_keys(upstream_partitions_def, ["2021-05-06_00"]), + upstream_partitions_def, + downstream_partitions_def, ).get_partition_keys() == ["2021-05-06_04"] # first partition key assert ( mapping.get_upstream_mapped_partitions_result_for_partitions( - subset_with_keys(downstream_partitions_def, ["2021-05-05_00"]), upstream_partitions_def + subset_with_keys(downstream_partitions_def, ["2021-05-05_00"]), + downstream_partitions_def, + upstream_partitions_def, ).partitions_subset.get_partition_keys() == [] ) @@ -270,17 +297,20 @@ def test_exotic_cron_schedule_lag(): # range of partition keys assert mapping.get_upstream_mapped_partitions_result_for_partitions( subset_with_key_range(downstream_partitions_def, "2021-05-07_04", "2021-05-07_12"), + downstream_partitions_def, upstream_partitions_def, ).partitions_subset.get_partition_keys() == ["2021-05-07_00", "2021-05-07_04", "2021-05-07_08"] assert mapping.get_downstream_partitions_for_partitions( - subset_with_key_range(downstream_partitions_def, "2021-05-07_04", "2021-05-07_12"), + subset_with_key_range(upstream_partitions_def, "2021-05-07_04", "2021-05-07_12"), + upstream_partitions_def, downstream_partitions_def, ).get_partition_keys() == ["2021-05-07_08", "2021-05-07_12", "2021-05-07_16"] # range overlaps start assert mapping.get_upstream_mapped_partitions_result_for_partitions( subset_with_key_range(downstream_partitions_def, "2021-05-05_00", "2021-05-05_08"), + downstream_partitions_def, upstream_partitions_def, ).partitions_subset.get_partition_keys() == ["2021-05-05_00", "2021-05-05_04"] @@ -291,11 +321,15 @@ def test_daily_to_daily_lag_different_start_date(): mapping = TimeWindowPartitionMapping(start_offset=-1, end_offset=-1) assert mapping.get_upstream_mapped_partitions_result_for_partitions( - subset_with_keys(downstream_partitions_def, ["2021-05-06"]), upstream_partitions_def + subset_with_keys(downstream_partitions_def, ["2021-05-06"]), + downstream_partitions_def, + upstream_partitions_def, ).partitions_subset.get_partition_keys() == ["2021-05-05"] assert mapping.get_downstream_partitions_for_partitions( - subset_with_keys(upstream_partitions_def, ["2021-05-05"]), downstream_partitions_def + subset_with_keys(upstream_partitions_def, ["2021-05-05"]), + upstream_partitions_def, + downstream_partitions_def, ).get_partition_keys() == ["2021-05-06"] @@ -305,25 +339,33 @@ def test_daily_to_daily_many_to_one(): mapping = TimeWindowPartitionMapping(start_offset=-1) assert mapping.get_upstream_mapped_partitions_result_for_partitions( - subset_with_keys(downstream_partitions_def, ["2022-07-04"]), upstream_partitions_def + subset_with_keys(downstream_partitions_def, ["2022-07-04"]), + downstream_partitions_def, + upstream_partitions_def, ).partitions_subset.get_partition_keys() == ["2022-07-03", "2022-07-04"] assert mapping.get_upstream_mapped_partitions_result_for_partitions( subset_with_keys(downstream_partitions_def, ["2022-07-04", "2022-07-05"]), + downstream_partitions_def, upstream_partitions_def, ).partitions_subset.get_partition_keys() == ["2022-07-03", "2022-07-04", "2022-07-05"] assert mapping.get_downstream_partitions_for_partitions( subset_with_keys(upstream_partitions_def, ["2022-07-03", "2022-07-04"]), + upstream_partitions_def, downstream_partitions_def, ).get_partition_keys() == ["2022-07-03", "2022-07-04", "2022-07-05"] assert mapping.get_downstream_partitions_for_partitions( - subset_with_keys(upstream_partitions_def, ["2022-07-03"]), downstream_partitions_def + subset_with_keys(upstream_partitions_def, ["2022-07-03"]), + upstream_partitions_def, + downstream_partitions_def, ).get_partition_keys() == ["2022-07-03", "2022-07-04"] assert mapping.get_downstream_partitions_for_partitions( - subset_with_keys(upstream_partitions_def, ["2022-07-04"]), downstream_partitions_def + subset_with_keys(upstream_partitions_def, ["2022-07-04"]), + upstream_partitions_def, + downstream_partitions_def, ).get_partition_keys() == ["2022-07-04", "2022-07-05"] @@ -385,6 +427,7 @@ def test_get_downstream_with_current_time( assert ( mapping.get_downstream_partitions_for_partitions( subset_with_keys(upstream_partitions_def, upstream_keys), + upstream_partitions_def, downstream_partitions_def, current_time=current_time, ).get_partition_keys() @@ -489,6 +532,7 @@ def test_get_upstream_with_current_time( upstream_partitions_result = mapping.get_upstream_mapped_partitions_result_for_partitions( subset_with_keys(downstream_partitions_def, downstream_keys), + downstream_partitions_def, upstream_partitions_def, current_time=current_time, ) @@ -508,6 +552,7 @@ def test_different_start_time_partitions_defs(): TimeWindowPartitionMapping() .get_downstream_partitions_for_partitions( upstream_partitions_subset=subset_with_keys(jan_start, ["2023-01-15"]), + upstream_partitions_def=jan_start, downstream_partitions_def=feb_start, ) .get_partition_keys() @@ -517,6 +562,7 @@ def test_different_start_time_partitions_defs(): upstream_partitions_result = ( TimeWindowPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( downstream_partitions_subset=subset_with_keys(jan_start, ["2023-01-15"]), + downstream_partitions_def=jan_start, upstream_partitions_def=feb_start, ) ) @@ -530,6 +576,7 @@ def test_different_end_time_partitions_defs(): assert TimeWindowPartitionMapping().get_downstream_partitions_for_partitions( upstream_partitions_subset=subset_with_keys(jan_partitions_def, ["2023-01-15"]), + upstream_partitions_def=jan_partitions_def, downstream_partitions_def=jan_feb_partitions_def, ).get_partition_keys() == ["2023-01-15"] @@ -537,6 +584,7 @@ def test_different_end_time_partitions_defs(): TimeWindowPartitionMapping() .get_downstream_partitions_for_partitions( upstream_partitions_subset=subset_with_keys(jan_feb_partitions_def, ["2023-02-15"]), + upstream_partitions_def=jan_feb_partitions_def, downstream_partitions_def=jan_partitions_def, ) .get_partition_keys() @@ -546,6 +594,7 @@ def test_different_end_time_partitions_defs(): upstream_partitions_result = ( TimeWindowPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( downstream_partitions_subset=subset_with_keys(jan_feb_partitions_def, ["2023-02-15"]), + downstream_partitions_def=jan_feb_partitions_def, upstream_partitions_def=jan_partitions_def, ) ) @@ -566,6 +615,7 @@ def test_daily_upstream_of_yearly(): allow_nonexistent_upstream_partitions=True ).get_upstream_mapped_partitions_result_for_partitions( downstream_partitions_subset=subset_with_keys(yearly, ["2023-01-01"]), + downstream_partitions_def=yearly, upstream_partitions_def=daily, current_time=datetime(2023, 1, 5, 0), ).partitions_subset.get_partition_keys() == [ @@ -633,6 +683,7 @@ def test_downstream_partition_has_valid_upstream_partitions( allow_nonexistent_upstream_partitions=allow_nonexistent_upstream_partitions ).get_upstream_mapped_partitions_result_for_partitions( downstream_partitions_subset=downstream_partitions_subset, + downstream_partitions_def=downstream_partitions_subset.partitions_def, upstream_partitions_def=upstream_partitions_def, current_time=current_time, ) diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py index 8c4756cc93c53..77667b0616a11 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py @@ -213,6 +213,7 @@ class TrailingWindowPartitionMapping(PartitionMapping): def get_upstream_mapped_partitions_result_for_partitions( self, downstream_partitions_subset: Optional[PartitionsSubset], + downstream_partitions_def: Optional[PartitionsDefinition], upstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -223,7 +224,8 @@ def get_upstream_mapped_partitions_result_for_partitions( partition_keys = list(downstream_partitions_subset.get_partition_keys()) return UpstreamPartitionsResult( upstream_partitions_def.empty_subset().with_partition_key_range( - PartitionKeyRange(str(max(1, int(partition_keys[0]) - 1)), partition_keys[-1]) + upstream_partitions_def, + PartitionKeyRange(str(max(1, int(partition_keys[0]) - 1)), partition_keys[-1]), ), [], ) @@ -231,6 +233,7 @@ def get_upstream_mapped_partitions_result_for_partitions( def get_downstream_partitions_for_partitions( self, upstream_partitions_subset: PartitionsSubset, + upstream_partitions_def: PartitionsDefinition, downstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -405,7 +408,8 @@ def include_all(asset_key, partitions_subset): asset_graph, partitions_subsets_by_asset_key={ asset3.key: asset3.partitions_def.empty_subset().with_partition_key_range( - PartitionKeyRange("2022-01-02-00:00", "2022-01-03-23:00") + asset3.partitions_def, + PartitionKeyRange("2022-01-02-00:00", "2022-01-03-23:00"), ), }, ) diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py index d0dac272893ee..5206a1833b270 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py @@ -71,10 +71,13 @@ def get_upstream_partitions_for_partition_range( upstream_partitions_subset = ( downstream_partition_mapping.get_upstream_mapped_partitions_result_for_partitions( downstream_partitions_subset, + downstream_partitions_def, upstream_partitions_def, ).partitions_subset ) - upstream_key_ranges = upstream_partitions_subset.get_partition_key_ranges() + upstream_key_ranges = upstream_partitions_subset.get_partition_key_ranges( + upstream_partitions_def + ) check.invariant(len(upstream_key_ranges) == 1) return upstream_key_ranges[0] @@ -101,10 +104,13 @@ def get_downstream_partitions_for_partition_range( downstream_partitions_subset = ( downstream_partition_mapping.get_downstream_partitions_for_partitions( upstream_partitions_subset, + upstream_partitions_def, downstream_assets_def.partitions_def, ) ) - downstream_key_ranges = downstream_partitions_subset.get_partition_key_ranges() + downstream_key_ranges = downstream_partitions_subset.get_partition_key_ranges( + downstream_assets_def.partitions_def + ) check.invariant(len(downstream_key_ranges) == 1) return downstream_key_ranges[0] diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py index da95a13e60dbf..91f3a97061087 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py @@ -1389,9 +1389,11 @@ def foo_child(): ) assert asset_backfill_data.target_subset.partitions_subsets_by_asset_key == { - foo.key: foo_partitions_def.empty_subset().with_partition_key_range(partition_key_range), + foo.key: foo_partitions_def.empty_subset().with_partition_key_range( + foo_partitions_def, partition_key_range + ), foo_child.key: foo_partitions_def.empty_subset().with_partition_key_range( - partition_key_range + foo_partitions_def, partition_key_range ), } diff --git a/python_modules/dagster/dagster_tests/core_tests/partition_tests/test_partition.py b/python_modules/dagster/dagster_tests/core_tests/partition_tests/test_partition.py index 1399ed7a1d84d..d341aa18d2b53 100644 --- a/python_modules/dagster/dagster_tests/core_tests/partition_tests/test_partition.py +++ b/python_modules/dagster/dagster_tests/core_tests/partition_tests/test_partition.py @@ -142,10 +142,10 @@ def test_static_partitions_subset(): assert len(subset) == 0 assert "bar" not in subset with_some_partitions = subset.with_partition_keys(["foo", "bar"]) - assert with_some_partitions.get_partition_keys_not_in_subset() == {"baz", "qux"} + assert with_some_partitions.get_partition_keys_not_in_subset(partitions) == {"baz", "qux"} serialized = with_some_partitions.serialize() deserialized = partitions.deserialize_subset(serialized) - assert deserialized.get_partition_keys_not_in_subset() == {"baz", "qux"} + assert deserialized.get_partition_keys_not_in_subset(partitions) == {"baz", "qux"} assert len(with_some_partitions) == 2 assert len(deserialized) == 2 assert "bar" in with_some_partitions diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py index 24562ae732a9e..84da523762b98 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py @@ -166,7 +166,10 @@ class AssetReconciliationScenario( ("cursor_from", Optional["AssetReconciliationScenario"]), ("current_time", Optional[datetime.datetime]), ("asset_selection", Optional[AssetSelection]), - ("active_backfill_targets", Optional[Sequence[Mapping[AssetKey, PartitionsSubset]]]), + ( + "active_backfill_targets", + Optional[Sequence[Union[Mapping[AssetKey, PartitionsSubset], Sequence[AssetKey]]]], + ), ("dagster_runs", Optional[Sequence[DagsterRun]]), ("event_log_entries", Optional[Sequence[EventLogEntry]]), ("expected_run_requests", Optional[Sequence[RunRequest]]), @@ -190,7 +193,9 @@ def __new__( cursor_from: Optional["AssetReconciliationScenario"] = None, current_time: Optional[datetime.datetime] = None, asset_selection: Optional[AssetSelection] = None, - active_backfill_targets: Optional[Sequence[Mapping[AssetKey, PartitionsSubset]]] = None, + active_backfill_targets: Optional[ + Sequence[Union[Mapping[AssetKey, PartitionsSubset], Sequence[AssetKey]]] + ] = None, dagster_runs: Optional[Sequence[DagsterRun]] = None, event_log_entries: Optional[Sequence[EventLogEntry]] = None, expected_run_requests: Optional[Sequence[RunRequest]] = None, @@ -300,11 +305,18 @@ def repo(): # add any backfills to the instance for i, target in enumerate(self.active_backfill_targets or []): - target_subset = AssetGraphSubset( - asset_graph=repo.asset_graph, - partitions_subsets_by_asset_key=target, - non_partitioned_asset_keys=set(), - ) + if isinstance(target, Mapping): + target_subset = AssetGraphSubset( + asset_graph=repo.asset_graph, + partitions_subsets_by_asset_key=target, + non_partitioned_asset_keys=set(), + ) + else: + target_subset = AssetGraphSubset( + asset_graph=repo.asset_graph, + partitions_subsets_by_asset_key={}, + non_partitioned_asset_keys=target, + ) empty_subset = AssetGraphSubset( asset_graph=repo.asset_graph, partitions_subsets_by_asset_key={}, diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/scenarios/partition_scenarios.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/scenarios/partition_scenarios.py index ec8bad1559a60..c3312a78b4c9e 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/scenarios/partition_scenarios.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/scenarios/partition_scenarios.py @@ -167,17 +167,7 @@ ], ), }, - { - AssetKey( - "non_existant_asset" # ignored since can't be loaded - ): TimeWindowPartitionsSubset( - hourly_partitions_def, num_partitions=None, included_time_windows=[] - ).with_partition_keys( - [ - "2013-01-05-00:00", - ], - ), - }, + [AssetKey("non_existant_asset")], # ignored since can't be loaded ], current_time=create_pendulum_time(year=2013, month=1, day=5, hour=17), expected_run_requests=[ diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_flatten_time_window_ranges.py b/python_modules/dagster/dagster_tests/definitions_tests/test_flatten_time_window_ranges.py index 0cd5a9c0057e2..5042929338d4b 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_flatten_time_window_ranges.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_flatten_time_window_ranges.py @@ -38,10 +38,10 @@ def test_no_overlap() -> None: _check_flatten_time_window_ranges( { PartitionRangeStatus.MATERIALIZED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-02", "2022-01-05") + partitions_def, PartitionKeyRange("2022-01-02", "2022-01-05") ), PartitionRangeStatus.FAILED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-06", "2022-01-06") + partitions_def, PartitionKeyRange("2022-01-06", "2022-01-06") ), }, [ @@ -57,13 +57,13 @@ def test_no_overlap() -> None: _check_flatten_time_window_ranges( { PartitionRangeStatus.MATERIALIZED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-02", "2022-01-05") + partitions_def, PartitionKeyRange("2022-01-02", "2022-01-05") ), PartitionRangeStatus.FAILED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-06", "2022-01-06") + partitions_def, PartitionKeyRange("2022-01-06", "2022-01-06") ), PartitionRangeStatus.MATERIALIZING: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-8", "2022-01-09") + partitions_def, PartitionKeyRange("2022-01-8", "2022-01-09") ), }, [ @@ -86,10 +86,10 @@ def test_overlapped() -> None: _check_flatten_time_window_ranges( { PartitionRangeStatus.MATERIALIZED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-02", "2022-01-05") + partitions_def, PartitionKeyRange("2022-01-02", "2022-01-05") ), PartitionRangeStatus.FAILED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-05", "2022-01-06") + partitions_def, PartitionKeyRange("2022-01-05", "2022-01-06") ), PartitionRangeStatus.MATERIALIZING: empty_subset, }, @@ -106,10 +106,10 @@ def test_overlapped() -> None: _check_flatten_time_window_ranges( { PartitionRangeStatus.MATERIALIZED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-05", "2022-01-06") + partitions_def, PartitionKeyRange("2022-01-05", "2022-01-06") ), PartitionRangeStatus.FAILED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-02", "2022-01-05") + partitions_def, PartitionKeyRange("2022-01-02", "2022-01-05") ), PartitionRangeStatus.MATERIALIZING: empty_subset, }, @@ -132,10 +132,10 @@ def test_materialized_spans_failed() -> None: _check_flatten_time_window_ranges( { PartitionRangeStatus.MATERIALIZED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-02", "2022-01-10") + partitions_def, PartitionKeyRange("2022-01-02", "2022-01-10") ), PartitionRangeStatus.FAILED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-05", "2022-01-06") + partitions_def, PartitionKeyRange("2022-01-05", "2022-01-06") ), PartitionRangeStatus.MATERIALIZING: empty_subset, }, @@ -159,12 +159,18 @@ def test_materialized_spans_many_failed() -> None: _check_flatten_time_window_ranges( { PartitionRangeStatus.MATERIALIZED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-01", "2022-12-10") + partitions_def, PartitionKeyRange("2022-01-01", "2022-12-10") ), PartitionRangeStatus.FAILED: ( - empty_subset.with_partition_key_range(PartitionKeyRange("2022-01-05", "2022-01-06")) - .with_partition_key_range(PartitionKeyRange("2022-03-01", "2022-03-10")) - .with_partition_key_range(PartitionKeyRange("2022-09-01", "2022-10-01")) + empty_subset.with_partition_key_range( + partitions_def, PartitionKeyRange("2022-01-05", "2022-01-06") + ) + .with_partition_key_range( + partitions_def, PartitionKeyRange("2022-03-01", "2022-03-10") + ) + .with_partition_key_range( + partitions_def, PartitionKeyRange("2022-09-01", "2022-10-01") + ) ), PartitionRangeStatus.MATERIALIZING: empty_subset, }, @@ -221,8 +227,10 @@ def test_empty() -> None: _check_flatten_time_window_ranges( { PartitionRangeStatus.MATERIALIZED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-02", "2022-01-10") - ).with_partition_key_range(PartitionKeyRange("2022-01-20", "2022-02-10")), + partitions_def, PartitionKeyRange("2022-01-02", "2022-01-10") + ).with_partition_key_range( + partitions_def, PartitionKeyRange("2022-01-20", "2022-02-10") + ), PartitionRangeStatus.FAILED: empty_subset, PartitionRangeStatus.MATERIALIZING: empty_subset, }, @@ -244,8 +252,10 @@ def test_empty() -> None: { PartitionRangeStatus.MATERIALIZED: empty_subset, PartitionRangeStatus.FAILED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-02", "2022-01-10") - ).with_partition_key_range(PartitionKeyRange("2022-01-20", "2022-02-10")), + partitions_def, PartitionKeyRange("2022-01-02", "2022-01-10") + ).with_partition_key_range( + partitions_def, PartitionKeyRange("2022-01-20", "2022-02-10") + ), PartitionRangeStatus.MATERIALIZING: empty_subset, }, [ @@ -267,8 +277,10 @@ def test_empty() -> None: PartitionRangeStatus.MATERIALIZED: empty_subset, PartitionRangeStatus.FAILED: empty_subset, PartitionRangeStatus.MATERIALIZING: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-02", "2022-01-10") - ).with_partition_key_range(PartitionKeyRange("2022-01-20", "2022-02-10")), + partitions_def, PartitionKeyRange("2022-01-02", "2022-01-10") + ).with_partition_key_range( + partitions_def, PartitionKeyRange("2022-01-20", "2022-02-10") + ), }, [ { @@ -289,10 +301,12 @@ def test_cancels_out() -> None: _check_flatten_time_window_ranges( { PartitionRangeStatus.MATERIALIZED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-02", "2022-01-05") - ).with_partition_key_range(PartitionKeyRange("2022-01-12", "2022-01-13")), + partitions_def, PartitionKeyRange("2022-01-02", "2022-01-05") + ).with_partition_key_range( + partitions_def, PartitionKeyRange("2022-01-12", "2022-01-13") + ), PartitionRangeStatus.FAILED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-02", "2022-01-05") + partitions_def, PartitionKeyRange("2022-01-02", "2022-01-05") ), PartitionRangeStatus.MATERIALIZING: empty_subset, }, @@ -315,27 +329,37 @@ def test_lots() -> None: _check_flatten_time_window_ranges( { PartitionRangeStatus.MATERIALIZED: ( - empty_subset.with_partition_key_range(PartitionKeyRange("2022-01-02", "2022-01-05")) - .with_partition_key_range(PartitionKeyRange("2022-01-12", "2022-01-13")) - .with_partition_key_range(PartitionKeyRange("2022-01-15", "2022-01-17")) - .with_partition_key_range(PartitionKeyRange("2022-01-19", "2022-01-20")) - .with_partition_key_range(PartitionKeyRange("2022-01-22", "2022-01-24")) + empty_subset.with_partition_key_range( + partitions_def, PartitionKeyRange("2022-01-02", "2022-01-05") + ) + .with_partition_key_range( + partitions_def, PartitionKeyRange("2022-01-12", "2022-01-13") + ) + .with_partition_key_range( + partitions_def, PartitionKeyRange("2022-01-15", "2022-01-17") + ) + .with_partition_key_range( + partitions_def, PartitionKeyRange("2022-01-19", "2022-01-20") + ) + .with_partition_key_range( + partitions_def, PartitionKeyRange("2022-01-22", "2022-01-24") + ) ), PartitionRangeStatus.FAILED: ( empty_subset.with_partition_key_range( - PartitionKeyRange("2021-12-30", "2021-12-31") + partitions_def, PartitionKeyRange("2021-12-30", "2021-12-31") ) # before materialized subset .with_partition_key_range( - PartitionKeyRange("2022-01-02", "2022-01-03") + partitions_def, PartitionKeyRange("2022-01-02", "2022-01-03") ) # within materialized subset .with_partition_key_range( - PartitionKeyRange("2022-01-05", "2022-01-06") + partitions_def, PartitionKeyRange("2022-01-05", "2022-01-06") ) # directly after materialized subset .with_partition_key_range( - PartitionKeyRange("2022-01-08", "2022-01-09") + partitions_def, PartitionKeyRange("2022-01-08", "2022-01-09") ) # between materialized subsets .with_partition_key_range( - PartitionKeyRange("2022-01-11", "2022-01-14") + partitions_def, PartitionKeyRange("2022-01-11", "2022-01-14") ) # encompasses materialized subset .with_partition_keys(["2022-01-20"]) # at end materialized subset .with_partition_keys( @@ -381,13 +405,13 @@ def test_multiple_overlap_types(): _check_flatten_time_window_ranges( { PartitionRangeStatus.MATERIALIZED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-01", "2022-01-06") + partitions_def, PartitionKeyRange("2022-01-01", "2022-01-06") ), PartitionRangeStatus.FAILED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-02", "2022-01-05") + partitions_def, PartitionKeyRange("2022-01-02", "2022-01-05") ), PartitionRangeStatus.MATERIALIZING: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-03", "2022-01-04") + partitions_def, PartitionKeyRange("2022-01-03", "2022-01-04") ), }, [ diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_multi_partitions.py b/python_modules/dagster/dagster_tests/definitions_tests/test_multi_partitions.py index e36c500ec4a43..0ddbb8e329f6c 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_multi_partitions.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_multi_partitions.py @@ -320,9 +320,9 @@ def test_multipartitions_subset_addition(initial, added): assert added_subset.get_partition_keys(current_time=current_day) == set( added_subset_keys + initial_subset_keys ) - assert added_subset.get_partition_keys_not_in_subset(current_time=current_day) == set( - expected_keys_not_in_updated_subset - ) + assert added_subset.get_partition_keys_not_in_subset( + multipartitions_def, current_time=current_day + ) == set(expected_keys_not_in_updated_subset) def test_asset_partition_key_is_multipartition_key(): diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_time_window_partitions.py b/python_modules/dagster/dagster_tests/definitions_tests/test_time_window_partitions.py index 61472baa999b6..a0ea9d3ce691f 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_time_window_partitions.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_time_window_partitions.py @@ -808,7 +808,8 @@ def test_partition_subset_get_partition_keys_not_in_subset( assert partition_key in subset assert ( subset.get_partition_keys_not_in_subset( - current_time=partitions_def.end_time_for_partition_key(full_set_keys[-1]) + partitions_def=partitions_def, + current_time=partitions_def.end_time_for_partition_key(full_set_keys[-1]), ) == expected_keys_not_in_subset ) @@ -946,7 +947,8 @@ def test_partition_subset_with_partition_keys( assert all(partition_key in updated_subset for partition_key in added_subset_keys) assert ( updated_subset.get_partition_keys_not_in_subset( - current_time=partitions_def.end_time_for_partition_key(full_set_keys[-1]) + partitions_def=partitions_def, + current_time=partitions_def.end_time_for_partition_key(full_set_keys[-1]), ) == expected_keys_not_in_updated_subset ) diff --git a/python_modules/dagster/dagster_tests/storage_tests/test_fs_io_manager.py b/python_modules/dagster/dagster_tests/storage_tests/test_fs_io_manager.py index a0356e6a03eee..173c50405345b 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/test_fs_io_manager.py +++ b/python_modules/dagster/dagster_tests/storage_tests/test_fs_io_manager.py @@ -295,6 +295,7 @@ class NoPartitionsPartitionMapping(PartitionMapping): def get_upstream_mapped_partitions_result_for_partitions( self, downstream_partitions_subset: Optional[PartitionsSubset], + downstream_partitions_def: Optional[PartitionsDefinition], upstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -304,6 +305,7 @@ def get_upstream_mapped_partitions_result_for_partitions( def get_downstream_partitions_for_partitions( self, upstream_partitions_subset, + upstream_partitions_def, downstream_partitions_def, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None,