From d8b072b0e64d74d31862408e022167b56cf5626e Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Thu, 15 Aug 2024 15:53:52 -0400 Subject: [PATCH] test suite --- .../storage_tests/utils/run_storage.py | 73 ++++++++++++++++++- 1 file changed, 72 insertions(+), 1 deletion(-) diff --git a/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py b/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py index 43622082b016a..acfc329cd117e 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py +++ b/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py @@ -14,7 +14,7 @@ DagsterSnapshotDoesNotExist, ) from dagster._core.events import DagsterEvent, DagsterEventType, JobFailureData, RunFailureReason -from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill +from dagster._core.execution.backfill import BulkActionsFilter, BulkActionStatus, PartitionBackfill from dagster._core.instance import DagsterInstance, InstanceType from dagster._core.launcher.sync_in_memory_run_launcher import SyncInMemoryRunLauncher from dagster._core.remote_representation import ( @@ -1337,6 +1337,77 @@ def test_backfill(self, storage: RunStorage): assert len(storage.get_backfills()) == 1 assert len(storage.get_backfills(status=BulkActionStatus.REQUESTED)) == 0 + def test_backfill_status_filtering(self, storage: RunStorage): + origin = self.fake_partition_set_origin("fake_partition_set") + backfills = storage.get_backfills() + assert len(backfills) == 0 + + one = PartitionBackfill( + "one", + partition_set_origin=origin, + status=BulkActionStatus.REQUESTED, + partition_names=["a", "b", "c"], + from_failure=False, + tags={}, + backfill_timestamp=time.time(), + ) + storage.add_backfill(one) + assert ( + len(storage.get_backfills(filters=BulkActionsFilter(status=BulkActionStatus.REQUESTED))) + == 1 + ) + assert ( + len(storage.get_backfills(filters=BulkActionsFilter(status=BulkActionStatus.COMPLETED))) + == 0 + ) + backfill = storage.get_backfills( + filters=BulkActionsFilter(status=BulkActionStatus.REQUESTED) + ) + assert backfill == one + + storage.update_backfill(one.with_status(status=BulkActionStatus.COMPLETED)) + assert ( + len(storage.get_backfills(filters=BulkActionsFilter(status=BulkActionStatus.REQUESTED))) + == 0 + ) + assert ( + len(storage.get_backfills(filters=BulkActionsFilter(status=BulkActionStatus.COMPLETED))) + == 1 + ) + + def test_backfill_created_time_filtering(self, storage: RunStorage): + origin = self.fake_partition_set_origin("fake_partition_set") + backfills = storage.get_backfills() + assert len(backfills) == 0 + + all_backfills = [] + for i in range(5): + backfill = PartitionBackfill( + f"backfill_{i}", + partition_set_origin=origin, + status=BulkActionStatus.REQUESTED, + partition_names=["a", "b", "c"], + from_failure=False, + tags={}, + backfill_timestamp=time.time(), + ) + storage.add_backfill(backfill) + all_backfills.append(backfill) + + created_before = storage.get_backfills( + filters=BulkActionsFilter(created_before=all_backfills[3].backfill_timestamp) + ) + assert len(created_before) == 2 + for backfill in created_before: + assert backfill.backfill_timestamp < all_backfills[3].backfill_timestamp + + created_after = storage.get_backfills( + filters=BulkActionsFilter(created_after=all_backfills[3].backfill_timestamp) + ) + assert len(created_after) == 2 + for backfill in created_after: + assert backfill.backfill_timestamp > all_backfills[3].backfill_timestamp + def test_secondary_index(self, storage): self._skip_in_memory(storage)