Skip to content

Commit

Permalink
add test
Browse files Browse the repository at this point in the history
  • Loading branch information
clairelin135 committed Nov 16, 2023
1 parent 8b656d4 commit 78a91dd
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1868,7 +1868,7 @@ def with_partitions_def(
)

def __repr__(self) -> str:
return f"PartitionKeysTimeWindowPartitionsSubset({self.get_partition_key_ranges()})"
return f"PartitionKeysTimeWindowPartitionsSubset({self.get_partition_key_ranges(self.partitions_def)})"


class TimeWindowPartitionsSubsetSerializer(NamedTupleSerializer):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -745,13 +745,13 @@ def test_dst_transition_with_daily_partitions(

subset = partitions_def.subset_with_partition_keys([partition_key])
upstream = time_partition_mapping.get_upstream_mapped_partitions_result_for_partitions(
subset, partitions_def, current_time=current_time
subset, partitions_def, partitions_def, current_time=current_time
)
assert upstream.partitions_subset.get_partition_keys(current_time=current_time) == [
expected_upstream_partition_key
]
downstream = time_partition_mapping.get_downstream_partitions_for_partitions(
subset, partitions_def, current_time=current_time
subset, partitions_def, partitions_def, current_time=current_time
)
assert downstream.get_partition_keys(current_time=current_time) == [
expected_downstream_partition_key
Expand All @@ -767,13 +767,13 @@ def test_mar_2024_dst_transition_with_hourly_partitions():

subset = partitions_def.subset_with_partition_keys(["2024-03-10-03:00"])
upstream = time_partition_mapping.get_upstream_mapped_partitions_result_for_partitions(
subset, partitions_def, current_time=current_time
subset, partitions_def, partitions_def, current_time=current_time
)
assert upstream.partitions_subset.get_partition_keys(current_time=current_time) == [
"2024-03-10-01:00",
]
downstream = time_partition_mapping.get_downstream_partitions_for_partitions(
subset, partitions_def, current_time=current_time
subset, partitions_def, partitions_def, current_time=current_time
)
assert downstream.get_partition_keys(current_time=current_time) == [
"2024-03-10-04:00",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
)
from dagster._check import CheckError
from dagster._core.test_utils import instance_for_test
from dagster._serdes import serialize_value


@pytest.mark.parametrize(
Expand Down Expand Up @@ -155,9 +156,12 @@ def test_static_partitions_subset_identical_serialization():
# serialized subsets should be equal if the original subsets are equal
partitions = StaticPartitionsDefinition([str(i) for i in range(1000)])
subset = [str(i) for i in range(500)]
serialized1 = partitions.subset_with_partition_keys(subset).serialize()
serialized2 = partitions.subset_with_partition_keys(reversed(subset)).serialize()
assert serialized1 == serialized2

in_order_subset = partitions.subset_with_partition_keys(subset)
reverse_order_subset = partitions.subset_with_partition_keys(reversed(subset))

assert in_order_subset.serialize() == reverse_order_subset.serialize()
assert serialize_value(in_order_subset) == serialize_value(reverse_order_subset)


def test_static_partitions_invalid_chars():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -630,3 +630,27 @@ def load_input(self, context):
partition_key=second_partition_key,
resources=resources,
)


def test_context_returns_multipartition_keys():
partitions_def = MultiPartitionsDefinition(
{"a": StaticPartitionsDefinition(["a", "b"]), "1": StaticPartitionsDefinition(["1", "2"])}
)

@asset(partitions_def=partitions_def)
def upstream(context):
assert isinstance(context.partition_key, MultiPartitionKey)

@asset(partitions_def=partitions_def)
def downstream(context, upstream):
assert isinstance(context.partition_key, MultiPartitionKey)

input_range = context.asset_partition_key_range_for_input("upstream")
assert isinstance(input_range.start, MultiPartitionKey)
assert isinstance(input_range.end, MultiPartitionKey)

output = context.asset_partition_key_range_for_output("result")
assert isinstance(output.start, MultiPartitionKey)
assert isinstance(output.end, MultiPartitionKey)

materialize([upstream, downstream], partition_key="1|a")

0 comments on commit 78a91dd

Please sign in to comment.