diff --git a/python_modules/dagster/dagster/_core/execution/backfill.py b/python_modules/dagster/dagster/_core/execution/backfill.py index e6d19be2610f6..fdc0dbba16d3b 100644 --- a/python_modules/dagster/dagster/_core/execution/backfill.py +++ b/python_modules/dagster/dagster/_core/execution/backfill.py @@ -50,14 +50,14 @@ class BulkActionsFilter: all values will be permitted for that field. Args: - status (Optional[BulkActionStatus]): A status to filter by. + statuses (Optional[Sequence[BulkActionStatus]]): A list of statuses to filter by. created_before (Optional[DateTime]): Filter by bulk actions that were created before this datetime. Note that the create_time for each bulk action is stored in UTC. created_after (Optional[DateTime]): Filter by bulk actions that were created after this datetime. Note that the create_time for each bulk action is stored in UTC. """ - status: Optional[BulkActionStatus] = None + statuses: Optional[Sequence[BulkActionStatus]] = None created_before: Optional[datetime] = None created_after: Optional[datetime] = None diff --git a/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py b/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py index 571bbbfdf5d78..4e0da3fcb0864 100644 --- a/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py +++ b/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py @@ -841,14 +841,16 @@ def get_backfills( ) -> Sequence[PartitionBackfill]: check.opt_inst_param(status, "status", BulkActionStatus) query = db_select([BulkActionsTable.c.body, BulkActionsTable.c.timestamp]) - if status or (filters and filters.status): - if status and filters and filters.status and status != filters.status: + if status or (filters and filters.statuses): + if status and filters and filters.statuses: raise DagsterInvariantViolationError( - "Conflicting status filters provided to get_backfills. Choose one of status or BulkActionsFilter.status." + "Conflicting status filters provided to get_backfills. Choose one of status or BulkActionsFilter.statuses." ) - status = status or (filters.status if filters else None) - assert status - query = query.where(BulkActionsTable.c.status == status.value) + statuses = [status] or (filters.statuses if filters else None) + assert statuses + query = query.where( + BulkActionsTable.c.status.in_([status.value for status in statuses]) + ) if cursor: cursor_query = db_select([BulkActionsTable.c.id]).where( BulkActionsTable.c.key == cursor diff --git a/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py b/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py index 75f95ec862ca8..417276cc5f26e 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py +++ b/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py @@ -1353,28 +1353,75 @@ def test_backfill_status_filtering(self, storage: RunStorage): ) storage.add_backfill(one) assert ( - len(storage.get_backfills(filters=BulkActionsFilter(status=BulkActionStatus.REQUESTED))) + len( + storage.get_backfills( + filters=BulkActionsFilter(statuses=[BulkActionStatus.REQUESTED]) + ) + ) == 1 ) assert ( - len(storage.get_backfills(filters=BulkActionsFilter(status=BulkActionStatus.COMPLETED))) + len( + storage.get_backfills( + filters=BulkActionsFilter(statuses=[BulkActionStatus.COMPLETED]) + ) + ) == 0 ) + assert ( + len( + storage.get_backfills( + filters=BulkActionsFilter( + statuses=[BulkActionStatus.COMPLETED, BulkActionStatus.REQUESTED] + ) + ) + ) + == 1 + ) backfills = storage.get_backfills( - filters=BulkActionsFilter(status=BulkActionStatus.REQUESTED) + filters=BulkActionsFilter(statuses=[BulkActionStatus.REQUESTED]) ) assert backfills[0] == one storage.update_backfill(one.with_status(status=BulkActionStatus.COMPLETED)) assert ( - len(storage.get_backfills(filters=BulkActionsFilter(status=BulkActionStatus.REQUESTED))) + len( + storage.get_backfills( + filters=BulkActionsFilter(statuses=[BulkActionStatus.REQUESTED]) + ) + ) == 0 ) assert ( - len(storage.get_backfills(filters=BulkActionsFilter(status=BulkActionStatus.COMPLETED))) + len( + storage.get_backfills( + filters=BulkActionsFilter(statuses=[BulkActionStatus.COMPLETED]) + ) + ) == 1 ) + two = PartitionBackfill( + "two", + partition_set_origin=origin, + status=BulkActionStatus.REQUESTED, + partition_names=["a", "b", "c"], + from_failure=False, + tags={}, + backfill_timestamp=time.time(), + ) + storage.add_backfill(two) + assert ( + len( + storage.get_backfills( + filters=BulkActionsFilter( + statuses=[BulkActionStatus.COMPLETED, BulkActionStatus.REQUESTED] + ) + ) + ) + == 2 + ) + def test_backfill_created_time_filtering(self, storage: RunStorage): origin = self.fake_partition_set_origin("fake_partition_set") backfills = storage.get_backfills()