Skip to content

Commit

Permalink
move end_date tests to test_time_window_partitions (#17175)
Browse files Browse the repository at this point in the history
## Summary & Motivation

They don't have much to do with partition mapping, so it doesn't make
sense to include them in the partition mapping test file.

## How I Tested These Changes
  • Loading branch information
sryza authored Oct 12, 2023
1 parent d06a550 commit 7ee29e4
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 79 deletions.
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
from datetime import datetime, timezone
from typing import Optional, Sequence

import pendulum
import pytest
from dagster import (
DailyPartitionsDefinition,
HourlyPartitionsDefinition,
MonthlyPartitionsDefinition,
TimeWindow,
TimeWindowPartitionMapping,
TimeWindowPartitionsDefinition,
WeeklyPartitionsDefinition,
Expand Down Expand Up @@ -468,83 +466,6 @@ def test_get_upstream_with_current_time(
)


@pytest.mark.parametrize(
"partitions_def,first_partition_window,last_partition_window,number_of_partitions,fmt",
[
(
HourlyPartitionsDefinition(start_date="2022-01-01-00:00", end_date="2022-02-01-00:00"),
["2022-01-01-00:00", "2022-01-01-01:00"],
["2022-01-31-23:00", "2022-02-01-00:00"],
744,
"%Y-%m-%d-%H:%M",
),
(
DailyPartitionsDefinition(start_date="2022-01-01", end_date="2022-02-01"),
["2022-01-01", "2022-01-02"],
["2022-01-31", "2022-02-01"],
31,
"%Y-%m-%d",
),
(
WeeklyPartitionsDefinition(start_date="2022-01-01", end_date="2022-02-01"),
["2022-01-02", "2022-01-09"], # 2022-01-02 is Sunday, weekly cron starts from Sunday
["2022-01-23", "2022-01-30"],
4,
"%Y-%m-%d",
),
(
MonthlyPartitionsDefinition(start_date="2022-01-01", end_date="2023-01-01"),
["2022-01-01", "2022-02-01"],
["2022-12-01", "2023-01-01"],
12,
"%Y-%m-%d",
),
],
)
def test_partition_with_end_date(
partitions_def: TimeWindowPartitionsDefinition,
first_partition_window: Sequence[str],
last_partition_window: Sequence[str],
number_of_partitions: int,
fmt: str,
):
first_partition_window_ = TimeWindow(
start=pendulum.instance(datetime.strptime(first_partition_window[0], fmt), tz="UTC"),
end=pendulum.instance(datetime.strptime(first_partition_window[1], fmt), tz="UTC"),
)

last_partition_window_ = TimeWindow(
start=pendulum.instance(datetime.strptime(last_partition_window[0], fmt), tz="UTC"),
end=pendulum.instance(datetime.strptime(last_partition_window[1], fmt), tz="UTC"),
)

# get_last_partition_window
assert partitions_def.get_last_partition_window() == last_partition_window_
# get_next_partition_window
assert partitions_def.get_next_partition_window(partitions_def.start) == first_partition_window_
assert (
partitions_def.get_next_partition_window(last_partition_window_.start)
== last_partition_window_
)
assert not partitions_def.get_next_partition_window(last_partition_window_.end)
# get_partition_keys
assert len(partitions_def.get_partition_keys()) == number_of_partitions
assert partitions_def.get_partition_keys()[0] == first_partition_window[0]
assert partitions_def.get_partition_keys()[-1] == last_partition_window[0]
# get_next_partition_key
assert (
partitions_def.get_next_partition_key(first_partition_window[0])
== first_partition_window[1]
)
assert not partitions_def.get_next_partition_key(last_partition_window[0])
# get_last_partition_key
assert partitions_def.get_last_partition_key() == last_partition_window[0]
# has_partition_key
assert partitions_def.has_partition_key(first_partition_window[0])
assert partitions_def.has_partition_key(last_partition_window[0])
assert not partitions_def.has_partition_key(last_partition_window[1])


def test_different_start_time_partitions_defs():
jan_start = DailyPartitionsDefinition("2023-01-01")
feb_start = DailyPartitionsDefinition("2023-02-01")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1188,3 +1188,80 @@ def test_has_partition_key():
)
assert partitions_def.has_partition_key("2020-01-01")
assert partitions_def.has_partition_key("2020-03-15")


@pytest.mark.parametrize(
"partitions_def,first_partition_window,last_partition_window,number_of_partitions,fmt",
[
(
HourlyPartitionsDefinition(start_date="2022-01-01-00:00", end_date="2022-02-01-00:00"),
["2022-01-01-00:00", "2022-01-01-01:00"],
["2022-01-31-23:00", "2022-02-01-00:00"],
744,
"%Y-%m-%d-%H:%M",
),
(
DailyPartitionsDefinition(start_date="2022-01-01", end_date="2022-02-01"),
["2022-01-01", "2022-01-02"],
["2022-01-31", "2022-02-01"],
31,
"%Y-%m-%d",
),
(
WeeklyPartitionsDefinition(start_date="2022-01-01", end_date="2022-02-01"),
["2022-01-02", "2022-01-09"], # 2022-01-02 is Sunday, weekly cron starts from Sunday
["2022-01-23", "2022-01-30"],
4,
"%Y-%m-%d",
),
(
MonthlyPartitionsDefinition(start_date="2022-01-01", end_date="2023-01-01"),
["2022-01-01", "2022-02-01"],
["2022-12-01", "2023-01-01"],
12,
"%Y-%m-%d",
),
],
)
def test_partition_with_end_date(
partitions_def: TimeWindowPartitionsDefinition,
first_partition_window: Sequence[str],
last_partition_window: Sequence[str],
number_of_partitions: int,
fmt: str,
):
first_partition_window_ = TimeWindow(
start=pendulum.instance(datetime.strptime(first_partition_window[0], fmt), tz="UTC"),
end=pendulum.instance(datetime.strptime(first_partition_window[1], fmt), tz="UTC"),
)

last_partition_window_ = TimeWindow(
start=pendulum.instance(datetime.strptime(last_partition_window[0], fmt), tz="UTC"),
end=pendulum.instance(datetime.strptime(last_partition_window[1], fmt), tz="UTC"),
)

# get_last_partition_window
assert partitions_def.get_last_partition_window() == last_partition_window_
# get_next_partition_window
assert partitions_def.get_next_partition_window(partitions_def.start) == first_partition_window_
assert (
partitions_def.get_next_partition_window(last_partition_window_.start)
== last_partition_window_
)
assert not partitions_def.get_next_partition_window(last_partition_window_.end)
# get_partition_keys
assert len(partitions_def.get_partition_keys()) == number_of_partitions
assert partitions_def.get_partition_keys()[0] == first_partition_window[0]
assert partitions_def.get_partition_keys()[-1] == last_partition_window[0]
# get_next_partition_key
assert (
partitions_def.get_next_partition_key(first_partition_window[0])
== first_partition_window[1]
)
assert not partitions_def.get_next_partition_key(last_partition_window[0])
# get_last_partition_key
assert partitions_def.get_last_partition_key() == last_partition_window[0]
# has_partition_key
assert partitions_def.has_partition_key(first_partition_window[0])
assert partitions_def.has_partition_key(last_partition_window[0])
assert not partitions_def.has_partition_key(last_partition_window[1])

0 comments on commit 7ee29e4

Please sign in to comment.