Skip to content

Commit

Permalink
filter by multiple statuses in BulkActionsFilter
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Aug 22, 2024
1 parent c7ee34c commit c87e7f2
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 13 deletions.
4 changes: 2 additions & 2 deletions python_modules/dagster/dagster/_core/execution/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@ class BulkActionsFilter:
all values will be permitted for that field.
Args:
status (Optional[BulkActionStatus]): A status to filter by.
statuses (Optional[Sequence[BulkActionStatus]]): A list of statuses to filter by.
created_before (Optional[DateTime]): Filter by bulk actions that were created before this datetime. Note that the
create_time for each bulk action is stored in UTC.
created_after (Optional[DateTime]): Filter by bulk actions that were created after this datetime. Note that the
create_time for each bulk action is stored in UTC.
"""

status: Optional[BulkActionStatus] = None
statuses: Optional[Sequence[BulkActionStatus]] = None
created_before: Optional[datetime] = None
created_after: Optional[datetime] = None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -841,14 +841,16 @@ def get_backfills(
) -> Sequence[PartitionBackfill]:
check.opt_inst_param(status, "status", BulkActionStatus)
query = db_select([BulkActionsTable.c.body, BulkActionsTable.c.timestamp])
if status or (filters and filters.status):
if status and filters and filters.status and status != filters.status:
if status or (filters and filters.statuses):
if status and filters and filters.statuses:
raise DagsterInvariantViolationError(
"Conflicting status filters provided to get_backfills. Choose one of status or BulkActionsFilter.status."
"Conflicting status filters provided to get_backfills. Choose one of status or BulkActionsFilter.statuses."
)
status = status or (filters.status if filters else None)
assert status
query = query.where(BulkActionsTable.c.status == status.value)
statuses = [status] or (filters.statuses if filters else None)
assert statuses
query = query.where(
BulkActionsTable.c.status.in_([status.value for status in statuses])
)
if cursor:
cursor_query = db_select([BulkActionsTable.c.id]).where(
BulkActionsTable.c.key == cursor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1353,28 +1353,75 @@ def test_backfill_status_filtering(self, storage: RunStorage):
)
storage.add_backfill(one)
assert (
len(storage.get_backfills(filters=BulkActionsFilter(status=BulkActionStatus.REQUESTED)))
len(
storage.get_backfills(
filters=BulkActionsFilter(statuses=[BulkActionStatus.REQUESTED])
)
)
== 1
)
assert (
len(storage.get_backfills(filters=BulkActionsFilter(status=BulkActionStatus.COMPLETED)))
len(
storage.get_backfills(
filters=BulkActionsFilter(statuses=[BulkActionStatus.COMPLETED])
)
)
== 0
)
assert (
len(
storage.get_backfills(
filters=BulkActionsFilter(
statuses=[BulkActionStatus.COMPLETED, BulkActionStatus.REQUESTED]
)
)
)
== 1
)
backfills = storage.get_backfills(
filters=BulkActionsFilter(status=BulkActionStatus.REQUESTED)
filters=BulkActionsFilter(statuses=[BulkActionStatus.REQUESTED])
)
assert backfills[0] == one

storage.update_backfill(one.with_status(status=BulkActionStatus.COMPLETED))
assert (
len(storage.get_backfills(filters=BulkActionsFilter(status=BulkActionStatus.REQUESTED)))
len(
storage.get_backfills(
filters=BulkActionsFilter(statuses=[BulkActionStatus.REQUESTED])
)
)
== 0
)
assert (
len(storage.get_backfills(filters=BulkActionsFilter(status=BulkActionStatus.COMPLETED)))
len(
storage.get_backfills(
filters=BulkActionsFilter(statuses=[BulkActionStatus.COMPLETED])
)
)
== 1
)

two = PartitionBackfill(
"two",
partition_set_origin=origin,
status=BulkActionStatus.REQUESTED,
partition_names=["a", "b", "c"],
from_failure=False,
tags={},
backfill_timestamp=time.time(),
)
storage.add_backfill(two)
assert (
len(
storage.get_backfills(
filters=BulkActionsFilter(
statuses=[BulkActionStatus.COMPLETED, BulkActionStatus.REQUESTED]
)
)
)
== 2
)

def test_backfill_created_time_filtering(self, storage: RunStorage):
origin = self.fake_partition_set_origin("fake_partition_set")
backfills = storage.get_backfills()
Expand Down

0 comments on commit c87e7f2

Please sign in to comment.