diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py index a0c34b17a4ce5..733aa51f8e727 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py @@ -20,18 +20,13 @@ DagsterEventType, DagsterInstance, EventRecordsFilter, - MultiPartitionKey, MultiPartitionsDefinition, _check as check, ) from dagster._core.definitions.data_time import CachingDataTimeResolver from dagster._core.definitions.external_asset_graph import ExternalAssetGraph -from dagster._core.definitions.multi_dimensional_partitions import ( - MultiPartitionsSubset, -) from dagster._core.definitions.partition import ( CachingDynamicPartitionsLoader, - DefaultPartitionsSubset, PartitionsDefinition, PartitionsSubset, ) @@ -420,6 +415,7 @@ def build_partition_statuses( materialized_partitions_subset: Optional[PartitionsSubset], failed_partitions_subset: Optional[PartitionsSubset], in_progress_partitions_subset: Optional[PartitionsSubset], + partitions_def: Optional[PartitionsDefinition], ) -> Union[ "GrapheneTimePartitionStatuses", "GrapheneDefaultPartitionStatuses", @@ -481,14 +477,15 @@ def build_partition_statuses( ) ) return GrapheneTimePartitionStatuses(ranges=graphene_ranges) - elif isinstance(materialized_partitions_subset, MultiPartitionsSubset): + elif isinstance(partitions_def, MultiPartitionsDefinition): return get_2d_run_length_encoded_partitions( dynamic_partitions_store, materialized_partitions_subset, failed_partitions_subset, in_progress_partitions_subset, + partitions_def, ) - elif isinstance(materialized_partitions_subset, DefaultPartitionsSubset): + elif partitions_def: materialized_keys = materialized_partitions_subset.get_partition_keys() failed_keys = failed_partitions_subset.get_partition_keys() in_progress_keys = in_progress_partitions_subset.get_partition_keys() @@ -499,7 +496,7 @@ def build_partition_statuses( - set(in_progress_keys), failedPartitions=failed_keys, unmaterializedPartitions=materialized_partitions_subset.get_partition_keys_not_in_subset( - dynamic_partitions_store=dynamic_partitions_store + partitions_def=partitions_def, dynamic_partitions_store=dynamic_partitions_store ), materializingPartitions=in_progress_keys, ) @@ -512,57 +509,53 @@ def get_2d_run_length_encoded_partitions( materialized_partitions_subset: PartitionsSubset, failed_partitions_subset: PartitionsSubset, in_progress_partitions_subset: PartitionsSubset, + partitions_def: MultiPartitionsDefinition, ) -> "GrapheneMultiPartitionStatuses": from ..schema.pipelines.pipeline import ( GrapheneMultiPartitionRangeStatuses, GrapheneMultiPartitionStatuses, ) - if ( - not isinstance(materialized_partitions_subset.partitions_def, MultiPartitionsDefinition) - or not isinstance(failed_partitions_subset.partitions_def, MultiPartitionsDefinition) - or not isinstance(in_progress_partitions_subset.partitions_def, MultiPartitionsDefinition) - ): - check.failed("Can only fetch 2D run length encoded partitions for multipartitioned assets") + check.invariant( + isinstance(partitions_def, MultiPartitionsDefinition), + "Partitions definition should be multipartitioned", + ) - primary_dim = materialized_partitions_subset.partitions_def.primary_dimension - secondary_dim = materialized_partitions_subset.partitions_def.secondary_dimension + primary_dim = partitions_def.primary_dimension + secondary_dim = partitions_def.secondary_dimension dim2_materialized_partition_subset_by_dim1: Dict[str, PartitionsSubset] = defaultdict( lambda: secondary_dim.partitions_def.empty_subset() ) - for partition_key in cast( - Sequence[MultiPartitionKey], materialized_partitions_subset.get_partition_keys() - ): + for partition_key in materialized_partitions_subset.get_partition_keys(): + multipartition_key = partitions_def.get_partition_key_from_str(partition_key) dim2_materialized_partition_subset_by_dim1[ - partition_key.keys_by_dimension[primary_dim.name] + multipartition_key.keys_by_dimension[primary_dim.name] ] = dim2_materialized_partition_subset_by_dim1[ - partition_key.keys_by_dimension[primary_dim.name] - ].with_partition_keys([partition_key.keys_by_dimension[secondary_dim.name]]) + multipartition_key.keys_by_dimension[primary_dim.name] + ].with_partition_keys([multipartition_key.keys_by_dimension[secondary_dim.name]]) dim2_failed_partition_subset_by_dim1: Dict[str, PartitionsSubset] = defaultdict( lambda: secondary_dim.partitions_def.empty_subset() ) - for partition_key in cast( - Sequence[MultiPartitionKey], failed_partitions_subset.get_partition_keys() - ): + for partition_key in failed_partitions_subset.get_partition_keys(): + multipartition_key = partitions_def.get_partition_key_from_str(partition_key) dim2_failed_partition_subset_by_dim1[ - partition_key.keys_by_dimension[primary_dim.name] + multipartition_key.keys_by_dimension[primary_dim.name] ] = dim2_failed_partition_subset_by_dim1[ - partition_key.keys_by_dimension[primary_dim.name] - ].with_partition_keys([partition_key.keys_by_dimension[secondary_dim.name]]) + multipartition_key.keys_by_dimension[primary_dim.name] + ].with_partition_keys([multipartition_key.keys_by_dimension[secondary_dim.name]]) dim2_in_progress_partition_subset_by_dim1: Dict[str, PartitionsSubset] = defaultdict( lambda: secondary_dim.partitions_def.empty_subset() ) - for partition_key in cast( - Sequence[MultiPartitionKey], in_progress_partitions_subset.get_partition_keys() - ): + for partition_key in in_progress_partitions_subset.get_partition_keys(): + multipartition_key = partitions_def.get_partition_key_from_str(partition_key) dim2_in_progress_partition_subset_by_dim1[ - partition_key.keys_by_dimension[primary_dim.name] + multipartition_key.keys_by_dimension[primary_dim.name] ] = dim2_in_progress_partition_subset_by_dim1[ - partition_key.keys_by_dimension[primary_dim.name] - ].with_partition_keys([partition_key.keys_by_dimension[secondary_dim.name]]) + multipartition_key.keys_by_dimension[primary_dim.name] + ].with_partition_keys([multipartition_key.keys_by_dimension[secondary_dim.name]]) materialized_2d_ranges = [] @@ -626,6 +619,7 @@ def get_2d_run_length_encoded_partitions( dim2_materialized_partition_subset_by_dim1[start_key], dim2_failed_partition_subset_by_dim1[start_key], dim2_in_progress_partition_subset_by_dim1[start_key], + secondary_dim.partitions_def, ), ) ) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py index 5b9658ed82316..cc88f368949a0 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py @@ -930,6 +930,12 @@ def resolve_assetPartitionStatuses( if not self._dynamic_partitions_loader: check.failed("dynamic_partitions_loader must be provided to get partition keys") + partitions_def = ( + self._external_asset_node.partitions_def_data.get_partitions_definition() + if self._external_asset_node.partitions_def_data + else None + ) + ( materialized_partition_subset, failed_partition_subset, @@ -938,11 +944,7 @@ def resolve_assetPartitionStatuses( graphene_info.context.instance, asset_key, self._dynamic_partitions_loader, - ( - self._external_asset_node.partitions_def_data.get_partitions_definition() - if self._external_asset_node.partitions_def_data - else None - ), + partitions_def, ) return build_partition_statuses( @@ -950,6 +952,7 @@ def resolve_assetPartitionStatuses( materialized_partition_subset, failed_partition_subset, in_progress_subset, + partitions_def, ) def resolve_partitionStats( diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py b/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py index c66eb3fe482c0..f46badb6b3958 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py @@ -133,7 +133,9 @@ def __init__(self, partition_subset: PartitionsSubset): if isinstance(partition_subset, BaseTimeWindowPartitionsSubset): ranges = [ GraphenePartitionKeyRange(start, end) - for start, end in partition_subset.get_partition_key_ranges() + for start, end in partition_subset.get_partition_key_ranges( + partition_subset.partitions_def + ) ] partition_keys = None else: # Default partitions subset diff --git a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py index 0d06219a3f9c3..d3e20748ac964 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py @@ -723,7 +723,7 @@ def _build_run_requests_with_backfill_policy( run_requests = [] partition_subset = partitions_def.subset_with_partition_keys(partition_keys) partition_key_ranges = partition_subset.get_partition_key_ranges( - dynamic_partitions_store=dynamic_partitions_store + partitions_def, dynamic_partitions_store=dynamic_partitions_store ) for partition_key_range in partition_key_ranges: # We might resolve more than one partition key range for the given partition keys. diff --git a/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py b/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py index 7d21972594584..ad47b6c9b4afb 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py @@ -80,6 +80,7 @@ def get_unhandled_partitions( ) return handled_subset.get_partition_keys_not_in_subset( + partitions_def=partitions_def, current_time=current_time, dynamic_partitions_store=dynamic_partitions_store, ) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph.py b/python_modules/dagster/dagster/_core/definitions/asset_graph.py index 572285e188142..3f1524a0d88b9 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_graph.py @@ -364,6 +364,7 @@ def get_child_partition_keys_of_parent( partition_mapping = self.get_partition_mapping(child_asset_key, parent_asset_key) child_partitions_subset = partition_mapping.get_downstream_partitions_for_partitions( parent_partitions_def.empty_subset().with_partition_keys([parent_partition_key]), + parent_partitions_def, downstream_partitions_def=child_partitions_def, dynamic_partitions_store=dynamic_partitions_store, current_time=current_time, @@ -437,7 +438,7 @@ def get_parent_partition_keys_for_child( """ partition_key = check.opt_str_param(partition_key, "partition_key") - child_partitions_def = self.get_partitions_def(child_asset_key) + child_partitions_def = cast(PartitionsDefinition, self.get_partitions_def(child_asset_key)) parent_partitions_def = self.get_partitions_def(parent_asset_key) if parent_partitions_def is None: @@ -449,12 +450,11 @@ def get_parent_partition_keys_for_child( return partition_mapping.get_upstream_mapped_partitions_result_for_partitions( ( - cast(PartitionsDefinition, child_partitions_def).subset_with_partition_keys( - [partition_key] - ) + child_partitions_def.subset_with_partition_keys([partition_key]) if partition_key else None ), + downstream_partitions_def=child_partitions_def, upstream_partitions_def=parent_partitions_def, dynamic_partitions_store=dynamic_partitions_store, current_time=current_time, @@ -611,6 +611,7 @@ def bfs_filter_subsets( child_partitions_subset = ( partition_mapping.get_downstream_partitions_for_partitions( partitions_subset, + check.not_none(self.get_partitions_def(asset_key)), downstream_partitions_def=child_partitions_def, dynamic_partitions_store=dynamic_partitions_store, current_time=current_time, diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py b/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py index 773ac4b159361..efe084745b58f 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py @@ -94,14 +94,18 @@ def to_storage_dict( for key, value in self.partitions_subsets_by_asset_key.items() }, "serializable_partitions_def_ids_by_asset_key": { - key.to_user_string(): value.partitions_def.get_serializable_unique_identifier( + key.to_user_string(): check.not_none( + self._asset_graph.get_partitions_def(key) + ).get_serializable_unique_identifier( dynamic_partitions_store=dynamic_partitions_store ) - for key, value in self.partitions_subsets_by_asset_key.items() + for key, _ in self.partitions_subsets_by_asset_key.items() }, "partitions_def_class_names_by_asset_key": { - key.to_user_string(): value.partitions_def.__class__.__name__ - for key, value in self.partitions_subsets_by_asset_key.items() + key.to_user_string(): check.not_none( + self._asset_graph.get_partitions_def(key) + ).__class__.__name__ + for key, _ in self.partitions_subsets_by_asset_key.items() }, "non_partitioned_asset_keys": [ key.to_user_string() for key in self._non_partitioned_asset_keys diff --git a/python_modules/dagster/dagster/_core/definitions/multi_asset_sensor_definition.py b/python_modules/dagster/dagster/_core/definitions/multi_asset_sensor_definition.py index 0fcd25945dc05..d4cdd574a4806 100644 --- a/python_modules/dagster/dagster/_core/definitions/multi_asset_sensor_definition.py +++ b/python_modules/dagster/dagster/_core/definitions/multi_asset_sensor_definition.py @@ -718,6 +718,7 @@ def get_downstream_partition_keys( downstream_partition_key_subset = ( partition_mapping.get_downstream_partitions_for_partitions( from_asset.partitions_def.empty_subset().with_partition_keys([partition_key]), + from_asset.partitions_def, downstream_partitions_def=to_partitions_def, dynamic_partitions_store=self.instance, ) diff --git a/python_modules/dagster/dagster/_core/definitions/multi_dimensional_partitions.py b/python_modules/dagster/dagster/_core/definitions/multi_dimensional_partitions.py index 9acffb0e1aff2..1e4d95760dc10 100644 --- a/python_modules/dagster/dagster/_core/definitions/multi_dimensional_partitions.py +++ b/python_modules/dagster/dagster/_core/definitions/multi_dimensional_partitions.py @@ -4,7 +4,6 @@ from functools import lru_cache, reduce from typing import ( Dict, - Iterable, List, Mapping, NamedTuple, @@ -216,7 +215,7 @@ def __init__(self, partitions_defs: Mapping[str, PartitionsDefinition]): @property def partitions_subset_class(self) -> Type["PartitionsSubset"]: - return MultiPartitionsSubset + return DefaultPartitionsSubset def get_serializable_unique_identifier( self, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None @@ -509,33 +508,6 @@ def get_num_partitions( return reduce(lambda x, y: x * y, dimension_counts, 1) -class MultiPartitionsSubset(DefaultPartitionsSubset): - def __init__( - self, - partitions_def: MultiPartitionsDefinition, - subset: Optional[Set[str]] = None, - ): - check.inst_param(partitions_def, "partitions_def", MultiPartitionsDefinition) - subset = ( - set( - [ - partitions_def.get_partition_key_from_str(key) - for key in subset - if MULTIPARTITION_KEY_DELIMITER in key - ] - ) - if subset - else set() - ) - super(MultiPartitionsSubset, self).__init__(partitions_def, subset) - - def with_partition_keys(self, partition_keys: Iterable[str]) -> "MultiPartitionsSubset": - return MultiPartitionsSubset( - cast(MultiPartitionsDefinition, self._partitions_def), - self._subset | set(partition_keys), - ) - - def get_tags_from_multi_partition_key(multi_partition_key: MultiPartitionKey) -> Mapping[str, str]: check.inst_param(multi_partition_key, "multi_partition_key", MultiPartitionKey) diff --git a/python_modules/dagster/dagster/_core/definitions/partition.py b/python_modules/dagster/dagster/_core/definitions/partition.py index 542c6838339d2..1ec01f8a8a123 100644 --- a/python_modules/dagster/dagster/_core/definitions/partition.py +++ b/python_modules/dagster/dagster/_core/definitions/partition.py @@ -9,6 +9,7 @@ ) from enum import Enum from typing import ( + AbstractSet, Any, Callable, Dict, @@ -18,7 +19,6 @@ NamedTuple, Optional, Sequence, - Set, Type, Union, cast, @@ -139,8 +139,8 @@ class PartitionsDefinition(ABC, Generic[T_str]): """ @property - def partitions_subset_class(self) -> Type["PartitionsSubset[T_str]"]: - return DefaultPartitionsSubset[T_str] + def partitions_subset_class(self) -> Type["PartitionsSubset"]: + return DefaultPartitionsSubset @abstractmethod @public @@ -217,26 +217,24 @@ def get_partition_keys_in_range( + 1 ] - def empty_subset(self) -> "PartitionsSubset[T_str]": + def empty_subset(self) -> "PartitionsSubset": return self.partitions_subset_class.empty_subset(self) - def subset_with_partition_keys( - self, partition_keys: Iterable[str] - ) -> "PartitionsSubset[T_str]": + def subset_with_partition_keys(self, partition_keys: Iterable[str]) -> "PartitionsSubset": return self.empty_subset().with_partition_keys(partition_keys) def subset_with_all_partitions( self, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, - ) -> "PartitionsSubset[T_str]": + ) -> "PartitionsSubset": return self.subset_with_partition_keys( self.get_partition_keys( current_time=current_time, dynamic_partitions_store=dynamic_partitions_store ) ) - def deserialize_subset(self, serialized: str) -> "PartitionsSubset[T_str]": + def deserialize_subset(self, serialized: str) -> "PartitionsSubset": return self.partitions_subset_class.from_serialized(self, serialized) def can_deserialize_subset( @@ -949,6 +947,7 @@ class PartitionsSubset(ABC, Generic[T_str]): @abstractmethod def get_partition_keys_not_in_subset( self, + partitions_def: PartitionsDefinition[T_str], current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> Iterable[T_str]: @@ -962,6 +961,7 @@ def get_partition_keys(self, current_time: Optional[datetime] = None) -> Iterabl @abstractmethod def get_partition_key_ranges( self, + partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> Sequence[PartitionKeyRange]: @@ -973,11 +973,12 @@ def with_partition_keys(self, partition_keys: Iterable[str]) -> "PartitionsSubse def with_partition_key_range( self, + partitions_def: PartitionsDefinition[T_str], partition_key_range: PartitionKeyRange, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> "PartitionsSubset[T_str]": return self.with_partition_keys( - self.partitions_def.get_partition_keys_in_range( + partitions_def.get_partition_keys_in_range( partition_key_range, dynamic_partitions_store=dynamic_partitions_store ) ) @@ -989,15 +990,15 @@ def __or__(self, other: "PartitionsSubset") -> "PartitionsSubset[T_str]": def __sub__(self, other: "PartitionsSubset") -> "PartitionsSubset[T_str]": if self is other: - return self.partitions_def.empty_subset() - return self.partitions_def.empty_subset().with_partition_keys( + return self.empty_subset(self.partitions_def) + return self.empty_subset(self.partitions_def).with_partition_keys( set(self.get_partition_keys()).difference(set(other.get_partition_keys())) ) def __and__(self, other: "PartitionsSubset") -> "PartitionsSubset[T_str]": if self is other: return self - return self.partitions_def.empty_subset().with_partition_keys( + return self.empty_subset(self.partitions_def).with_partition_keys( set(self.get_partition_keys()) & set(other.get_partition_keys()) ) @@ -1024,7 +1025,7 @@ def can_deserialize( ... @abstractproperty - def partitions_def(self) -> PartitionsDefinition[T_str]: + def partitions_def(self) -> Optional[PartitionsDefinition[T_str]]: ... @abstractmethod @@ -1037,7 +1038,9 @@ def __contains__(self, value) -> bool: @classmethod @abstractmethod - def empty_subset(cls, partitions_def: PartitionsDefinition[T_str]) -> "PartitionsSubset[T_str]": + def empty_subset( + cls, partitions_def: Optional[PartitionsDefinition] = None + ) -> "PartitionsSubset[T_str]": ... @@ -1077,48 +1080,51 @@ def deserialize(self, partitions_def: PartitionsDefinition) -> PartitionsSubset: return partitions_def.deserialize_subset(self.serialized_subset) -class DefaultPartitionsSubset(PartitionsSubset[T_str]): +@whitelist_for_serdes +class DefaultPartitionsSubset( + PartitionsSubset, + NamedTuple("_DefaultPartitionsSubset", [("subset", AbstractSet[str])]), +): # Every time we change the serialization format, we should increment the version number. # This will ensure that we can gracefully degrade when deserializing old data. SERIALIZATION_VERSION = 1 - def __init__( - self, partitions_def: PartitionsDefinition[T_str], subset: Optional[Set[T_str]] = None + def __new__( + cls, + subset: Optional[AbstractSet[str]] = None, ): check.opt_set_param(subset, "subset") - self._partitions_def = partitions_def - self._subset = subset or set() + return super(DefaultPartitionsSubset, cls).__new__(cls, subset or set()) def get_partition_keys_not_in_subset( self, + partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> Iterable[str]: - return ( - set( - self._partitions_def.get_partition_keys( - current_time=current_time, dynamic_partitions_store=dynamic_partitions_store - ) + return set( + partitions_def.get_partition_keys( + current_time=current_time, dynamic_partitions_store=dynamic_partitions_store ) - - self._subset - ) + ) - set(self.subset) def get_partition_keys(self, current_time: Optional[datetime] = None) -> Iterable[str]: - return self._subset + return self.subset def get_partition_key_ranges( self, + partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> Sequence[PartitionKeyRange]: - partition_keys = self._partitions_def.get_partition_keys( + partition_keys = partitions_def.get_partition_keys( current_time, dynamic_partitions_store=dynamic_partitions_store ) cur_range_start = None cur_range_end = None result = [] for partition_key in partition_keys: - if partition_key in self._subset: + if partition_key in self.subset: if cur_range_start is None: cur_range_start = partition_key cur_range_end = partition_key @@ -1132,12 +1138,9 @@ def get_partition_key_ranges( return result - def with_partition_keys( - self, partition_keys: Iterable[T_str] - ) -> "DefaultPartitionsSubset[T_str]": + def with_partition_keys(self, partition_keys: Iterable[str]) -> "DefaultPartitionsSubset": return DefaultPartitionsSubset( - self._partitions_def, - self._subset | set(partition_keys), + self.subset | set(partition_keys), ) def serialize(self) -> str: @@ -1147,32 +1150,32 @@ def serialize(self) -> str: { "version": self.SERIALIZATION_VERSION, # sort to ensure that equivalent partition subsets have identical serialized forms - "subset": sorted(list(self._subset)), + "subset": sorted(list(self.subset)), } ) @classmethod def from_serialized( - cls, partitions_def: PartitionsDefinition[T_str], serialized: str - ) -> "PartitionsSubset[T_str]": + cls, partitions_def: PartitionsDefinition, serialized: str + ) -> "PartitionsSubset": # Check the version number, so only valid versions can be deserialized. data = json.loads(serialized) if isinstance(data, list): # backwards compatibility - return cls(subset=set(data), partitions_def=partitions_def) + return cls(subset=set(data)) else: if data.get("version") != cls.SERIALIZATION_VERSION: raise DagsterInvalidDeserializationVersionError( f"Attempted to deserialize partition subset with version {data.get('version')}," f" but only version {cls.SERIALIZATION_VERSION} is supported." ) - return cls(subset=set(data.get("subset")), partitions_def=partitions_def) + return cls(subset=set(data.get("subset"))) @classmethod def can_deserialize( cls, - partitions_def: PartitionsDefinition[T_str], + partitions_def: PartitionsDefinition, serialized: str, serialized_partitions_def_unique_id: Optional[str], serialized_partitions_def_class_name: Optional[str], @@ -1186,27 +1189,23 @@ def can_deserialize( ) @property - def partitions_def(self) -> PartitionsDefinition[T_str]: - return self._partitions_def + def partitions_def(self) -> Optional[PartitionsDefinition[T_str]]: + return None def __eq__(self, other: object) -> bool: - return ( - isinstance(other, DefaultPartitionsSubset) - and self._partitions_def == other._partitions_def - and self._subset == other._subset - ) + return isinstance(other, DefaultPartitionsSubset) and self.subset == other.subset def __len__(self) -> int: - return len(self._subset) + return len(self.subset) def __contains__(self, value) -> bool: - return value in self._subset + return value in self.subset def __repr__(self) -> str: - return ( - f"DefaultPartitionsSubset(subset={self._subset}, partitions_def={self._partitions_def})" - ) + return f"DefaultPartitionsSubset(subset={self.subset})" @classmethod - def empty_subset(cls, partitions_def: PartitionsDefinition[T_str]) -> "PartitionsSubset[T_str]": - return cls(partitions_def=partitions_def) + def empty_subset( + cls, partitions_def: Optional[PartitionsDefinition] = None + ) -> "DefaultPartitionsSubset": + return cls() diff --git a/python_modules/dagster/dagster/_core/definitions/partition_mapping.py b/python_modules/dagster/dagster/_core/definitions/partition_mapping.py index 5a5dddbbc6017..a5690fefc84be 100644 --- a/python_modules/dagster/dagster/_core/definitions/partition_mapping.py +++ b/python_modules/dagster/dagster/_core/definitions/partition_mapping.py @@ -62,6 +62,7 @@ class may change at any time. def get_downstream_partitions_for_partitions( self, upstream_partitions_subset: PartitionsSubset, + upstream_partitions_def: PartitionsDefinition, downstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -81,6 +82,7 @@ def get_downstream_partitions_for_partitions( def get_upstream_mapped_partitions_result_for_partitions( self, downstream_partitions_subset: Optional[PartitionsSubset], + downstream_partitions_def: Optional[PartitionsDefinition], upstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -107,6 +109,7 @@ class IdentityPartitionMapping(PartitionMapping, NamedTuple("_IdentityPartitionM def get_upstream_mapped_partitions_result_for_partitions( self, downstream_partitions_subset: Optional[PartitionsSubset], + downstream_partitions_def: Optional[PartitionsDefinition], upstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -134,6 +137,7 @@ def get_upstream_mapped_partitions_result_for_partitions( def get_downstream_partitions_for_partitions( self, upstream_partitions_subset: PartitionsSubset, + upstream_partitions_def: PartitionsDefinition, downstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -167,6 +171,7 @@ class AllPartitionMapping(PartitionMapping, NamedTuple("_AllPartitionMapping", [ def get_upstream_mapped_partitions_result_for_partitions( self, downstream_partitions_subset: Optional[PartitionsSubset], + downstream_partitions_def: Optional[PartitionsDefinition], upstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -179,6 +184,7 @@ def get_upstream_mapped_partitions_result_for_partitions( def get_downstream_partitions_for_partitions( self, upstream_partitions_subset: PartitionsSubset, + upstream_partitions_def: PartitionsDefinition, downstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -197,6 +203,7 @@ class LastPartitionMapping(PartitionMapping, NamedTuple("_LastPartitionMapping", def get_upstream_mapped_partitions_result_for_partitions( self, downstream_partitions_subset: Optional[PartitionsSubset], + downstream_partitions_def: Optional[PartitionsDefinition], upstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -214,11 +221,12 @@ def get_upstream_mapped_partitions_result_for_partitions( def get_downstream_partitions_for_partitions( self, upstream_partitions_subset: PartitionsSubset, + upstream_partitions_def: PartitionsDefinition, downstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> PartitionsSubset: - last_upstream_partition = upstream_partitions_subset.partitions_def.get_last_partition_key( + last_upstream_partition = upstream_partitions_def.get_last_partition_key( current_time=current_time, dynamic_partitions_store=dynamic_partitions_store ) if last_upstream_partition and last_upstream_partition in upstream_partitions_subset: @@ -259,6 +267,7 @@ def a_downstream(upstream): def get_upstream_mapped_partitions_result_for_partitions( self, downstream_partitions_subset: Optional[PartitionsSubset], + downstream_partitions_def: Optional[PartitionsDefinition], upstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -270,6 +279,7 @@ def get_upstream_mapped_partitions_result_for_partitions( def get_downstream_partitions_for_partitions( self, upstream_partitions_subset: PartitionsSubset, + upstream_partitions_def: PartitionsDefinition, downstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -325,9 +335,8 @@ def _get_dependency_partitions_subset( a_partition_keys_by_dimension = defaultdict(set) if isinstance(a_partitions_def, MultiPartitionsDefinition): for partition_key in a_partitions_subset.get_partition_keys(): - for dimension_name, key in cast( - MultiPartitionKey, partition_key - ).keys_by_dimension.items(): + key = a_partitions_def.get_partition_key_from_str(partition_key) + for dimension_name, key in key.keys_by_dimension.items(): a_partition_keys_by_dimension[dimension_name].add(key) else: for partition_key in a_partitions_subset.get_partition_keys(): @@ -383,6 +392,7 @@ def _get_dependency_partitions_subset( a_dimension_partitions_def.empty_subset().with_partition_keys( [key] ), + a_dimension_partitions_def, b_dimension_partitions_def, current_time=current_time, dynamic_partitions_store=dynamic_partitions_store, @@ -421,6 +431,7 @@ def _get_dependency_partitions_subset( a_dimension_partitions_def.empty_subset().with_partition_keys( [key] ), + a_dimension_partitions_def, b_dimension_partitions_def, current_time=current_time, dynamic_partitions_store=dynamic_partitions_store, @@ -447,7 +458,9 @@ def _get_dependency_partitions_subset( [ dep_b_keys_by_a_dim_and_key[dim_name][ ( - cast(MultiPartitionKey, key).keys_by_dimension[dim_name] + cast(MultiPartitionsDefinition, a_partitions_def) + .get_partition_key_from_str(key) + .keys_by_dimension[dim_name] if dim_name else key ) @@ -487,6 +500,7 @@ def _get_dependency_partitions_subset( def get_upstream_mapped_partitions_result_for_partitions( self, downstream_partitions_subset: Optional[PartitionsSubset], + downstream_partitions_def: Optional[PartitionsDefinition], upstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -495,7 +509,7 @@ def get_upstream_mapped_partitions_result_for_partitions( check.failed("downstream asset is not partitioned") result = self._get_dependency_partitions_subset( - cast(MultiPartitionsDefinition, downstream_partitions_subset.partitions_def), + check.not_none(downstream_partitions_def), downstream_partitions_subset, cast(MultiPartitionsDefinition, upstream_partitions_def), a_upstream_of_b=False, @@ -511,6 +525,7 @@ def get_upstream_mapped_partitions_result_for_partitions( def get_downstream_partitions_for_partitions( self, upstream_partitions_subset: PartitionsSubset, + upstream_partitions_def: PartitionsDefinition, downstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -519,7 +534,7 @@ def get_downstream_partitions_for_partitions( check.failed("upstream asset is not partitioned") result = self._get_dependency_partitions_subset( - cast(MultiPartitionsDefinition, upstream_partitions_subset.partitions_def), + upstream_partitions_def, upstream_partitions_subset, cast(MultiPartitionsDefinition, downstream_partitions_def), a_upstream_of_b=True, @@ -855,6 +870,7 @@ def _check_downstream(self, *, downstream_partitions_def: PartitionsDefinition): def get_downstream_partitions_for_partitions( self, upstream_partitions_subset: PartitionsSubset, + upstream_partitions_def: PartitionsDefinition, downstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -870,6 +886,7 @@ def get_downstream_partitions_for_partitions( def get_upstream_mapped_partitions_result_for_partitions( self, downstream_partitions_subset: Optional[PartitionsSubset], + downstream_partitions_def: Optional[PartitionsDefinition], upstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, diff --git a/python_modules/dagster/dagster/_core/definitions/time_window_partition_mapping.py b/python_modules/dagster/dagster/_core/definitions/time_window_partition_mapping.py index 9d4f0d7ef1dd8..8e1215b776713 100644 --- a/python_modules/dagster/dagster/_core/definitions/time_window_partition_mapping.py +++ b/python_modules/dagster/dagster/_core/definitions/time_window_partition_mapping.py @@ -108,6 +108,7 @@ def __new__( def get_upstream_mapped_partitions_result_for_partitions( self, downstream_partitions_subset: Optional[PartitionsSubset], + downstream_partitions_def: Optional[PartitionsDefinition], upstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -127,6 +128,7 @@ def get_upstream_mapped_partitions_result_for_partitions( def get_downstream_partitions_for_partitions( self, upstream_partitions_subset: PartitionsSubset, + upstream_partitions_def: PartitionsDefinition, downstream_partitions_def: Optional[PartitionsDefinition], current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -137,7 +139,7 @@ def get_downstream_partitions_for_partitions( if not provided. """ return self._map_partitions( - upstream_partitions_subset.partitions_def, + upstream_partitions_def, downstream_partitions_def, upstream_partitions_subset, end_offset=-self.start_offset, diff --git a/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py b/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py index 6c39df8d20ec7..8e0d49dca18bf 100644 --- a/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py +++ b/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py @@ -1497,6 +1497,7 @@ def _get_partition_time_windows_not_in_subset( def get_partition_keys_not_in_subset( self, + partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> Iterable[str]: @@ -1529,6 +1530,7 @@ def with_partitions_def( def get_partition_key_ranges( self, + partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> Sequence[PartitionKeyRange]: @@ -1806,7 +1808,9 @@ def __eq__(self, other): ) or super(PartitionKeysTimeWindowPartitionsSubset, self).__eq__(other) @classmethod - def empty_subset(cls, partitions_def: PartitionsDefinition) -> "PartitionsSubset": + def empty_subset( + cls, partitions_def: Optional[PartitionsDefinition] = None + ) -> "PartitionsSubset": if not isinstance(partitions_def, TimeWindowPartitionsDefinition): check.failed("Partitions definition must be a TimeWindowPartitionsDefinition") partitions_def = cast(TimeWindowPartitionsDefinition, partitions_def) @@ -1975,7 +1979,7 @@ def with_partitions_def( ) def __repr__(self) -> str: - return f"TimeWindowPartitionsSubset({self.get_partition_key_ranges()})" + return f"TimeWindowPartitionsSubset({self.get_partition_key_ranges(self.partitions_def)})" class PartitionRangeStatus(Enum): diff --git a/python_modules/dagster/dagster/_core/execution/context/input.py b/python_modules/dagster/dagster/_core/execution/context/input.py index e821e95fdb194..0ca32a4fd5d23 100644 --- a/python_modules/dagster/dagster/_core/execution/context/input.py +++ b/python_modules/dagster/dagster/_core/execution/context/input.py @@ -365,7 +365,7 @@ def asset_partition_key_range(self) -> PartitionKeyRange: ) partition_key_ranges = subset.get_partition_key_ranges( - dynamic_partitions_store=self.instance + self.asset_partitions_def, dynamic_partitions_store=self.instance ) if len(partition_key_ranges) != 1: check.failed( @@ -608,7 +608,7 @@ def build_input_context( ) if asset_partitions_def and asset_partition_key_range: asset_partitions_subset = asset_partitions_def.empty_subset().with_partition_key_range( - asset_partition_key_range, dynamic_partitions_store=instance + asset_partitions_def, asset_partition_key_range, dynamic_partitions_store=instance ) elif asset_partition_key_range: asset_partitions_subset = KeyRangeNoPartitionsDefPartitionsSubset(asset_partition_key_range) @@ -643,6 +643,7 @@ def __init__(self, key_range: PartitionKeyRange): def get_partition_keys_not_in_subset( self, + partitions_def: "PartitionsDefinition", current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> Iterable[str]: @@ -656,6 +657,7 @@ def get_partition_keys(self, current_time: Optional[datetime] = None) -> Iterabl def get_partition_key_ranges( self, + partitions_def: "PartitionsDefinition", current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> Sequence[PartitionKeyRange]: @@ -675,7 +677,7 @@ def serialize(self) -> str: raise NotImplementedError() @property - def partitions_def(self) -> "PartitionsDefinition": + def partitions_def(self) -> Optional["PartitionsDefinition"]: raise NotImplementedError() def __len__(self) -> int: @@ -701,5 +703,7 @@ def can_deserialize( raise NotImplementedError() @classmethod - def empty_subset(cls, partitions_def: "PartitionsDefinition") -> "PartitionsSubset": + def empty_subset( + cls, partitions_def: Optional["PartitionsDefinition"] = None + ) -> "PartitionsSubset": raise NotImplementedError() diff --git a/python_modules/dagster/dagster/_core/execution/context/system.py b/python_modules/dagster/dagster/_core/execution/context/system.py index a5486bee1eca7..60235f365eeb2 100644 --- a/python_modules/dagster/dagster/_core/execution/context/system.py +++ b/python_modules/dagster/dagster/_core/execution/context/system.py @@ -1101,8 +1101,18 @@ def has_asset_partitions_for_input(self, input_name: str) -> bool: def asset_partition_key_range_for_input(self, input_name: str) -> PartitionKeyRange: subset = self.asset_partitions_subset_for_input(input_name) + + asset_layer = self.job_def.asset_layer + upstream_asset_key = check.not_none( + asset_layer.asset_key_for_input(self.node_handle, input_name) + ) + upstream_asset_partitions_def = check.not_none( + asset_layer.partitions_def_for_asset(upstream_asset_key) + ) + partition_key_ranges = subset.get_partition_key_ranges( - dynamic_partitions_store=self.instance + partitions_def=cast(PartitionsDefinition, upstream_asset_partitions_def), + dynamic_partitions_store=self.instance, ) if len(partition_key_ranges) != 1: @@ -1127,7 +1137,9 @@ def asset_partitions_subset_for_input( partitions_def = assets_def.partitions_def if assets_def else None partitions_subset = ( partitions_def.empty_subset().with_partition_key_range( - self.asset_partition_key_range, dynamic_partitions_store=self.instance + partitions_def, + self.asset_partition_key_range, + dynamic_partitions_store=self.instance, ) if partitions_def else None @@ -1142,6 +1154,7 @@ def asset_partitions_subset_for_input( mapped_partitions_result = ( partition_mapping.get_upstream_mapped_partitions_result_for_partitions( partitions_subset, + partitions_def, upstream_asset_partitions_def, dynamic_partitions_store=self.instance, ) diff --git a/python_modules/dagster/dagster/_utils/caching_instance_queryer.py b/python_modules/dagster/dagster/_utils/caching_instance_queryer.py index fd85f7d35543e..004a817218dc3 100644 --- a/python_modules/dagster/dagster/_utils/caching_instance_queryer.py +++ b/python_modules/dagster/dagster/_utils/caching_instance_queryer.py @@ -589,6 +589,7 @@ def asset_partitions_with_newly_updated_parents_and_new_latest_storage_id( child_partitions_subset = ( partition_mapping.get_downstream_partitions_for_partitions( partitions_subset, + partitions_def, downstream_partitions_def=child_partitions_def, dynamic_partitions_store=self, current_time=self.evaluation_time, diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_asset_partition_mappings.py b/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_asset_partition_mappings.py index e1c8a0654b79f..9ae28328803c3 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_asset_partition_mappings.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_asset_partition_mappings.py @@ -64,6 +64,7 @@ class TrailingWindowPartitionMapping(PartitionMapping): def get_upstream_mapped_partitions_result_for_partitions( self, downstream_partitions_subset: Optional[PartitionsSubset], + downstream_partitions_def: Optional[PartitionsDefinition], upstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -74,7 +75,8 @@ def get_upstream_mapped_partitions_result_for_partitions( partition_keys = list(downstream_partitions_subset.get_partition_keys()) return UpstreamPartitionsResult( upstream_partitions_def.empty_subset().with_partition_key_range( - PartitionKeyRange(str(max(1, int(partition_keys[0]) - 1)), partition_keys[-1]) + upstream_partitions_def, + PartitionKeyRange(str(max(1, int(partition_keys[0]) - 1)), partition_keys[-1]), ), [], ) @@ -82,6 +84,7 @@ def get_upstream_mapped_partitions_result_for_partitions( def get_downstream_partitions_for_partitions( self, upstream_partitions_subset: PartitionsSubset, + upstream_partitions_def: PartitionsDefinition, downstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -260,24 +263,24 @@ def test_specific_partitions_partition_mapping_downstream_partitions(): # cases where at least one of the specific partitions is in the upstream partitions subset for partition_subset in [ - DefaultPartitionsSubset(upstream_partitions_def, {"a"}), - DefaultPartitionsSubset(upstream_partitions_def, {"a", "b"}), - DefaultPartitionsSubset(upstream_partitions_def, {"a", "b", "c", "d"}), + DefaultPartitionsSubset({"a"}), + DefaultPartitionsSubset({"a", "b"}), + DefaultPartitionsSubset({"a", "b", "c", "d"}), ]: assert ( partition_mapping.get_downstream_partitions_for_partitions( - partition_subset, downstream_partitions_def + partition_subset, upstream_partitions_def, downstream_partitions_def ) == downstream_partitions_def.subset_with_all_partitions() ) for partition_subset in [ - DefaultPartitionsSubset(upstream_partitions_def, {"c"}), - DefaultPartitionsSubset(upstream_partitions_def, {"c", "d"}), + DefaultPartitionsSubset({"c"}), + DefaultPartitionsSubset({"c", "d"}), ]: assert ( partition_mapping.get_downstream_partitions_for_partitions( - partition_subset, downstream_partitions_def + partition_subset, upstream_partitions_def, downstream_partitions_def ) == downstream_partitions_def.empty_subset() ) @@ -564,13 +567,13 @@ def test_identity_partition_mapping(): zx = StaticPartitionsDefinition(["z", "x"]) result = IdentityPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( - zx.empty_subset().with_partition_keys(["z", "x"]), xy + zx.empty_subset().with_partition_keys(["z", "x"]), zx, xy ) assert result.partitions_subset.get_partition_keys() == set(["x"]) assert result.required_but_nonexistent_partition_keys == ["z"] result = IdentityPartitionMapping().get_downstream_partitions_for_partitions( - zx.empty_subset().with_partition_keys(["z", "x"]), xy + zx.empty_subset().with_partition_keys(["z", "x"]), zx, xy ) assert result.get_partition_keys() == set(["x"]) @@ -854,6 +857,7 @@ def test_last_partition_mapping_get_downstream_partitions(): assert LastPartitionMapping().get_downstream_partitions_for_partitions( upstream_partitions_def.empty_subset().with_partition_keys(["2023-10-04"]), + upstream_partitions_def, downstream_partitions_def, current_time, ) == downstream_partitions_def.empty_subset().with_partition_keys( @@ -862,6 +866,7 @@ def test_last_partition_mapping_get_downstream_partitions(): assert LastPartitionMapping().get_downstream_partitions_for_partitions( upstream_partitions_def.empty_subset().with_partition_keys(["2023-10-03", "2023-10-04"]), + upstream_partitions_def, downstream_partitions_def, current_time, ) == downstream_partitions_def.empty_subset().with_partition_keys( @@ -871,6 +876,7 @@ def test_last_partition_mapping_get_downstream_partitions(): assert ( LastPartitionMapping().get_downstream_partitions_for_partitions( upstream_partitions_def.empty_subset().with_partition_keys(["2023-10-03"]), + upstream_partitions_def, downstream_partitions_def, current_time, ) diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_multipartition_partition_mapping.py b/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_multipartition_partition_mapping.py index beeb5ab3b4163..25226168cdb21 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_multipartition_partition_mapping.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_multipartition_partition_mapping.py @@ -40,10 +40,11 @@ def test_get_downstream_partitions_single_key_in_range(): ) single_dimension_subset = single_dimension_def.empty_subset().with_partition_key_range( - PartitionKeyRange("a", "a") + single_dimension_def, PartitionKeyRange("a", "a") ) result = MultiToSingleDimensionPartitionMapping().get_downstream_partitions_for_partitions( upstream_partitions_subset=single_dimension_subset, + upstream_partitions_def=single_dimension_def, downstream_partitions_def=multipartitions_def, ) multipartitions_subset = multipartitions_def.empty_subset().with_partition_keys( @@ -57,6 +58,7 @@ def test_get_downstream_partitions_single_key_in_range(): result = MultiToSingleDimensionPartitionMapping().get_downstream_partitions_for_partitions( upstream_partitions_subset=multipartitions_subset, + upstream_partitions_def=multipartitions_def, downstream_partitions_def=single_dimension_def, ) assert result == single_dimension_subset @@ -67,12 +69,12 @@ def test_get_downstream_partitions_single_key_in_range(): result = MultiToSingleDimensionPartitionMapping().get_downstream_partitions_for_partitions( upstream_partitions_subset=single_dimension_def.empty_subset().with_partition_key_range( - PartitionKeyRange("b", "b") + partitions_def=single_dimension_def, partition_key_range=PartitionKeyRange("b", "b") ), + upstream_partitions_def=single_dimension_def, downstream_partitions_def=multipartitions_def, ) assert result == DefaultPartitionsSubset( - multipartitions_def, { MultiPartitionKey({"abc": "b", "xyz": "x"}), MultiPartitionKey({"abc": "b", "xyz": "y"}), @@ -87,7 +89,7 @@ def test_get_downstream_partitions_multiple_keys_in_range(): {"abc": single_dimension_def, "123": StaticPartitionsDefinition(["1", "2", "3"])} ) single_dimension_subset = single_dimension_def.empty_subset().with_partition_key_range( - PartitionKeyRange("a", "b") + single_dimension_def, PartitionKeyRange("a", "b") ) multipartitions_subset = multipartitions_def.empty_subset().with_partition_keys( { @@ -102,12 +104,14 @@ def test_get_downstream_partitions_multiple_keys_in_range(): result = MultiToSingleDimensionPartitionMapping().get_downstream_partitions_for_partitions( upstream_partitions_subset=single_dimension_subset, + upstream_partitions_def=single_dimension_def, downstream_partitions_def=multipartitions_def, ) assert result == multipartitions_subset result = MultiToSingleDimensionPartitionMapping().get_downstream_partitions_for_partitions( upstream_partitions_subset=multipartitions_subset, + upstream_partitions_def=multipartitions_def, downstream_partitions_def=single_dimension_def, ) assert result == single_dimension_subset @@ -127,17 +131,19 @@ def test_get_downstream_partitions_multiple_keys_in_range(): @pytest.mark.parametrize( - "upstream_partitions_def,upstream_partitions_subset,downstream_partitions_subset", + "upstream_partitions_def,upstream_partitions_subset,downstream_partitions_subset,downstream_partitions_def", [ ( static_partitions_def, static_partitions_def.empty_subset().with_partition_keys({"a"}), static_multipartitions_def.empty_subset().with_partition_key_range( + static_multipartitions_def, PartitionKeyRange( MultiPartitionKey({"abc": "a", "123": "1"}), MultiPartitionKey({"abc": "a", "123": "1"}), - ) + ), ), + static_multipartitions_def, ), ( static_partitions_def, @@ -148,6 +154,7 @@ def test_get_downstream_partitions_multiple_keys_in_range(): MultiPartitionKey({"abc": "a", "123": "2"}), } ), + static_multipartitions_def, ), ( static_multipartitions_def, @@ -159,6 +166,7 @@ def test_get_downstream_partitions_multiple_keys_in_range(): } ), static_partitions_def.empty_subset().with_partition_keys({"a"}), + static_partitions_def, ), ( static_multipartitions_def, @@ -173,12 +181,17 @@ def test_get_downstream_partitions_multiple_keys_in_range(): } ), static_partitions_def.empty_subset().with_partition_keys({"a", "b"}), + static_partitions_def, ), ( daily_partitions_def, daily_partitions_def.empty_subset() - .with_partition_key_range(PartitionKeyRange(start="2023-01-08", end="2023-01-14")) - .with_partition_key_range(PartitionKeyRange(start="2023-01-29", end="2023-02-04")), + .with_partition_key_range( + daily_partitions_def, PartitionKeyRange(start="2023-01-08", end="2023-01-14") + ) + .with_partition_key_range( + daily_partitions_def, PartitionKeyRange(start="2023-01-29", end="2023-02-04") + ), weekly_multipartitions_def.empty_subset().with_partition_keys( { MultiPartitionKey({"ab": "a", "week": "2023-01-08"}), @@ -187,6 +200,7 @@ def test_get_downstream_partitions_multiple_keys_in_range(): MultiPartitionKey({"ab": "b", "week": "2023-01-29"}), } ), + weekly_multipartitions_def, ), ], ) @@ -194,11 +208,13 @@ def test_get_upstream_single_dimension_to_multi_partition_mapping( upstream_partitions_def, upstream_partitions_subset, downstream_partitions_subset, + downstream_partitions_def, ): assert ( MultiToSingleDimensionPartitionMapping() .get_upstream_mapped_partitions_result_for_partitions( downstream_partitions_subset, + downstream_partitions_def, upstream_partitions_def, ) .partitions_subset @@ -219,38 +235,44 @@ def test_error_thrown_when_no_partition_dimension_name_provided(): with pytest.raises(CheckError, match="dimension name must be specified"): MultiToSingleDimensionPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( multipartitions_def.empty_subset().with_partition_key_range( + multipartitions_def, PartitionKeyRange( MultiPartitionKey({"a": "1", "b": "1"}), MultiPartitionKey({"a": "1", "b": "1"}), - ) + ), ), + multipartitions_def, single_dimension_def, ) with pytest.raises(CheckError, match="dimension name must be specified"): MultiToSingleDimensionPartitionMapping().get_downstream_partitions_for_partitions( multipartitions_def.empty_subset().with_partition_key_range( + multipartitions_def, PartitionKeyRange( MultiPartitionKey({"a": "1", "b": "1"}), MultiPartitionKey({"a": "1", "b": "1"}), - ) + ), ), + multipartitions_def, single_dimension_def, ) with pytest.raises(CheckError, match="dimension name must be specified"): MultiToSingleDimensionPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( single_dimension_def.empty_subset().with_partition_key_range( - PartitionKeyRange("1", "1") + single_dimension_def, PartitionKeyRange("1", "1") ), + single_dimension_def, multipartitions_def, ) with pytest.raises(CheckError, match="dimension name must be specified"): MultiToSingleDimensionPartitionMapping().get_downstream_partitions_for_partitions( single_dimension_def.empty_subset().with_partition_key_range( - PartitionKeyRange("1", "1") + single_dimension_def, PartitionKeyRange("1", "1") ), + single_dimension_def, multipartitions_def, ) @@ -629,6 +651,7 @@ def test_multipartitions_mapping_get_upstream_partitions( ): result = partitions_mapping.get_upstream_mapped_partitions_result_for_partitions( downstream_partitions_def.empty_subset().with_partition_keys(downstream_partition_keys), + downstream_partitions_def, upstream_partitions_def, ) assert result.partitions_subset.get_partition_keys() == set(upstream_partition_keys) @@ -665,6 +688,7 @@ def test_multipartitions_required_but_invalid_upstream_partitions(): MultiPartitionKey({"time": "2023-06-01", "123": "1"}), ] ), + may_multipartitions_def, june_multipartitions_def, ) assert result.partitions_subset.get_partition_keys() == set( @@ -689,6 +713,7 @@ def test_multipartitions_mapping_get_downstream_partitions( ): assert partitions_mapping.get_downstream_partitions_for_partitions( upstream_partitions_def.empty_subset().with_partition_keys(upstream_partition_keys), + upstream_partitions_def, downstream_partitions_def, ).get_partition_keys() == set(downstream_partition_keys) @@ -715,6 +740,7 @@ def test_multipartitions_mapping_dynamic(): downstream_partitions_def.empty_subset().with_partition_keys( [MultiPartitionKey({"dynamic": "a", "123": "1"})] ), + downstream_partitions_def, upstream_partitions_def, dynamic_partitions_store=instance, ) @@ -749,7 +775,9 @@ def test_error_multipartitions_mapping(): "other nonexistent dimension", SpecificPartitionsPartitionMapping(["c"]) ) } - ).get_upstream_mapped_partitions_result_for_partitions(weekly_abc.empty_subset(), daily_123) + ).get_upstream_mapped_partitions_result_for_partitions( + weekly_abc.empty_subset(), weekly_abc, daily_123 + ) def test_multi_partition_mapping_with_asset_deps(): @@ -920,6 +948,7 @@ def test_dynamic_dimension_multipartition_mapping(): result = MultiToSingleDimensionPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( downstream_partitions_subset=foo.empty_subset().with_partition_keys(["a"]), + downstream_partitions_def=foo, upstream_partitions_def=foo_bar, dynamic_partitions_store=instance, ) diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_static_partition_mapping.py b/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_static_partition_mapping.py index 6f4b6f11fe233..00598139b9091 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_static_partition_mapping.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_static_partition_mapping.py @@ -16,19 +16,21 @@ def test_single_valued_static_mapping(): upstream_partitions_subset=upstream_parts.empty_subset().with_partition_keys( ["p1", "p3", "q2", "r1"] ), + upstream_partitions_def=upstream_parts, downstream_partitions_def=downstream_parts, ) - assert result == DefaultPartitionsSubset(downstream_parts, {"p", "r"}) + assert result == DefaultPartitionsSubset({"p", "r"}) result = mapping.get_upstream_mapped_partitions_result_for_partitions( downstream_partitions_subset=downstream_parts.empty_subset().with_partition_keys( ["p", "q"] ), + downstream_partitions_def=downstream_parts, upstream_partitions_def=upstream_parts, ) - assert result.partitions_subset == DefaultPartitionsSubset(upstream_parts, {"p1", "p2", "p3"}) + assert result.partitions_subset == DefaultPartitionsSubset({"p1", "p2", "p3"}) def test_multi_valued_static_mapping(): @@ -39,19 +41,21 @@ def test_multi_valued_static_mapping(): result = mapping.get_downstream_partitions_for_partitions( upstream_partitions_subset=upstream_parts.empty_subset().with_partition_keys(["p", "r"]), + upstream_partitions_def=upstream_parts, downstream_partitions_def=downstream_parts, ) - assert result == DefaultPartitionsSubset(downstream_parts, {"p1", "p2", "p3"}) + assert result == DefaultPartitionsSubset({"p1", "p2", "p3"}) result = mapping.get_upstream_mapped_partitions_result_for_partitions( downstream_partitions_subset=downstream_parts.empty_subset().with_partition_keys( ["p2", "p3", "q"] ), + downstream_partitions_def=downstream_parts, upstream_partitions_def=upstream_parts, ) - assert result.partitions_subset == DefaultPartitionsSubset(upstream_parts, {"p", "q1", "q2"}) + assert result.partitions_subset == DefaultPartitionsSubset({"p", "q1", "q2"}) def test_error_on_extra_keys_in_mapping(): @@ -63,6 +67,7 @@ def test_error_on_extra_keys_in_mapping(): {"p": "p", "q": {"q", "OTHER"}} ).get_downstream_partitions_for_partitions( upstream_partitions_subset=upstream_parts.empty_subset(), + upstream_partitions_def=upstream_parts, downstream_partitions_def=downstream_parts, ) @@ -71,6 +76,7 @@ def test_error_on_extra_keys_in_mapping(): {"p": "p", "q": "q", "OTHER": "q"} ).get_upstream_mapped_partitions_result_for_partitions( downstream_partitions_subset=downstream_parts.empty_subset(), + downstream_partitions_def=downstream_parts, upstream_partitions_def=upstream_parts, ) diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_time_window_partition_mapping.py b/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_time_window_partition_mapping.py index 03202ece6c007..77e78c18747ae 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_time_window_partition_mapping.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_time_window_partition_mapping.py @@ -30,6 +30,7 @@ def test_get_upstream_partitions_for_partition_range_same_partitioning(): # single partition key result = TimeWindowPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( subset_with_keys(downstream_partitions_def, ["2021-05-07"]), + downstream_partitions_def, upstream_partitions_def, ) assert result.partitions_subset == upstream_partitions_def.empty_subset().with_partition_keys( @@ -39,6 +40,7 @@ def test_get_upstream_partitions_for_partition_range_same_partitioning(): # range of partition keys result = TimeWindowPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( subset_with_key_range(downstream_partitions_def, "2021-05-07", "2021-05-09"), + downstream_partitions_def, upstream_partitions_def, ) assert result.partitions_subset == subset_with_key_range( @@ -52,6 +54,7 @@ def test_get_upstream_partitions_for_partition_range_same_partitioning_different result = TimeWindowPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( subset_with_key_range(downstream_partitions_def, "2021-05-07", "2021-05-09"), + downstream_partitions_def, upstream_partitions_def, ) assert result.partitions_subset == subset_with_key_range( @@ -70,6 +73,7 @@ def test_get_upstream_partitions_for_partition_range_hourly_downstream_daily_ups upstream_partitions_def = DailyPartitionsDefinition(start_date="2021-05-05") result = TimeWindowPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( subset_with_keys(downstream_partitions_def, ["2021-05-07-05:00"]), + downstream_partitions_def, upstream_partitions_def, ) assert result.partitions_subset == upstream_partitions_def.empty_subset().with_partition_keys( @@ -78,6 +82,7 @@ def test_get_upstream_partitions_for_partition_range_hourly_downstream_daily_ups result = TimeWindowPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( subset_with_key_range(downstream_partitions_def, "2021-05-07-05:00", "2021-05-09-09:00"), + downstream_partitions_def, upstream_partitions_def, ) assert ( @@ -93,6 +98,7 @@ def test_get_upstream_partitions_for_partition_range_daily_downstream_hourly_ups upstream_partitions_def = HourlyPartitionsDefinition(start_date="2021-05-05-00:00") result = TimeWindowPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( subset_with_keys(downstream_partitions_def, ["2021-05-07"]), + downstream_partitions_def, upstream_partitions_def, ) assert ( @@ -104,6 +110,7 @@ def test_get_upstream_partitions_for_partition_range_daily_downstream_hourly_ups result = TimeWindowPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( subset_with_key_range(downstream_partitions_def, "2021-05-07", "2021-05-09"), + downstream_partitions_def, upstream_partitions_def, ) assert ( @@ -119,6 +126,7 @@ def test_get_upstream_partitions_for_partition_range_monthly_downstream_daily_up upstream_partitions_def = DailyPartitionsDefinition(start_date="2021-05-01") result = TimeWindowPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( subset_with_key_range(downstream_partitions_def, "2021-05-01", "2021-07-01"), + downstream_partitions_def, upstream_partitions_def, ) assert ( @@ -139,6 +147,7 @@ def test_get_upstream_partitions_for_partition_range_twice_daily_downstream_dail ) result = TimeWindowPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( subset_with_key_range(downstream_partitions_def, "2021-05-01", "2021-05-03"), + downstream_partitions_def, upstream_partitions_def, ) assert ( @@ -159,6 +168,7 @@ def test_get_upstream_partitions_for_partition_range_daily_downstream_twice_dail ) result = TimeWindowPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( subset_with_key_range(downstream_partitions_def, "2021-05-01 00:00", "2021-05-03 00:00"), + downstream_partitions_def, upstream_partitions_def, ) assert ( @@ -179,6 +189,7 @@ def test_get_upstream_partitions_for_partition_range_daily_non_aligned(): ) result = TimeWindowPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( subset_with_key_range(downstream_partitions_def, "2021-05-02", "2021-05-04"), + downstream_partitions_def, upstream_partitions_def, ) assert ( @@ -197,6 +208,7 @@ def test_get_upstream_partitions_for_partition_range_weekly_with_offset(): result = TimeWindowPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( subset_with_key_range(partitions_def, "2022-09-11", "2022-09-11"), partitions_def, + partitions_def, ) assert result.partitions_subset.get_partition_keys() == ( partitions_def.get_partition_keys_in_range(PartitionKeyRange("2022-09-11", "2022-09-11")) @@ -211,17 +223,23 @@ def test_daily_to_daily_lag(): # single partition key assert mapping.get_upstream_mapped_partitions_result_for_partitions( - subset_with_keys(downstream_partitions_def, ["2021-05-07"]), upstream_partitions_def + subset_with_keys(downstream_partitions_def, ["2021-05-07"]), + downstream_partitions_def, + upstream_partitions_def, ).partitions_subset.get_partition_keys() == ["2021-05-06"] assert mapping.get_downstream_partitions_for_partitions( - subset_with_keys(upstream_partitions_def, ["2021-05-06"]), downstream_partitions_def + subset_with_keys(upstream_partitions_def, ["2021-05-06"]), + upstream_partitions_def, + downstream_partitions_def, ).get_partition_keys() == ["2021-05-07"] # first partition key assert ( mapping.get_upstream_mapped_partitions_result_for_partitions( - subset_with_keys(downstream_partitions_def, ["2021-05-05"]), upstream_partitions_def + subset_with_keys(downstream_partitions_def, ["2021-05-05"]), + downstream_partitions_def, + upstream_partitions_def, ).partitions_subset.get_partition_keys() == [] ) @@ -229,17 +247,20 @@ def test_daily_to_daily_lag(): # range of partition keys assert mapping.get_upstream_mapped_partitions_result_for_partitions( subset_with_key_range(downstream_partitions_def, "2021-05-07", "2021-05-09"), + downstream_partitions_def, upstream_partitions_def, ).partitions_subset.get_partition_keys() == ["2021-05-06", "2021-05-07", "2021-05-08"] assert mapping.get_downstream_partitions_for_partitions( - subset_with_key_range(downstream_partitions_def, "2021-05-06", "2021-05-08"), + subset_with_key_range(upstream_partitions_def, "2021-05-06", "2021-05-08"), + upstream_partitions_def, downstream_partitions_def, ).get_partition_keys() == ["2021-05-07", "2021-05-08", "2021-05-09"] # range overlaps start assert mapping.get_upstream_mapped_partitions_result_for_partitions( subset_with_key_range(downstream_partitions_def, "2021-05-05", "2021-05-07"), + downstream_partitions_def, upstream_partitions_def, ).partitions_subset.get_partition_keys() == ["2021-05-05", "2021-05-06"] @@ -252,17 +273,23 @@ def test_exotic_cron_schedule_lag(): mapping = TimeWindowPartitionMapping(start_offset=-1, end_offset=-1) # single partition key assert mapping.get_upstream_mapped_partitions_result_for_partitions( - subset_with_keys(downstream_partitions_def, ["2021-05-06_04"]), upstream_partitions_def + subset_with_keys(downstream_partitions_def, ["2021-05-06_04"]), + downstream_partitions_def, + upstream_partitions_def, ).partitions_subset.get_partition_keys() == ["2021-05-06_00"] assert mapping.get_downstream_partitions_for_partitions( - subset_with_keys(upstream_partitions_def, ["2021-05-06_00"]), downstream_partitions_def + subset_with_keys(upstream_partitions_def, ["2021-05-06_00"]), + upstream_partitions_def, + downstream_partitions_def, ).get_partition_keys() == ["2021-05-06_04"] # first partition key assert ( mapping.get_upstream_mapped_partitions_result_for_partitions( - subset_with_keys(downstream_partitions_def, ["2021-05-05_00"]), upstream_partitions_def + subset_with_keys(downstream_partitions_def, ["2021-05-05_00"]), + downstream_partitions_def, + upstream_partitions_def, ).partitions_subset.get_partition_keys() == [] ) @@ -270,17 +297,20 @@ def test_exotic_cron_schedule_lag(): # range of partition keys assert mapping.get_upstream_mapped_partitions_result_for_partitions( subset_with_key_range(downstream_partitions_def, "2021-05-07_04", "2021-05-07_12"), + downstream_partitions_def, upstream_partitions_def, ).partitions_subset.get_partition_keys() == ["2021-05-07_00", "2021-05-07_04", "2021-05-07_08"] assert mapping.get_downstream_partitions_for_partitions( - subset_with_key_range(downstream_partitions_def, "2021-05-07_04", "2021-05-07_12"), + subset_with_key_range(upstream_partitions_def, "2021-05-07_04", "2021-05-07_12"), + upstream_partitions_def, downstream_partitions_def, ).get_partition_keys() == ["2021-05-07_08", "2021-05-07_12", "2021-05-07_16"] # range overlaps start assert mapping.get_upstream_mapped_partitions_result_for_partitions( subset_with_key_range(downstream_partitions_def, "2021-05-05_00", "2021-05-05_08"), + downstream_partitions_def, upstream_partitions_def, ).partitions_subset.get_partition_keys() == ["2021-05-05_00", "2021-05-05_04"] @@ -291,11 +321,15 @@ def test_daily_to_daily_lag_different_start_date(): mapping = TimeWindowPartitionMapping(start_offset=-1, end_offset=-1) assert mapping.get_upstream_mapped_partitions_result_for_partitions( - subset_with_keys(downstream_partitions_def, ["2021-05-06"]), upstream_partitions_def + subset_with_keys(downstream_partitions_def, ["2021-05-06"]), + downstream_partitions_def, + upstream_partitions_def, ).partitions_subset.get_partition_keys() == ["2021-05-05"] assert mapping.get_downstream_partitions_for_partitions( - subset_with_keys(upstream_partitions_def, ["2021-05-05"]), downstream_partitions_def + subset_with_keys(upstream_partitions_def, ["2021-05-05"]), + upstream_partitions_def, + downstream_partitions_def, ).get_partition_keys() == ["2021-05-06"] @@ -305,25 +339,33 @@ def test_daily_to_daily_many_to_one(): mapping = TimeWindowPartitionMapping(start_offset=-1) assert mapping.get_upstream_mapped_partitions_result_for_partitions( - subset_with_keys(downstream_partitions_def, ["2022-07-04"]), upstream_partitions_def + subset_with_keys(downstream_partitions_def, ["2022-07-04"]), + downstream_partitions_def, + upstream_partitions_def, ).partitions_subset.get_partition_keys() == ["2022-07-03", "2022-07-04"] assert mapping.get_upstream_mapped_partitions_result_for_partitions( subset_with_keys(downstream_partitions_def, ["2022-07-04", "2022-07-05"]), + downstream_partitions_def, upstream_partitions_def, ).partitions_subset.get_partition_keys() == ["2022-07-03", "2022-07-04", "2022-07-05"] assert mapping.get_downstream_partitions_for_partitions( subset_with_keys(upstream_partitions_def, ["2022-07-03", "2022-07-04"]), + upstream_partitions_def, downstream_partitions_def, ).get_partition_keys() == ["2022-07-03", "2022-07-04", "2022-07-05"] assert mapping.get_downstream_partitions_for_partitions( - subset_with_keys(upstream_partitions_def, ["2022-07-03"]), downstream_partitions_def + subset_with_keys(upstream_partitions_def, ["2022-07-03"]), + upstream_partitions_def, + downstream_partitions_def, ).get_partition_keys() == ["2022-07-03", "2022-07-04"] assert mapping.get_downstream_partitions_for_partitions( - subset_with_keys(upstream_partitions_def, ["2022-07-04"]), downstream_partitions_def + subset_with_keys(upstream_partitions_def, ["2022-07-04"]), + upstream_partitions_def, + downstream_partitions_def, ).get_partition_keys() == ["2022-07-04", "2022-07-05"] @@ -385,6 +427,7 @@ def test_get_downstream_with_current_time( assert ( mapping.get_downstream_partitions_for_partitions( subset_with_keys(upstream_partitions_def, upstream_keys), + upstream_partitions_def, downstream_partitions_def, current_time=current_time, ).get_partition_keys() @@ -489,6 +532,7 @@ def test_get_upstream_with_current_time( upstream_partitions_result = mapping.get_upstream_mapped_partitions_result_for_partitions( subset_with_keys(downstream_partitions_def, downstream_keys), + downstream_partitions_def, upstream_partitions_def, current_time=current_time, ) @@ -508,6 +552,7 @@ def test_different_start_time_partitions_defs(): TimeWindowPartitionMapping() .get_downstream_partitions_for_partitions( upstream_partitions_subset=subset_with_keys(jan_start, ["2023-01-15"]), + upstream_partitions_def=jan_start, downstream_partitions_def=feb_start, ) .get_partition_keys() @@ -517,6 +562,7 @@ def test_different_start_time_partitions_defs(): upstream_partitions_result = ( TimeWindowPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( downstream_partitions_subset=subset_with_keys(jan_start, ["2023-01-15"]), + downstream_partitions_def=jan_start, upstream_partitions_def=feb_start, ) ) @@ -530,6 +576,7 @@ def test_different_end_time_partitions_defs(): assert TimeWindowPartitionMapping().get_downstream_partitions_for_partitions( upstream_partitions_subset=subset_with_keys(jan_partitions_def, ["2023-01-15"]), + upstream_partitions_def=jan_partitions_def, downstream_partitions_def=jan_feb_partitions_def, ).get_partition_keys() == ["2023-01-15"] @@ -537,6 +584,7 @@ def test_different_end_time_partitions_defs(): TimeWindowPartitionMapping() .get_downstream_partitions_for_partitions( upstream_partitions_subset=subset_with_keys(jan_feb_partitions_def, ["2023-02-15"]), + upstream_partitions_def=jan_feb_partitions_def, downstream_partitions_def=jan_partitions_def, ) .get_partition_keys() @@ -546,6 +594,7 @@ def test_different_end_time_partitions_defs(): upstream_partitions_result = ( TimeWindowPartitionMapping().get_upstream_mapped_partitions_result_for_partitions( downstream_partitions_subset=subset_with_keys(jan_feb_partitions_def, ["2023-02-15"]), + downstream_partitions_def=jan_feb_partitions_def, upstream_partitions_def=jan_partitions_def, ) ) @@ -566,6 +615,7 @@ def test_daily_upstream_of_yearly(): allow_nonexistent_upstream_partitions=True ).get_upstream_mapped_partitions_result_for_partitions( downstream_partitions_subset=subset_with_keys(yearly, ["2023-01-01"]), + downstream_partitions_def=yearly, upstream_partitions_def=daily, current_time=datetime(2023, 1, 5, 0), ).partitions_subset.get_partition_keys() == [ @@ -633,6 +683,7 @@ def test_downstream_partition_has_valid_upstream_partitions( allow_nonexistent_upstream_partitions=allow_nonexistent_upstream_partitions ).get_upstream_mapped_partitions_result_for_partitions( downstream_partitions_subset=downstream_partitions_subset, + downstream_partitions_def=downstream_partitions_subset.partitions_def, upstream_partitions_def=upstream_partitions_def, current_time=current_time, ) diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py index 8c4756cc93c53..77667b0616a11 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py @@ -213,6 +213,7 @@ class TrailingWindowPartitionMapping(PartitionMapping): def get_upstream_mapped_partitions_result_for_partitions( self, downstream_partitions_subset: Optional[PartitionsSubset], + downstream_partitions_def: Optional[PartitionsDefinition], upstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -223,7 +224,8 @@ def get_upstream_mapped_partitions_result_for_partitions( partition_keys = list(downstream_partitions_subset.get_partition_keys()) return UpstreamPartitionsResult( upstream_partitions_def.empty_subset().with_partition_key_range( - PartitionKeyRange(str(max(1, int(partition_keys[0]) - 1)), partition_keys[-1]) + upstream_partitions_def, + PartitionKeyRange(str(max(1, int(partition_keys[0]) - 1)), partition_keys[-1]), ), [], ) @@ -231,6 +233,7 @@ def get_upstream_mapped_partitions_result_for_partitions( def get_downstream_partitions_for_partitions( self, upstream_partitions_subset: PartitionsSubset, + upstream_partitions_def: PartitionsDefinition, downstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -405,7 +408,8 @@ def include_all(asset_key, partitions_subset): asset_graph, partitions_subsets_by_asset_key={ asset3.key: asset3.partitions_def.empty_subset().with_partition_key_range( - PartitionKeyRange("2022-01-02-00:00", "2022-01-03-23:00") + asset3.partitions_def, + PartitionKeyRange("2022-01-02-00:00", "2022-01-03-23:00"), ), }, ) diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py index d0dac272893ee..5206a1833b270 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py @@ -71,10 +71,13 @@ def get_upstream_partitions_for_partition_range( upstream_partitions_subset = ( downstream_partition_mapping.get_upstream_mapped_partitions_result_for_partitions( downstream_partitions_subset, + downstream_partitions_def, upstream_partitions_def, ).partitions_subset ) - upstream_key_ranges = upstream_partitions_subset.get_partition_key_ranges() + upstream_key_ranges = upstream_partitions_subset.get_partition_key_ranges( + upstream_partitions_def + ) check.invariant(len(upstream_key_ranges) == 1) return upstream_key_ranges[0] @@ -101,10 +104,13 @@ def get_downstream_partitions_for_partition_range( downstream_partitions_subset = ( downstream_partition_mapping.get_downstream_partitions_for_partitions( upstream_partitions_subset, + upstream_partitions_def, downstream_assets_def.partitions_def, ) ) - downstream_key_ranges = downstream_partitions_subset.get_partition_key_ranges() + downstream_key_ranges = downstream_partitions_subset.get_partition_key_ranges( + downstream_assets_def.partitions_def + ) check.invariant(len(downstream_key_ranges) == 1) return downstream_key_ranges[0] diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py index da95a13e60dbf..91f3a97061087 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py @@ -1389,9 +1389,11 @@ def foo_child(): ) assert asset_backfill_data.target_subset.partitions_subsets_by_asset_key == { - foo.key: foo_partitions_def.empty_subset().with_partition_key_range(partition_key_range), + foo.key: foo_partitions_def.empty_subset().with_partition_key_range( + foo_partitions_def, partition_key_range + ), foo_child.key: foo_partitions_def.empty_subset().with_partition_key_range( - partition_key_range + foo_partitions_def, partition_key_range ), } diff --git a/python_modules/dagster/dagster_tests/core_tests/partition_tests/test_partition.py b/python_modules/dagster/dagster_tests/core_tests/partition_tests/test_partition.py index 1399ed7a1d84d..d341aa18d2b53 100644 --- a/python_modules/dagster/dagster_tests/core_tests/partition_tests/test_partition.py +++ b/python_modules/dagster/dagster_tests/core_tests/partition_tests/test_partition.py @@ -142,10 +142,10 @@ def test_static_partitions_subset(): assert len(subset) == 0 assert "bar" not in subset with_some_partitions = subset.with_partition_keys(["foo", "bar"]) - assert with_some_partitions.get_partition_keys_not_in_subset() == {"baz", "qux"} + assert with_some_partitions.get_partition_keys_not_in_subset(partitions) == {"baz", "qux"} serialized = with_some_partitions.serialize() deserialized = partitions.deserialize_subset(serialized) - assert deserialized.get_partition_keys_not_in_subset() == {"baz", "qux"} + assert deserialized.get_partition_keys_not_in_subset(partitions) == {"baz", "qux"} assert len(with_some_partitions) == 2 assert len(deserialized) == 2 assert "bar" in with_some_partitions diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py index 24562ae732a9e..84da523762b98 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py @@ -166,7 +166,10 @@ class AssetReconciliationScenario( ("cursor_from", Optional["AssetReconciliationScenario"]), ("current_time", Optional[datetime.datetime]), ("asset_selection", Optional[AssetSelection]), - ("active_backfill_targets", Optional[Sequence[Mapping[AssetKey, PartitionsSubset]]]), + ( + "active_backfill_targets", + Optional[Sequence[Union[Mapping[AssetKey, PartitionsSubset], Sequence[AssetKey]]]], + ), ("dagster_runs", Optional[Sequence[DagsterRun]]), ("event_log_entries", Optional[Sequence[EventLogEntry]]), ("expected_run_requests", Optional[Sequence[RunRequest]]), @@ -190,7 +193,9 @@ def __new__( cursor_from: Optional["AssetReconciliationScenario"] = None, current_time: Optional[datetime.datetime] = None, asset_selection: Optional[AssetSelection] = None, - active_backfill_targets: Optional[Sequence[Mapping[AssetKey, PartitionsSubset]]] = None, + active_backfill_targets: Optional[ + Sequence[Union[Mapping[AssetKey, PartitionsSubset], Sequence[AssetKey]]] + ] = None, dagster_runs: Optional[Sequence[DagsterRun]] = None, event_log_entries: Optional[Sequence[EventLogEntry]] = None, expected_run_requests: Optional[Sequence[RunRequest]] = None, @@ -300,11 +305,18 @@ def repo(): # add any backfills to the instance for i, target in enumerate(self.active_backfill_targets or []): - target_subset = AssetGraphSubset( - asset_graph=repo.asset_graph, - partitions_subsets_by_asset_key=target, - non_partitioned_asset_keys=set(), - ) + if isinstance(target, Mapping): + target_subset = AssetGraphSubset( + asset_graph=repo.asset_graph, + partitions_subsets_by_asset_key=target, + non_partitioned_asset_keys=set(), + ) + else: + target_subset = AssetGraphSubset( + asset_graph=repo.asset_graph, + partitions_subsets_by_asset_key={}, + non_partitioned_asset_keys=target, + ) empty_subset = AssetGraphSubset( asset_graph=repo.asset_graph, partitions_subsets_by_asset_key={}, diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/scenarios/partition_scenarios.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/scenarios/partition_scenarios.py index ec8bad1559a60..c3312a78b4c9e 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/scenarios/partition_scenarios.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/scenarios/partition_scenarios.py @@ -167,17 +167,7 @@ ], ), }, - { - AssetKey( - "non_existant_asset" # ignored since can't be loaded - ): TimeWindowPartitionsSubset( - hourly_partitions_def, num_partitions=None, included_time_windows=[] - ).with_partition_keys( - [ - "2013-01-05-00:00", - ], - ), - }, + [AssetKey("non_existant_asset")], # ignored since can't be loaded ], current_time=create_pendulum_time(year=2013, month=1, day=5, hour=17), expected_run_requests=[ diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_flatten_time_window_ranges.py b/python_modules/dagster/dagster_tests/definitions_tests/test_flatten_time_window_ranges.py index 0cd5a9c0057e2..5042929338d4b 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_flatten_time_window_ranges.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_flatten_time_window_ranges.py @@ -38,10 +38,10 @@ def test_no_overlap() -> None: _check_flatten_time_window_ranges( { PartitionRangeStatus.MATERIALIZED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-02", "2022-01-05") + partitions_def, PartitionKeyRange("2022-01-02", "2022-01-05") ), PartitionRangeStatus.FAILED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-06", "2022-01-06") + partitions_def, PartitionKeyRange("2022-01-06", "2022-01-06") ), }, [ @@ -57,13 +57,13 @@ def test_no_overlap() -> None: _check_flatten_time_window_ranges( { PartitionRangeStatus.MATERIALIZED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-02", "2022-01-05") + partitions_def, PartitionKeyRange("2022-01-02", "2022-01-05") ), PartitionRangeStatus.FAILED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-06", "2022-01-06") + partitions_def, PartitionKeyRange("2022-01-06", "2022-01-06") ), PartitionRangeStatus.MATERIALIZING: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-8", "2022-01-09") + partitions_def, PartitionKeyRange("2022-01-8", "2022-01-09") ), }, [ @@ -86,10 +86,10 @@ def test_overlapped() -> None: _check_flatten_time_window_ranges( { PartitionRangeStatus.MATERIALIZED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-02", "2022-01-05") + partitions_def, PartitionKeyRange("2022-01-02", "2022-01-05") ), PartitionRangeStatus.FAILED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-05", "2022-01-06") + partitions_def, PartitionKeyRange("2022-01-05", "2022-01-06") ), PartitionRangeStatus.MATERIALIZING: empty_subset, }, @@ -106,10 +106,10 @@ def test_overlapped() -> None: _check_flatten_time_window_ranges( { PartitionRangeStatus.MATERIALIZED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-05", "2022-01-06") + partitions_def, PartitionKeyRange("2022-01-05", "2022-01-06") ), PartitionRangeStatus.FAILED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-02", "2022-01-05") + partitions_def, PartitionKeyRange("2022-01-02", "2022-01-05") ), PartitionRangeStatus.MATERIALIZING: empty_subset, }, @@ -132,10 +132,10 @@ def test_materialized_spans_failed() -> None: _check_flatten_time_window_ranges( { PartitionRangeStatus.MATERIALIZED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-02", "2022-01-10") + partitions_def, PartitionKeyRange("2022-01-02", "2022-01-10") ), PartitionRangeStatus.FAILED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-05", "2022-01-06") + partitions_def, PartitionKeyRange("2022-01-05", "2022-01-06") ), PartitionRangeStatus.MATERIALIZING: empty_subset, }, @@ -159,12 +159,18 @@ def test_materialized_spans_many_failed() -> None: _check_flatten_time_window_ranges( { PartitionRangeStatus.MATERIALIZED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-01", "2022-12-10") + partitions_def, PartitionKeyRange("2022-01-01", "2022-12-10") ), PartitionRangeStatus.FAILED: ( - empty_subset.with_partition_key_range(PartitionKeyRange("2022-01-05", "2022-01-06")) - .with_partition_key_range(PartitionKeyRange("2022-03-01", "2022-03-10")) - .with_partition_key_range(PartitionKeyRange("2022-09-01", "2022-10-01")) + empty_subset.with_partition_key_range( + partitions_def, PartitionKeyRange("2022-01-05", "2022-01-06") + ) + .with_partition_key_range( + partitions_def, PartitionKeyRange("2022-03-01", "2022-03-10") + ) + .with_partition_key_range( + partitions_def, PartitionKeyRange("2022-09-01", "2022-10-01") + ) ), PartitionRangeStatus.MATERIALIZING: empty_subset, }, @@ -221,8 +227,10 @@ def test_empty() -> None: _check_flatten_time_window_ranges( { PartitionRangeStatus.MATERIALIZED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-02", "2022-01-10") - ).with_partition_key_range(PartitionKeyRange("2022-01-20", "2022-02-10")), + partitions_def, PartitionKeyRange("2022-01-02", "2022-01-10") + ).with_partition_key_range( + partitions_def, PartitionKeyRange("2022-01-20", "2022-02-10") + ), PartitionRangeStatus.FAILED: empty_subset, PartitionRangeStatus.MATERIALIZING: empty_subset, }, @@ -244,8 +252,10 @@ def test_empty() -> None: { PartitionRangeStatus.MATERIALIZED: empty_subset, PartitionRangeStatus.FAILED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-02", "2022-01-10") - ).with_partition_key_range(PartitionKeyRange("2022-01-20", "2022-02-10")), + partitions_def, PartitionKeyRange("2022-01-02", "2022-01-10") + ).with_partition_key_range( + partitions_def, PartitionKeyRange("2022-01-20", "2022-02-10") + ), PartitionRangeStatus.MATERIALIZING: empty_subset, }, [ @@ -267,8 +277,10 @@ def test_empty() -> None: PartitionRangeStatus.MATERIALIZED: empty_subset, PartitionRangeStatus.FAILED: empty_subset, PartitionRangeStatus.MATERIALIZING: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-02", "2022-01-10") - ).with_partition_key_range(PartitionKeyRange("2022-01-20", "2022-02-10")), + partitions_def, PartitionKeyRange("2022-01-02", "2022-01-10") + ).with_partition_key_range( + partitions_def, PartitionKeyRange("2022-01-20", "2022-02-10") + ), }, [ { @@ -289,10 +301,12 @@ def test_cancels_out() -> None: _check_flatten_time_window_ranges( { PartitionRangeStatus.MATERIALIZED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-02", "2022-01-05") - ).with_partition_key_range(PartitionKeyRange("2022-01-12", "2022-01-13")), + partitions_def, PartitionKeyRange("2022-01-02", "2022-01-05") + ).with_partition_key_range( + partitions_def, PartitionKeyRange("2022-01-12", "2022-01-13") + ), PartitionRangeStatus.FAILED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-02", "2022-01-05") + partitions_def, PartitionKeyRange("2022-01-02", "2022-01-05") ), PartitionRangeStatus.MATERIALIZING: empty_subset, }, @@ -315,27 +329,37 @@ def test_lots() -> None: _check_flatten_time_window_ranges( { PartitionRangeStatus.MATERIALIZED: ( - empty_subset.with_partition_key_range(PartitionKeyRange("2022-01-02", "2022-01-05")) - .with_partition_key_range(PartitionKeyRange("2022-01-12", "2022-01-13")) - .with_partition_key_range(PartitionKeyRange("2022-01-15", "2022-01-17")) - .with_partition_key_range(PartitionKeyRange("2022-01-19", "2022-01-20")) - .with_partition_key_range(PartitionKeyRange("2022-01-22", "2022-01-24")) + empty_subset.with_partition_key_range( + partitions_def, PartitionKeyRange("2022-01-02", "2022-01-05") + ) + .with_partition_key_range( + partitions_def, PartitionKeyRange("2022-01-12", "2022-01-13") + ) + .with_partition_key_range( + partitions_def, PartitionKeyRange("2022-01-15", "2022-01-17") + ) + .with_partition_key_range( + partitions_def, PartitionKeyRange("2022-01-19", "2022-01-20") + ) + .with_partition_key_range( + partitions_def, PartitionKeyRange("2022-01-22", "2022-01-24") + ) ), PartitionRangeStatus.FAILED: ( empty_subset.with_partition_key_range( - PartitionKeyRange("2021-12-30", "2021-12-31") + partitions_def, PartitionKeyRange("2021-12-30", "2021-12-31") ) # before materialized subset .with_partition_key_range( - PartitionKeyRange("2022-01-02", "2022-01-03") + partitions_def, PartitionKeyRange("2022-01-02", "2022-01-03") ) # within materialized subset .with_partition_key_range( - PartitionKeyRange("2022-01-05", "2022-01-06") + partitions_def, PartitionKeyRange("2022-01-05", "2022-01-06") ) # directly after materialized subset .with_partition_key_range( - PartitionKeyRange("2022-01-08", "2022-01-09") + partitions_def, PartitionKeyRange("2022-01-08", "2022-01-09") ) # between materialized subsets .with_partition_key_range( - PartitionKeyRange("2022-01-11", "2022-01-14") + partitions_def, PartitionKeyRange("2022-01-11", "2022-01-14") ) # encompasses materialized subset .with_partition_keys(["2022-01-20"]) # at end materialized subset .with_partition_keys( @@ -381,13 +405,13 @@ def test_multiple_overlap_types(): _check_flatten_time_window_ranges( { PartitionRangeStatus.MATERIALIZED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-01", "2022-01-06") + partitions_def, PartitionKeyRange("2022-01-01", "2022-01-06") ), PartitionRangeStatus.FAILED: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-02", "2022-01-05") + partitions_def, PartitionKeyRange("2022-01-02", "2022-01-05") ), PartitionRangeStatus.MATERIALIZING: empty_subset.with_partition_key_range( - PartitionKeyRange("2022-01-03", "2022-01-04") + partitions_def, PartitionKeyRange("2022-01-03", "2022-01-04") ), }, [ diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_multi_partitions.py b/python_modules/dagster/dagster_tests/definitions_tests/test_multi_partitions.py index e36c500ec4a43..0ddbb8e329f6c 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_multi_partitions.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_multi_partitions.py @@ -320,9 +320,9 @@ def test_multipartitions_subset_addition(initial, added): assert added_subset.get_partition_keys(current_time=current_day) == set( added_subset_keys + initial_subset_keys ) - assert added_subset.get_partition_keys_not_in_subset(current_time=current_day) == set( - expected_keys_not_in_updated_subset - ) + assert added_subset.get_partition_keys_not_in_subset( + multipartitions_def, current_time=current_day + ) == set(expected_keys_not_in_updated_subset) def test_asset_partition_key_is_multipartition_key(): diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_time_window_partitions.py b/python_modules/dagster/dagster_tests/definitions_tests/test_time_window_partitions.py index 61472baa999b6..a0ea9d3ce691f 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_time_window_partitions.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_time_window_partitions.py @@ -808,7 +808,8 @@ def test_partition_subset_get_partition_keys_not_in_subset( assert partition_key in subset assert ( subset.get_partition_keys_not_in_subset( - current_time=partitions_def.end_time_for_partition_key(full_set_keys[-1]) + partitions_def=partitions_def, + current_time=partitions_def.end_time_for_partition_key(full_set_keys[-1]), ) == expected_keys_not_in_subset ) @@ -946,7 +947,8 @@ def test_partition_subset_with_partition_keys( assert all(partition_key in updated_subset for partition_key in added_subset_keys) assert ( updated_subset.get_partition_keys_not_in_subset( - current_time=partitions_def.end_time_for_partition_key(full_set_keys[-1]) + partitions_def=partitions_def, + current_time=partitions_def.end_time_for_partition_key(full_set_keys[-1]), ) == expected_keys_not_in_updated_subset ) diff --git a/python_modules/dagster/dagster_tests/storage_tests/test_fs_io_manager.py b/python_modules/dagster/dagster_tests/storage_tests/test_fs_io_manager.py index a0356e6a03eee..173c50405345b 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/test_fs_io_manager.py +++ b/python_modules/dagster/dagster_tests/storage_tests/test_fs_io_manager.py @@ -295,6 +295,7 @@ class NoPartitionsPartitionMapping(PartitionMapping): def get_upstream_mapped_partitions_result_for_partitions( self, downstream_partitions_subset: Optional[PartitionsSubset], + downstream_partitions_def: Optional[PartitionsDefinition], upstream_partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, @@ -304,6 +305,7 @@ def get_upstream_mapped_partitions_result_for_partitions( def get_downstream_partitions_for_partitions( self, upstream_partitions_subset, + upstream_partitions_def, downstream_partitions_def, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None,