Skip to content

Commit

Permalink
split branch
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Aug 15, 2024
1 parent ecb72d4 commit 89cfc2a
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 29 deletions.
9 changes: 9 additions & 0 deletions python_modules/dagster/dagster/_core/execution/backfill.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import datetime
from enum import Enum
from typing import Mapping, NamedTuple, Optional, Sequence, Union

Expand All @@ -14,6 +15,7 @@
from dagster._core.remote_representation.origin import RemotePartitionSetOrigin
from dagster._core.storage.tags import USER_TAG
from dagster._core.workspace.workspace import IWorkspace
from dagster._record import record
from dagster._serdes import whitelist_for_serdes
from dagster._utils.error import SerializableErrorInfo

Expand All @@ -38,6 +40,13 @@ def from_graphql_input(graphql_str):
return BulkActionStatus(graphql_str)


@record
class BulkActionsFilter:
status: Optional[BulkActionStatus] = None
created_before: Optional[datetime] = None
created_after: Optional[datetime] = None


@whitelist_for_serdes
class PartitionBackfill(
NamedTuple(
Expand Down
16 changes: 7 additions & 9 deletions python_modules/dagster/dagster/_core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import weakref
from abc import abstractmethod
from collections import defaultdict
from datetime import datetime
from enum import Enum
from tempfile import TemporaryDirectory
from types import TracebackType
Expand Down Expand Up @@ -124,7 +123,11 @@
JobFailureData,
)
from dagster._core.events.log import EventLogEntry
from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill
from dagster._core.execution.backfill import (
BulkActionsFilter,
BulkActionStatus,
PartitionBackfill,
)
from dagster._core.execution.plan.plan import ExecutionPlan
from dagster._core.execution.plan.resume_retry import ReexecutionStrategy
from dagster._core.execution.stats import RunStepKeyStatsSnapshot
Expand Down Expand Up @@ -3077,15 +3080,10 @@ def get_backfills(
status: Optional["BulkActionStatus"] = None,
cursor: Optional[str] = None,
limit: Optional[int] = None,
created_before: Optional[datetime] = None,
created_after: Optional[datetime] = None,
filters: Optional["BulkActionsFilter"] = None,
) -> Sequence["PartitionBackfill"]:
return self._run_storage.get_backfills(
status=status,
cursor=cursor,
limit=limit,
created_before=created_before,
created_after=created_after,
status=status, cursor=cursor, limit=limit, filters=filters
)

def get_backfill(self, backfill_id: str) -> Optional["PartitionBackfill"]:
Expand Down
14 changes: 7 additions & 7 deletions python_modules/dagster/dagster/_core/storage/legacy_storage.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from datetime import datetime
from typing import TYPE_CHECKING, Iterable, Mapping, Optional, Sequence, Set, Tuple, Union

from dagster import _check as check
Expand Down Expand Up @@ -34,7 +33,11 @@
from dagster._core.event_api import AssetRecordsFilter, RunStatusChangeRecordsFilter
from dagster._core.events import DagsterEvent, DagsterEventType
from dagster._core.events.log import EventLogEntry
from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill
from dagster._core.execution.backfill import (
BulkActionsFilter,
BulkActionStatus,
PartitionBackfill,
)
from dagster._core.execution.stats import RunStepKeyStatsSnapshot
from dagster._core.instance import DagsterInstance
from dagster._core.remote_representation.origin import RemoteJobOrigin
Expand Down Expand Up @@ -313,12 +316,9 @@ def get_backfills(
status: Optional["BulkActionStatus"] = None,
cursor: Optional[str] = None,
limit: Optional[int] = None,
created_before: Optional[datetime] = None,
created_after: Optional[datetime] = None,
filters: Optional["BulkActionsFilter"] = None,
) -> Sequence["PartitionBackfill"]:
return self._storage.run_storage.get_backfills(
status, cursor, limit, created_before=created_before, created_after=created_after
)
return self._storage.run_storage.get_backfills(status, cursor, limit, filters=filters)

def get_backfill(self, backfill_id: str) -> Optional["PartitionBackfill"]:
return self._storage.run_storage.get_backfill(backfill_id)
Expand Down
6 changes: 2 additions & 4 deletions python_modules/dagster/dagster/_core/storage/runs/base.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
from abc import ABC, abstractmethod
from datetime import datetime
from typing import TYPE_CHECKING, Dict, Mapping, Optional, Sequence, Set, Tuple, Union

from typing_extensions import TypedDict

from dagster._core.events import DagsterEvent
from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill
from dagster._core.execution.backfill import BulkActionsFilter, BulkActionStatus, PartitionBackfill
from dagster._core.execution.telemetry import RunTelemetryData
from dagster._core.instance import MayHaveInstanceWeakref, T_DagsterInstance
from dagster._core.snap import ExecutionPlanSnapshot, JobSnapshot
Expand Down Expand Up @@ -374,8 +373,7 @@ def get_backfills(
status: Optional[BulkActionStatus] = None,
cursor: Optional[str] = None,
limit: Optional[int] = None,
created_before: Optional[datetime] = None,
created_after: Optional[datetime] = None,
filters: Optional[BulkActionsFilter] = None,
) -> Sequence[PartitionBackfill]:
"""Get a list of partition backfills."""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
DagsterEventType,
RunFailureReason,
)
from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill
from dagster._core.execution.backfill import BulkActionsFilter, BulkActionStatus, PartitionBackfill
from dagster._core.remote_representation.origin import RemoteJobOrigin
from dagster._core.snap import (
ExecutionPlanSnapshot,
Expand Down Expand Up @@ -837,22 +837,31 @@ def get_backfills(
status: Optional[BulkActionStatus] = None,
cursor: Optional[str] = None,
limit: Optional[int] = None,
created_before: Optional[datetime] = None,
created_after: Optional[datetime] = None,
filters: Optional[BulkActionsFilter] = None,
) -> Sequence[PartitionBackfill]:
check.opt_inst_param(status, "status", BulkActionStatus)
query = db_select([BulkActionsTable.c.body, BulkActionsTable.c.timestamp])
if status:
query = query.where(BulkActionsTable.c.status == status.value)
if status or (filters and filters.status):
if status and filters and filters.status and status != filters.status:
raise DagsterInvariantViolationError(
"Conflicting status filters provided to get_backfills. Choose one of status or BulkActionsFilter.status."
)
status = status or (filters.status if filters else None)
if status: # should also be non-None at this point, but pyright must be appeased
query = query.where(BulkActionsTable.c.status == status.value)
if cursor:
cursor_query = db_select([BulkActionsTable.c.id]).where(
BulkActionsTable.c.key == cursor
)
query = query.where(BulkActionsTable.c.id < cursor_query)
if created_after:
query = query.where(BulkActionsTable.c.timestamp > created_after)
if created_before:
query = query.where(BulkActionsTable.c.timestamp < created_before.replace(tzinfo=None))
if filters and filters.created_after:
query = query.where(
BulkActionsTable.c.timestamp > filters.created_after.replace(tzinfo=None)
)
if filters and filters.created_before:
query = query.where(
BulkActionsTable.c.timestamp < filters.created_before.replace(tzinfo=None)
)
if limit:
query = query.limit(limit)
query = query.order_by(BulkActionsTable.c.id.desc())
Expand Down

0 comments on commit 89cfc2a

Please sign in to comment.