Skip to content

Commit

Permalink
add test
Browse files Browse the repository at this point in the history
  • Loading branch information
clairelin135 committed Nov 16, 2023
1 parent a79cfdd commit 3195b7e
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1979,7 +1979,7 @@ def with_partitions_def(
)

def __repr__(self) -> str:
return f"PartitionKeysTimeWindowPartitionsSubset({self.get_partition_key_ranges()})"
return f"PartitionKeysTimeWindowPartitionsSubset({self.get_partition_key_ranges(self.partitions_def)})"


class TimeWindowPartitionsSubsetSerializer(NamedTupleSerializer):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,7 @@ def asset_partitions_with_newly_updated_parents(
child_partitions_subset = (
partition_mapping.get_downstream_partitions_for_partitions(
parent_partitions_subset,
upstream_partitions_def=parent_partitions_def,
downstream_partitions_def=child_partitions_def,
dynamic_partitions_store=self,
current_time=self.evaluation_time,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -745,13 +745,13 @@ def test_dst_transition_with_daily_partitions(

subset = partitions_def.subset_with_partition_keys([partition_key])
upstream = time_partition_mapping.get_upstream_mapped_partitions_result_for_partitions(
subset, partitions_def, current_time=current_time
subset, partitions_def, partitions_def, current_time=current_time
)
assert upstream.partitions_subset.get_partition_keys(current_time=current_time) == [
expected_upstream_partition_key
]
downstream = time_partition_mapping.get_downstream_partitions_for_partitions(
subset, partitions_def, current_time=current_time
subset, partitions_def, partitions_def, current_time=current_time
)
assert downstream.get_partition_keys(current_time=current_time) == [
expected_downstream_partition_key
Expand All @@ -767,13 +767,13 @@ def test_mar_2024_dst_transition_with_hourly_partitions():

subset = partitions_def.subset_with_partition_keys(["2024-03-10-03:00"])
upstream = time_partition_mapping.get_upstream_mapped_partitions_result_for_partitions(
subset, partitions_def, current_time=current_time
subset, partitions_def, partitions_def, current_time=current_time
)
assert upstream.partitions_subset.get_partition_keys(current_time=current_time) == [
"2024-03-10-01:00",
]
downstream = time_partition_mapping.get_downstream_partitions_for_partitions(
subset, partitions_def, current_time=current_time
subset, partitions_def, partitions_def, current_time=current_time
)
assert downstream.get_partition_keys(current_time=current_time) == [
"2024-03-10-04:00",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
)
from dagster._check import CheckError
from dagster._core.test_utils import instance_for_test
from dagster._serdes import serialize_value


@pytest.mark.parametrize(
Expand Down Expand Up @@ -155,9 +156,12 @@ def test_static_partitions_subset_identical_serialization():
# serialized subsets should be equal if the original subsets are equal
partitions = StaticPartitionsDefinition([str(i) for i in range(1000)])
subset = [str(i) for i in range(500)]
serialized1 = partitions.subset_with_partition_keys(subset).serialize()
serialized2 = partitions.subset_with_partition_keys(reversed(subset)).serialize()
assert serialized1 == serialized2

in_order_subset = partitions.subset_with_partition_keys(subset)
reverse_order_subset = partitions.subset_with_partition_keys(reversed(subset))

assert in_order_subset.serialize() == reverse_order_subset.serialize()
assert serialize_value(in_order_subset) == serialize_value(reverse_order_subset)


def test_static_partitions_invalid_chars():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -630,3 +630,27 @@ def load_input(self, context):
partition_key=second_partition_key,
resources=resources,
)


def test_context_returns_multipartition_keys():
partitions_def = MultiPartitionsDefinition(
{"a": StaticPartitionsDefinition(["a", "b"]), "1": StaticPartitionsDefinition(["1", "2"])}
)

@asset(partitions_def=partitions_def)
def upstream(context):
assert isinstance(context.partition_key, MultiPartitionKey)

@asset(partitions_def=partitions_def)
def downstream(context, upstream):
assert isinstance(context.partition_key, MultiPartitionKey)

input_range = context.asset_partition_key_range_for_input("upstream")
assert isinstance(input_range.start, MultiPartitionKey)
assert isinstance(input_range.end, MultiPartitionKey)

output = context.asset_partition_key_range_for_output("result")
assert isinstance(output.start, MultiPartitionKey)
assert isinstance(output.end, MultiPartitionKey)

materialize([upstream, downstream], partition_key="1|a")
Original file line number Diff line number Diff line change
Expand Up @@ -1049,9 +1049,11 @@ def test_dst_transition_15_minute_partitions() -> None:
"2020-11-01-02:00",
"2020-11-01-02:15",
}
assert subset.get_partition_keys_not_in_subset() == []
assert subset.get_partition_keys_not_in_subset(partitions_def) == []
assert (
partitions_def.deserialize_subset(subset.serialize()).get_partition_keys_not_in_subset()
partitions_def.deserialize_subset(subset.serialize()).get_partition_keys_not_in_subset(
partitions_def
)
== []
)

Expand All @@ -1070,9 +1072,11 @@ def test_dst_transition_hourly_partitions() -> None:
"2020-11-01-03:00",
"2020-11-01-04:00",
}
assert subset.get_partition_keys_not_in_subset() == []
assert subset.get_partition_keys_not_in_subset(partitions_def) == []
assert (
partitions_def.deserialize_subset(subset.serialize()).get_partition_keys_not_in_subset()
partitions_def.deserialize_subset(subset.serialize()).get_partition_keys_not_in_subset(
partitions_def
)
== []
)

Expand All @@ -1094,9 +1098,11 @@ def test_dst_transition_hourly_partitions_with_utc_offset() -> None:
"2020-11-01-03:00:00-0800",
"2020-11-01-04:00:00-0800",
}
assert subset.get_partition_keys_not_in_subset() == []
assert subset.get_partition_keys_not_in_subset(partitions_def) == []
assert (
partitions_def.deserialize_subset(subset.serialize()).get_partition_keys_not_in_subset()
partitions_def.deserialize_subset(subset.serialize()).get_partition_keys_not_in_subset(
partitions_def
)
== []
)

Expand All @@ -1116,9 +1122,11 @@ def test_dst_transition_daily_partitions() -> None:
"2020-11-01-01:00",
"2020-11-02-01:00",
}
assert subset.get_partition_keys_not_in_subset() == []
assert subset.get_partition_keys_not_in_subset(partitions_def) == []
assert (
partitions_def.deserialize_subset(subset.serialize()).get_partition_keys_not_in_subset()
partitions_def.deserialize_subset(subset.serialize()).get_partition_keys_not_in_subset(
partitions_def
)
== []
)

Expand Down

0 comments on commit 3195b7e

Please sign in to comment.