From e6c5ca9da96523e3131be04a830c487102db9e7b Mon Sep 17 00:00:00 2001 From: Claire Lin Date: Wed, 15 Nov 2023 16:15:36 -0800 Subject: [PATCH] add test --- .../definitions/time_window_partitions.py | 2 +- .../test_time_window_partition_mapping.py | 8 +++---- .../partition_tests/test_partition.py | 10 +++++--- .../test_multi_partitions.py | 24 +++++++++++++++++++ 4 files changed, 36 insertions(+), 8 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py b/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py index d73fd420f096a..5d4427b6a108c 100644 --- a/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py +++ b/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py @@ -1868,7 +1868,7 @@ def with_partitions_def( ) def __repr__(self) -> str: - return f"PartitionKeysTimeWindowPartitionsSubset({self.get_partition_key_ranges()})" + return f"PartitionKeysTimeWindowPartitionsSubset({self.get_partition_key_ranges(self.partitions_def)})" class TimeWindowPartitionsSubsetSerializer(NamedTupleSerializer): diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_time_window_partition_mapping.py b/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_time_window_partition_mapping.py index 9cf05bd369d47..331c02cffe31b 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_time_window_partition_mapping.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_time_window_partition_mapping.py @@ -745,13 +745,13 @@ def test_dst_transition_with_daily_partitions( subset = partitions_def.subset_with_partition_keys([partition_key]) upstream = time_partition_mapping.get_upstream_mapped_partitions_result_for_partitions( - subset, partitions_def, current_time=current_time + subset, partitions_def, partitions_def, current_time=current_time ) assert upstream.partitions_subset.get_partition_keys(current_time=current_time) == [ expected_upstream_partition_key ] downstream = time_partition_mapping.get_downstream_partitions_for_partitions( - subset, partitions_def, current_time=current_time + subset, partitions_def, partitions_def, current_time=current_time ) assert downstream.get_partition_keys(current_time=current_time) == [ expected_downstream_partition_key @@ -767,13 +767,13 @@ def test_mar_2024_dst_transition_with_hourly_partitions(): subset = partitions_def.subset_with_partition_keys(["2024-03-10-03:00"]) upstream = time_partition_mapping.get_upstream_mapped_partitions_result_for_partitions( - subset, partitions_def, current_time=current_time + subset, partitions_def, partitions_def, current_time=current_time ) assert upstream.partitions_subset.get_partition_keys(current_time=current_time) == [ "2024-03-10-01:00", ] downstream = time_partition_mapping.get_downstream_partitions_for_partitions( - subset, partitions_def, current_time=current_time + subset, partitions_def, partitions_def, current_time=current_time ) assert downstream.get_partition_keys(current_time=current_time) == [ "2024-03-10-04:00", diff --git a/python_modules/dagster/dagster_tests/core_tests/partition_tests/test_partition.py b/python_modules/dagster/dagster_tests/core_tests/partition_tests/test_partition.py index d341aa18d2b53..2e3600c688961 100644 --- a/python_modules/dagster/dagster_tests/core_tests/partition_tests/test_partition.py +++ b/python_modules/dagster/dagster_tests/core_tests/partition_tests/test_partition.py @@ -16,6 +16,7 @@ ) from dagster._check import CheckError from dagster._core.test_utils import instance_for_test +from dagster._serdes import serialize_value @pytest.mark.parametrize( @@ -155,9 +156,12 @@ def test_static_partitions_subset_identical_serialization(): # serialized subsets should be equal if the original subsets are equal partitions = StaticPartitionsDefinition([str(i) for i in range(1000)]) subset = [str(i) for i in range(500)] - serialized1 = partitions.subset_with_partition_keys(subset).serialize() - serialized2 = partitions.subset_with_partition_keys(reversed(subset)).serialize() - assert serialized1 == serialized2 + + in_order_subset = partitions.subset_with_partition_keys(subset) + reverse_order_subset = partitions.subset_with_partition_keys(reversed(subset)) + + assert in_order_subset.serialize() == reverse_order_subset.serialize() + assert serialize_value(in_order_subset) == serialize_value(reverse_order_subset) def test_static_partitions_invalid_chars(): diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_multi_partitions.py b/python_modules/dagster/dagster_tests/definitions_tests/test_multi_partitions.py index 0ddbb8e329f6c..0ad2d4cd7629d 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_multi_partitions.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_multi_partitions.py @@ -630,3 +630,27 @@ def load_input(self, context): partition_key=second_partition_key, resources=resources, ) + + +def test_context_returns_multipartition_keys(): + partitions_def = MultiPartitionsDefinition( + {"a": StaticPartitionsDefinition(["a", "b"]), "1": StaticPartitionsDefinition(["1", "2"])} + ) + + @asset(partitions_def=partitions_def) + def upstream(context): + assert isinstance(context.partition_key, MultiPartitionKey) + + @asset(partitions_def=partitions_def) + def downstream(context, upstream): + assert isinstance(context.partition_key, MultiPartitionKey) + + input_range = context.asset_partition_key_range_for_input("upstream") + assert isinstance(input_range.start, MultiPartitionKey) + assert isinstance(input_range.end, MultiPartitionKey) + + output = context.asset_partition_key_range_for_output("result") + assert isinstance(output.start, MultiPartitionKey) + assert isinstance(output.end, MultiPartitionKey) + + materialize([upstream, downstream], partition_key="1|a")