From ba9202d6c1a6995852be17a9537056638bd9ee0d Mon Sep 17 00:00:00 2001
From: JamieDeMaria
Date: Wed, 14 Aug 2024 14:50:56 -0400
Subject: [PATCH] filters all the way through to gql layer

Adds a BulkActionsFilter input (status, createdBefore, createdAfter) to the
partitionBackfillsOrError query and threads it through get_backfills down to
DagsterInstance.get_backfills.

---
 .../ui-core/src/graphql/schema.graphql        | 23 +++---
 .../packages/ui-core/src/graphql/types.ts     | 22 ++++++
 .../implementation/fetch_backfills.py         |  5 +-
 .../implementation/fetch_runs.py              |  3 +-
 .../dagster_graphql/schema/inputs.py          | 29 ++++++++
 .../dagster_graphql/schema/roots/query.py     |  4 ++
 .../graphql/test_asset_backfill.py            | 71 ++++++++++++++++++-
 7 files changed, 145 insertions(+), 12 deletions(-)

diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql
index aa7daa1095141..3db7c425849bc 100644
--- a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql
+++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql
@@ -2371,6 +2371,20 @@ enum AssetEventType {
   ASSET_OBSERVATION
 }
 
+input BulkActionsFilter {
+  status: [BulkActionStatus!]
+  createdBefore: Float
+  createdAfter: Float
+}
+
+enum BulkActionStatus {
+  REQUESTED
+  COMPLETED
+  FAILED
+  CANCELED
+  CANCELING
+}
+
 type DaemonHealth {
   id: String!
   daemonStatus(daemonType: String): DaemonStatus!
@@ -2807,14 +2821,6 @@ type PartitionBackfill implements RunsFeedEntry {
   logEvents(cursor: String): InstigationEventConnection!
 }
 
-enum BulkActionStatus {
-  REQUESTED
-  COMPLETED
-  FAILED
-  CANCELED
-  CANCELING
-}
-
 type AssetBackfillTargetPartitions {
   ranges: [PartitionKeyRange!]
   partitionKeys: [String!]
@@ -3269,6 +3275,7 @@ type Query {
     status: BulkActionStatus
     cursor: String
     limit: Int
+    filters: BulkActionsFilter
   ): PartitionBackfillsOrError!
   permissions: [Permission!]!
   canBulkTerminate: Boolean!
diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts
index 21e30296eeef2..8a0ef3549d47d 100644
--- a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts
+++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts
@@ -730,6 +730,12 @@ export enum BulkActionStatus {
   REQUESTED = 'REQUESTED',
 }
 
+export type BulkActionsFilter = {
+  createdAfter?: InputMaybe<Scalars['Float']['input']>;
+  createdBefore?: InputMaybe<Scalars['Float']['input']>;
+  status?: InputMaybe<Array<BulkActionStatus>>;
+};
+
 export type CancelBackfillResult = CancelBackfillSuccess | PythonError | UnauthorizedError;
 
 export type CancelBackfillSuccess = {
@@ -3841,6 +3847,7 @@ export type QueryPartitionBackfillOrErrorArgs = {
 
 export type QueryPartitionBackfillsOrErrorArgs = {
   cursor?: InputMaybe<Scalars['String']['input']>;
+  filters?: InputMaybe<BulkActionsFilter>;
   limit?: InputMaybe<Scalars['Int']['input']>;
   status?: InputMaybe<BulkActionStatus>;
 };
@@ -6930,6 +6937,21 @@ export const buildBoolMetadataEntry = (
   };
 };
 
+export const buildBulkActionsFilter = (
+  overrides?: Partial<BulkActionsFilter>,
+  _relationshipsToOmit: Set<string> = new Set(),
+): BulkActionsFilter => {
+  const relationshipsToOmit: Set<string> = new Set(_relationshipsToOmit);
+  relationshipsToOmit.add('BulkActionsFilter');
+  return {
+    createdAfter:
+      overrides && overrides.hasOwnProperty('createdAfter') ? overrides.createdAfter! : 6.09,
+    createdBefore:
+      overrides && overrides.hasOwnProperty('createdBefore') ? overrides.createdBefore! : 1.5,
+    status: overrides && overrides.hasOwnProperty('status') ? overrides.status! : [],
+  };
+};
+
 export const buildCancelBackfillSuccess = (
   overrides?: Partial<CancelBackfillSuccess>,
   _relationshipsToOmit: Set<string> = new Set(),
diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_backfills.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_backfills.py
index 5bf37b54ac3c6..8e750639e5f92 100644
--- a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_backfills.py
+++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_backfills.py
@@ -1,6 +1,6 @@
 from typing import TYPE_CHECKING, Optional
 
-from dagster._core.execution.backfill import BulkActionStatus
+from dagster._core.execution.backfill import BulkActionsFilter, BulkActionStatus
 
 if TYPE_CHECKING:
     from dagster_graphql.schema.util import ResolveInfo
@@ -23,11 +23,12 @@ def get_backfills(
     status: Optional[BulkActionStatus] = None,
     cursor: Optional[str] = None,
     limit: Optional[int] = None,
+    filters: Optional[BulkActionsFilter] = None,
 ) -> "GraphenePartitionBackfills":
     from ..schema.backfill import GraphenePartitionBackfill, GraphenePartitionBackfills
 
     backfills = graphene_info.context.instance.get_backfills(
-        status=status, cursor=cursor, limit=limit
+        status=status, cursor=cursor, limit=limit, filters=filters
     )
     return GraphenePartitionBackfills(
         results=[GraphenePartitionBackfill(backfill) for backfill in backfills]
diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py
index cdbb3ddf481e8..20bf56921101b 100644
--- a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py
+++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py
@@ -19,6 +19,7 @@
 )
 from dagster._core.definitions.selector import JobSubsetSelector
 from dagster._core.errors import DagsterInvariantViolationError, DagsterRunNotFoundError
+from dagster._core.execution.backfill import BulkActionsFilter
 from dagster._core.instance import DagsterInstance
 from dagster._core.storage.dagster_run import DagsterRunStatus, RunRecord, RunsFilter
 from dagster._core.storage.event_log.base import AssetRecord
@@ -490,7 +491,7 @@ def get_runs_feed_entries(
         for backfill in instance.get_backfills(
             cursor=runs_feed_cursor.backfill_cursor,
             limit=limit,
-            created_before=created_before_cursor,
+            filters=BulkActionsFilter(created_before=created_before_cursor),
         )
     ]
     runs = [
diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/inputs.py b/python_modules/dagster-graphql/dagster_graphql/schema/inputs.py
index c7fdf5dbcbfa1..2c4947beecbc9 100644
--- a/python_modules/dagster-graphql/dagster_graphql/schema/inputs.py
+++ b/python_modules/dagster-graphql/dagster_graphql/schema/inputs.py
@@ -1,5 +1,6 @@
 import graphene
 from dagster._core.events import DagsterEventType
+from dagster._core.execution.backfill import BulkActionsFilter, BulkActionStatus
 from dagster._core.storage.dagster_run import DagsterRunStatus, RunsFilter
 from dagster._time import datetime_from_timestamp
 from dagster._utils import check
@@ -373,6 +374,33 @@ class Meta:
         name = "TagInput"
 
 
+class GrapheneBulkActionsFilter(graphene.InputObjectType):
+    status = graphene.List(
+        graphene.NonNull("dagster_graphql.schema.backfill.GrapheneBulkActionStatus")
+    )
+    createdBefore = graphene.InputField(graphene.Float)
+    createdAfter = graphene.InputField(graphene.Float)
+
+    class Meta:
+        description = """This type represents a filter on Dagster Bulk Actions (backfills)."""
+        name = "BulkActionsFilter"
+
+    def to_selector(self):
+        if self.status:
+            status = BulkActionStatus.from_graphql_input(self.status)
+        else:
+            status = None
+
+        created_before = datetime_from_timestamp(self.createdBefore) if self.createdBefore else None
+        created_after = datetime_from_timestamp(self.createdAfter) if self.createdAfter else None
+
+        return BulkActionsFilter(
+            status=status,
+            created_before=created_before,
+            created_after=created_after,
+        )
+
+
 types = [
     GrapheneAssetKeyInput,
     GrapheneExecutionMetadata,
@@ -394,4 +422,5 @@ class Meta:
     GrapheneStepOutputHandle,
     GrapheneTagInput,
     GrapheneReportRunlessAssetEventsParams,
+    GrapheneBulkActionsFilter,
 ]
diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py b/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py
index 4fc6c8fa70d3e..ac69793e033ba 100644
--- a/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py
+++ b/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py
@@ -127,6 +127,7 @@
     GrapheneAssetBackfillPreviewParams,
     GrapheneAssetGroupSelector,
     GrapheneAssetKeyInput,
+    GrapheneBulkActionsFilter,
     GrapheneGraphSelector,
     GrapheneInstigationSelector,
     GraphenePipelineSelector,
@@ -474,6 +475,7 @@ class Meta:
         status=graphene.Argument(GrapheneBulkActionStatus),
         cursor=graphene.String(),
         limit=graphene.Int(),
+        filters=graphene.Argument(GrapheneBulkActionsFilter),
         description="Retrieve backfills after applying a status filter, cursor, and limit.",
     )
 
@@ -1105,12 +1107,14 @@ def resolve_partitionBackfillsOrError(
         status: Optional[GrapheneBulkActionStatus] = None,
         cursor: Optional[str] = None,
         limit: Optional[int] = None,
+        filters: Optional[GrapheneBulkActionsFilter] = None,
     ):
         return get_backfills(
             graphene_info,
             status=BulkActionStatus.from_graphql_input(status) if status else None,
            cursor=cursor,
             limit=limit,
+            filters=filters.to_selector() if filters else None,
         )
 
     def resolve_assetBackfillPreview(
diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py
index f6435026bed67..aaf5014daf5ec 100644
--- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py
+++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py
@@ -129,6 +129,20 @@
 }
 """
 
+BACKFILLS_WITH_FILTERS_QUERY = """
+  query BackfillsWithFiltersQuery($filters: BulkActionsFilter) {
+    partitionBackfillsOrError(filters: $filters) {
+      ... on PartitionBackfills {
+        results {
+          id
+          timestamp
+          status
+        }
+      }
+    }
+  }
+"""
+
 
 def get_repo() -> RepositoryDefinition:
     partitions_def = StaticPartitionsDefinition(["a", "b", "c"])
@@ -1056,7 +1070,9 @@ def test_asset_backfill_error_raised_upon_invalid_params_provided():
 
 
 def _get_backfill_data(
-    launch_backfill_result: GqlResult, instance: DagsterInstance, repo
+    launch_backfill_result: GqlResult,
+    instance: DagsterInstance,
+    repo,
 ) -> Tuple[str, AssetBackfillData]:
     assert launch_backfill_result
     assert launch_backfill_result.data
@@ -1131,3 +1147,56 @@ def override_backfill_storage_setting(self):
         )
 
         assert len(backfill_logs.data["partitionBackfillOrError"]["logEvents"]["events"]) > 0
+
+
+def test_get_backfills_with_filters():
+    repo = get_repo()
+    all_asset_keys = repo.asset_graph.materializable_asset_keys
+
+    with instance_for_test() as instance:
+        with define_out_of_process_context(__file__, "get_repo", instance) as context:
+            # launch several backfills via launchPartitionBackfill
+            all_backfills = []
+            for _ in range(5):
+                launch_backfill_result = execute_dagster_graphql(
+                    context,
+                    LAUNCH_PARTITION_BACKFILL_MUTATION,
+                    variables={
+                        "backfillParams": {
+                            "partitionNames": ["a", "b"],
+                            "assetSelection": [key.to_graphql_input() for key in all_asset_keys],
+                        }
+                    },
+                )
+                assert (
+                    "backfillId" in launch_backfill_result.data["launchPartitionBackfill"]
+                ), _get_error_message(launch_backfill_result)
+
+                backfill_id = launch_backfill_result.data["launchPartitionBackfill"]["backfillId"]
+                backfill = instance.get_backfill(backfill_id)
+                all_backfills.append(backfill)
+
+            # filter to backfills created before the fourth backfill
+            get_backfills_result = execute_dagster_graphql(
+                context,
+                BACKFILLS_WITH_FILTERS_QUERY,
+                variables={"filters": {"createdBefore": all_backfills[3].backfill_timestamp}},
+            )
+            assert not get_backfills_result.errors
+            assert get_backfills_result.data
+            backfill_results = get_backfills_result.data["partitionBackfillsOrError"]["results"]
+
+            for result in backfill_results:
+                assert result["timestamp"] < all_backfills[3].backfill_timestamp
+
+            get_backfills_result = execute_dagster_graphql(
+                context,
+                BACKFILLS_WITH_FILTERS_QUERY,
+                variables={"filters": {"createdAfter": all_backfills[3].backfill_timestamp}},
+            )
+            assert not get_backfills_result.errors
+            assert get_backfills_result.data
+            backfill_results = get_backfills_result.data["partitionBackfillsOrError"]["results"]
+
+            for result in backfill_results:
+                assert result["timestamp"] > all_backfills[3].backfill_timestamp
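
Reviewer sketch (not part of the patch): the helpers below illustrate how the new filters
argument is expected to reach storage once GrapheneBulkActionsFilter.to_selector() has
converted the GraphQL input. They use only APIs that appear in this diff
(BulkActionsFilter, datetime_from_timestamp, DagsterInstance.get_backfills); the helper
names and the float-seconds cutoff parameter are hypothetical.

from dagster._core.execution.backfill import BulkActionsFilter
from dagster._core.instance import DagsterInstance
from dagster._time import datetime_from_timestamp


def backfills_created_before(instance: DagsterInstance, cutoff_timestamp: float):
    """Roughly what partitionBackfillsOrError(filters: {createdBefore: $cutoff}) resolves to."""
    # Mirror to_selector(): convert the float timestamp to a datetime, wrap it in a
    # BulkActionsFilter, and hand it straight to the instance.
    return instance.get_backfills(
        filters=BulkActionsFilter(created_before=datetime_from_timestamp(cutoff_timestamp))
    )


def backfills_created_after(instance: DagsterInstance, cutoff_timestamp: float):
    """Roughly what partitionBackfillsOrError(filters: {createdAfter: $cutoff}) resolves to."""
    return instance.get_backfills(
        filters=BulkActionsFilter(created_after=datetime_from_timestamp(cutoff_timestamp))
    )

These two calls correspond to the createdBefore and createdAfter assertions exercised in
test_get_backfills_with_filters above.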