Skip to content

Commit

Permalink
test suite
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Aug 15, 2024
1 parent 89cfc2a commit d8b072b
Showing 1 changed file with 72 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
DagsterSnapshotDoesNotExist,
)
from dagster._core.events import DagsterEvent, DagsterEventType, JobFailureData, RunFailureReason
from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill
from dagster._core.execution.backfill import BulkActionsFilter, BulkActionStatus, PartitionBackfill
from dagster._core.instance import DagsterInstance, InstanceType
from dagster._core.launcher.sync_in_memory_run_launcher import SyncInMemoryRunLauncher
from dagster._core.remote_representation import (
Expand Down Expand Up @@ -1337,6 +1337,77 @@ def test_backfill(self, storage: RunStorage):
assert len(storage.get_backfills()) == 1
assert len(storage.get_backfills(status=BulkActionStatus.REQUESTED)) == 0

def test_backfill_status_filtering(self, storage: RunStorage):
origin = self.fake_partition_set_origin("fake_partition_set")
backfills = storage.get_backfills()
assert len(backfills) == 0

one = PartitionBackfill(
"one",
partition_set_origin=origin,
status=BulkActionStatus.REQUESTED,
partition_names=["a", "b", "c"],
from_failure=False,
tags={},
backfill_timestamp=time.time(),
)
storage.add_backfill(one)
assert (
len(storage.get_backfills(filters=BulkActionsFilter(status=BulkActionStatus.REQUESTED)))
== 1
)
assert (
len(storage.get_backfills(filters=BulkActionsFilter(status=BulkActionStatus.COMPLETED)))
== 0
)
backfill = storage.get_backfills(
filters=BulkActionsFilter(status=BulkActionStatus.REQUESTED)
)
assert backfill == one

storage.update_backfill(one.with_status(status=BulkActionStatus.COMPLETED))
assert (
len(storage.get_backfills(filters=BulkActionsFilter(status=BulkActionStatus.REQUESTED)))
== 0
)
assert (
len(storage.get_backfills(filters=BulkActionsFilter(status=BulkActionStatus.COMPLETED)))
== 1
)

def test_backfill_created_time_filtering(self, storage: RunStorage):
origin = self.fake_partition_set_origin("fake_partition_set")
backfills = storage.get_backfills()
assert len(backfills) == 0

all_backfills = []
for i in range(5):
backfill = PartitionBackfill(
f"backfill_{i}",
partition_set_origin=origin,
status=BulkActionStatus.REQUESTED,
partition_names=["a", "b", "c"],
from_failure=False,
tags={},
backfill_timestamp=time.time(),
)
storage.add_backfill(backfill)
all_backfills.append(backfill)

created_before = storage.get_backfills(
filters=BulkActionsFilter(created_before=all_backfills[3].backfill_timestamp)
)
assert len(created_before) == 2
for backfill in created_before:
assert backfill.backfill_timestamp < all_backfills[3].backfill_timestamp

created_after = storage.get_backfills(
filters=BulkActionsFilter(created_after=all_backfills[3].backfill_timestamp)
)
assert len(created_after) == 2
for backfill in created_after:
assert backfill.backfill_timestamp > all_backfills[3].backfill_timestamp

def test_secondary_index(self, storage):
self._skip_in_memory(storage)

Expand Down

0 comments on commit d8b072b

Please sign in to comment.