From 89cfc2a4fda2af02c22c47fe786d6fa1f6de9245 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Thu, 15 Aug 2024 15:43:35 -0400 Subject: [PATCH] split branch --- .../dagster/_core/execution/backfill.py | 9 +++++++ .../dagster/_core/instance/__init__.py | 16 +++++------ .../dagster/_core/storage/legacy_storage.py | 14 +++++----- .../dagster/_core/storage/runs/base.py | 6 ++--- .../_core/storage/runs/sql_run_storage.py | 27 ++++++++++++------- 5 files changed, 43 insertions(+), 29 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/backfill.py b/python_modules/dagster/dagster/_core/execution/backfill.py index 10f9d62b6ad85..e74946ba2c8e9 100644 --- a/python_modules/dagster/dagster/_core/execution/backfill.py +++ b/python_modules/dagster/dagster/_core/execution/backfill.py @@ -1,3 +1,4 @@ +from datetime import datetime from enum import Enum from typing import Mapping, NamedTuple, Optional, Sequence, Union @@ -14,6 +15,7 @@ from dagster._core.remote_representation.origin import RemotePartitionSetOrigin from dagster._core.storage.tags import USER_TAG from dagster._core.workspace.workspace import IWorkspace +from dagster._record import record from dagster._serdes import whitelist_for_serdes from dagster._utils.error import SerializableErrorInfo @@ -38,6 +40,13 @@ def from_graphql_input(graphql_str): return BulkActionStatus(graphql_str) +@record +class BulkActionsFilter: + status: Optional[BulkActionStatus] = None + created_before: Optional[datetime] = None + created_after: Optional[datetime] = None + + @whitelist_for_serdes class PartitionBackfill( NamedTuple( diff --git a/python_modules/dagster/dagster/_core/instance/__init__.py b/python_modules/dagster/dagster/_core/instance/__init__.py index 4a82ecd372215..1134a8fac584a 100644 --- a/python_modules/dagster/dagster/_core/instance/__init__.py +++ b/python_modules/dagster/dagster/_core/instance/__init__.py @@ -6,7 +6,6 @@ import weakref from abc import abstractmethod from collections import defaultdict -from datetime import datetime from enum import Enum from tempfile import TemporaryDirectory from types import TracebackType @@ -124,7 +123,11 @@ JobFailureData, ) from dagster._core.events.log import EventLogEntry - from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill + from dagster._core.execution.backfill import ( + BulkActionsFilter, + BulkActionStatus, + PartitionBackfill, + ) from dagster._core.execution.plan.plan import ExecutionPlan from dagster._core.execution.plan.resume_retry import ReexecutionStrategy from dagster._core.execution.stats import RunStepKeyStatsSnapshot @@ -3077,15 +3080,10 @@ def get_backfills( status: Optional["BulkActionStatus"] = None, cursor: Optional[str] = None, limit: Optional[int] = None, - created_before: Optional[datetime] = None, - created_after: Optional[datetime] = None, + filters: Optional["BulkActionsFilter"] = None, ) -> Sequence["PartitionBackfill"]: return self._run_storage.get_backfills( - status=status, - cursor=cursor, - limit=limit, - created_before=created_before, - created_after=created_after, + status=status, cursor=cursor, limit=limit, filters=filters ) def get_backfill(self, backfill_id: str) -> Optional["PartitionBackfill"]: diff --git a/python_modules/dagster/dagster/_core/storage/legacy_storage.py b/python_modules/dagster/dagster/_core/storage/legacy_storage.py index c18bb801878d3..5b3cdb4dfbbe0 100644 --- a/python_modules/dagster/dagster/_core/storage/legacy_storage.py +++ b/python_modules/dagster/dagster/_core/storage/legacy_storage.py @@ -1,4 +1,3 @@ -from datetime import datetime from typing import TYPE_CHECKING, Iterable, Mapping, Optional, Sequence, Set, Tuple, Union from dagster import _check as check @@ -34,7 +33,11 @@ from dagster._core.event_api import AssetRecordsFilter, RunStatusChangeRecordsFilter from dagster._core.events import DagsterEvent, DagsterEventType from dagster._core.events.log import EventLogEntry - from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill + from dagster._core.execution.backfill import ( + BulkActionsFilter, + BulkActionStatus, + PartitionBackfill, + ) from dagster._core.execution.stats import RunStepKeyStatsSnapshot from dagster._core.instance import DagsterInstance from dagster._core.remote_representation.origin import RemoteJobOrigin @@ -313,12 +316,9 @@ def get_backfills( status: Optional["BulkActionStatus"] = None, cursor: Optional[str] = None, limit: Optional[int] = None, - created_before: Optional[datetime] = None, - created_after: Optional[datetime] = None, + filters: Optional["BulkActionsFilter"] = None, ) -> Sequence["PartitionBackfill"]: - return self._storage.run_storage.get_backfills( - status, cursor, limit, created_before=created_before, created_after=created_after - ) + return self._storage.run_storage.get_backfills(status, cursor, limit, filters=filters) def get_backfill(self, backfill_id: str) -> Optional["PartitionBackfill"]: return self._storage.run_storage.get_backfill(backfill_id) diff --git a/python_modules/dagster/dagster/_core/storage/runs/base.py b/python_modules/dagster/dagster/_core/storage/runs/base.py index 3e72ede9f1827..1f9c35d9ba768 100644 --- a/python_modules/dagster/dagster/_core/storage/runs/base.py +++ b/python_modules/dagster/dagster/_core/storage/runs/base.py @@ -1,11 +1,10 @@ from abc import ABC, abstractmethod -from datetime import datetime from typing import TYPE_CHECKING, Dict, Mapping, Optional, Sequence, Set, Tuple, Union from typing_extensions import TypedDict from dagster._core.events import DagsterEvent -from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill +from dagster._core.execution.backfill import BulkActionsFilter, BulkActionStatus, PartitionBackfill from dagster._core.execution.telemetry import RunTelemetryData from dagster._core.instance import MayHaveInstanceWeakref, T_DagsterInstance from dagster._core.snap import ExecutionPlanSnapshot, JobSnapshot @@ -374,8 +373,7 @@ def get_backfills( status: Optional[BulkActionStatus] = None, cursor: Optional[str] = None, limit: Optional[int] = None, - created_before: Optional[datetime] = None, - created_after: Optional[datetime] = None, + filters: Optional[BulkActionsFilter] = None, ) -> Sequence[PartitionBackfill]: """Get a list of partition backfills.""" diff --git a/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py b/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py index 2e0a996028a22..70e7754d51199 100644 --- a/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py +++ b/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py @@ -38,7 +38,7 @@ DagsterEventType, RunFailureReason, ) -from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill +from dagster._core.execution.backfill import BulkActionsFilter, BulkActionStatus, PartitionBackfill from dagster._core.remote_representation.origin import RemoteJobOrigin from dagster._core.snap import ( ExecutionPlanSnapshot, @@ -837,22 +837,31 @@ def get_backfills( status: Optional[BulkActionStatus] = None, cursor: Optional[str] = None, limit: Optional[int] = None, - created_before: Optional[datetime] = None, - created_after: Optional[datetime] = None, + filters: Optional[BulkActionsFilter] = None, ) -> Sequence[PartitionBackfill]: check.opt_inst_param(status, "status", BulkActionStatus) query = db_select([BulkActionsTable.c.body, BulkActionsTable.c.timestamp]) - if status: - query = query.where(BulkActionsTable.c.status == status.value) + if status or (filters and filters.status): + if status and filters and filters.status and status != filters.status: + raise DagsterInvariantViolationError( + "Conflicting status filters provided to get_backfills. Choose one of status or BulkActionsFilter.status." + ) + status = status or (filters.status if filters else None) + if status: # should also be non-None at this point, but pyright must be appeased + query = query.where(BulkActionsTable.c.status == status.value) if cursor: cursor_query = db_select([BulkActionsTable.c.id]).where( BulkActionsTable.c.key == cursor ) query = query.where(BulkActionsTable.c.id < cursor_query) - if created_after: - query = query.where(BulkActionsTable.c.timestamp > created_after) - if created_before: - query = query.where(BulkActionsTable.c.timestamp < created_before.replace(tzinfo=None)) + if filters and filters.created_after: + query = query.where( + BulkActionsTable.c.timestamp > filters.created_after.replace(tzinfo=None) + ) + if filters and filters.created_before: + query = query.where( + BulkActionsTable.c.timestamp < filters.created_before.replace(tzinfo=None) + ) if limit: query = query.limit(limit) query = query.order_by(BulkActionsTable.c.id.desc())