Skip to content

Commit

Permalink
[graphql] Resolver to terminate runs (#16707)
Browse files Browse the repository at this point in the history
This PR adds a resolver to terminate a set of runs, intended to support
a "terminate all queued runs" button. Under the hood, it calls
`instance.optimize_for_webserver` to pool connections and reduce query
time. The implementation in this PR takes 3.5 min to terminate 5K runs,
compared to ~15 min without the `optimize_for_webserver` call.

The resolver accepts a list of run IDs. This lets the front end call
`instance.get_run_ids(cursor=..., limit=...)` to fetch run IDs in
batches and pass each batch to the resolver in turn, which makes it
possible to display a progress bar.
  • Loading branch information
clairelin135 authored Sep 28, 2023
1 parent 79e0e0c commit c9cbe64
Show file tree
Hide file tree
Showing 10 changed files with 272 additions and 27 deletions.

Large diffs are not rendered by default.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 40 additions & 2 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions js_modules/dagster-ui/packages/ui-core/src/runs/RunUtils.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,6 @@ export const TERMINATE_MUTATION = gql`
canTerminate
}
}
... on UnauthorizedError {
message
}
...PythonErrorFragment
}
}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
from dagster._annotations import deprecated
from dagster._core.definitions.events import AssetKey
from dagster._core.events import EngineEventData
from dagster._core.instance import DagsterInstance
from dagster._core.instance import (
DagsterInstance,
)
from dagster._core.storage.captured_log_manager import CapturedLogManager
from dagster._core.storage.compute_log_manager import ComputeIOType, ComputeLogFileData
from dagster._core.storage.dagster_run import CANCELABLE_RUN_STATUSES
Expand All @@ -19,12 +21,11 @@
)

if TYPE_CHECKING:
from dagster_graphql.schema.roots.mutation import GrapheneTerminateRunPolicy
from dagster_graphql.schema.roots.mutation import (
GrapheneTerminateRunPolicy,
)

from ..utils import (
assert_permission,
assert_permission_for_location,
)
from ..utils import assert_permission, assert_permission_for_location
from .backfill import (
cancel_partition_backfill as cancel_partition_backfill,
create_and_launch_partition_backfill as create_and_launch_partition_backfill,
Expand All @@ -47,6 +48,7 @@
GrapheneAssetWipeSuccess,
GrapheneDeletePipelineRunSuccess,
GrapheneTerminateRunFailure,
GrapheneTerminateRunsResult,
GrapheneTerminateRunSuccess,
)

Expand All @@ -71,7 +73,9 @@ def _force_mark_as_canceled(


def terminate_pipeline_execution(
graphene_info: "ResolveInfo", run_id: str, terminate_policy: "GrapheneTerminateRunPolicy"
graphene_info: "ResolveInfo",
run_id: str,
terminate_policy: "GrapheneTerminateRunPolicy",
) -> Union["GrapheneTerminateRunSuccess", "GrapheneTerminateRunFailure"]:
from ...schema.errors import GrapheneRunNotFoundError
from ...schema.pipelines.pipeline import GrapheneRun
Expand All @@ -92,7 +96,6 @@ def terminate_pipeline_execution(
)

if not record:
assert_permission(graphene_info, Permissions.TERMINATE_PIPELINE_EXECUTION)
return GrapheneRunNotFoundError(run_id)

run = record.dagster_run
Expand All @@ -101,11 +104,19 @@ def terminate_pipeline_execution(
location_name = run.external_job_origin.location_name if run.external_job_origin else None

if location_name:
assert_permission_for_location(
graphene_info, Permissions.TERMINATE_PIPELINE_EXECUTION, location_name
)
if not graphene_info.context.has_permission_for_location(
Permissions.TERMINATE_PIPELINE_EXECUTION, location_name
):
return GrapheneTerminateRunFailure(
run=graphene_run,
message="You do not have permission to terminate this run",
)
else:
assert_permission(graphene_info, Permissions.TERMINATE_PIPELINE_EXECUTION)
if not graphene_info.context.has_permission(Permissions.TERMINATE_PIPELINE_EXECUTION):
return GrapheneTerminateRunFailure(
run=graphene_run,
message="You do not have permission to terminate this run",
)

can_cancel_run = run.status in CANCELABLE_RUN_STATUSES

Expand Down Expand Up @@ -143,6 +154,30 @@ def terminate_pipeline_execution(
)


def terminate_pipeline_execution_for_runs(
    graphene_info: "ResolveInfo",
    run_ids: Sequence[str],
    terminate_policy: "GrapheneTerminateRunPolicy",
) -> "GrapheneTerminateRunsResult":
    """Terminate every run in ``run_ids`` and collect the per-run outcomes.

    Runs are handled one at a time, in the order given, by delegating to
    ``terminate_pipeline_execution``. Whatever that call returns for a run
    is stored at the same position in the result list.

    Args:
        graphene_info: Resolver info forwarded unchanged to each
            single-run termination call.
        run_ids: IDs of the runs to terminate; validated as a sequence of
            ``str`` before any run is touched.
        terminate_policy: Policy forwarded unchanged to each single-run
            termination call.

    Returns:
        A ``GrapheneTerminateRunsResult`` whose ``terminateRunResults``
        holds one entry per requested run ID, in input order.

    Note:
        No exception handling is done here: if a single-run call raises,
        the exception propagates and the remaining run IDs are not
        processed.
    """
    # Imported locally, as the original did, rather than at module level.
    from ...schema.roots.mutation import (
        GrapheneTerminateRunsResult,
    )

    # Report the real parameter name so a failed check points at ``run_ids``.
    check.sequence_param(run_ids, "run_ids", of_type=str)

    terminate_run_results = [
        terminate_pipeline_execution(graphene_info, run_id, terminate_policy)
        for run_id in run_ids
    ]

    return GrapheneTerminateRunsResult(terminateRunResults=terminate_run_results)


def delete_pipeline_run(
graphene_info: "ResolveInfo", run_id: str
) -> Union["GrapheneDeletePipelineRunSuccess", "GrapheneRunNotFoundError"]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from ...implementation.execution import (
delete_pipeline_run,
terminate_pipeline_execution,
terminate_pipeline_execution_for_runs,
wipe_assets,
)
from ...implementation.external import fetch_workspace, get_full_external_job_or_raise
Expand Down Expand Up @@ -243,12 +244,28 @@ class Meta:
GrapheneTerminateRunSuccess,
GrapheneTerminateRunFailure,
GrapheneRunNotFoundError,
GrapheneUnauthorizedError,
GraphenePythonError,
)
name = "TerminateRunResult"


class GrapheneTerminateRunsResult(graphene.ObjectType):
    """Indicates the runs that successfully terminated and those that failed to terminate."""

    # NOTE: the docstring above is left verbatim on purpose; graphene falls
    # back to the class docstring for the type's schema description, so it is
    # runtime-visible text rather than a plain comment.

    class Meta:
        # Name under which this type is exposed in the GraphQL schema.
        name = "TerminateRunsResult"

    # One TerminateRunResult entry per run that was asked to terminate.
    terminateRunResults = non_null_list(GrapheneTerminateRunResult)


class GrapheneTerminateRunsResultOrError(graphene.Union):
    """The output from runs termination."""

    # NOTE: the docstring above is left verbatim on purpose; graphene falls
    # back to the class docstring for the type's schema description.

    class Meta:
        # Name under which this union is exposed in the GraphQL schema.
        name = "TerminateRunsResultOrError"
        # Either the aggregated per-run results or a captured Python error.
        types = (
            GrapheneTerminateRunsResult,
            GraphenePythonError,
        )


def create_execution_params_and_launch_pipeline_exec(graphene_info, execution_params_dict):
execution_params = create_execution_params(graphene_info, execution_params_dict)
assert_permission_for_location(
Expand Down Expand Up @@ -451,6 +468,33 @@ def mutate(
)


class GrapheneTerminateRunsMutation(graphene.Mutation):
    """Terminates a set of runs given their run IDs."""

    # NOTE: the docstring above is left verbatim on purpose; graphene falls
    # back to the class docstring for the schema description.

    # The mutation always yields a value: aggregated results or a Python error.
    Output = graphene.NonNull(GrapheneTerminateRunsResultOrError)

    class Meta:
        name = "TerminateRunsMutation"

    class Arguments:
        # IDs of the runs to terminate.
        runIds = non_null_list(graphene.String)
        # Optional; ``mutate`` substitutes SAFE_TERMINATE when it is omitted.
        terminatePolicy = graphene.Argument(GrapheneTerminateRunPolicy)

    @capture_error
    @require_permission_check(Permissions.TERMINATE_PIPELINE_EXECUTION)
    def mutate(
        self,
        graphene_info: ResolveInfo,
        runIds: Sequence[str],
        terminatePolicy: Optional[GrapheneTerminateRunPolicy] = None,
    ):
        # Use the caller's policy when one is supplied, else the safe default.
        if terminatePolicy:
            effective_policy = terminatePolicy
        else:
            effective_policy = GrapheneTerminateRunPolicy.SAFE_TERMINATE

        return terminate_pipeline_execution_for_runs(
            graphene_info, runIds, effective_policy
        )


class GrapheneReloadRepositoryLocationMutationResult(graphene.Union):
"""The output from reloading a code location server."""

Expand Down Expand Up @@ -767,6 +811,7 @@ class Meta:
scheduleDryRun = GrapheneScheduleDryRunMutation.Field()
terminatePipelineExecution = GrapheneTerminateRunMutation.Field()
terminateRun = GrapheneTerminateRunMutation.Field()
terminateRuns = GrapheneTerminateRunsMutation.Field()
deletePipelineRun = GrapheneDeleteRunMutation.Field()
deleteRun = GrapheneDeleteRunMutation.Field()
reloadRepositoryLocation = GrapheneReloadRepositoryLocationMutation.Field()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from dagster._core.execution.backfill import BulkActionStatus
from dagster._core.nux import get_has_seen_nux
from dagster._core.scheduler.instigation import InstigatorStatus, InstigatorType
from dagster._core.workspace.permissions import Permissions

from dagster_graphql.implementation.fetch_auto_materialize_asset_evaluations import (
fetch_auto_materialize_asset_evaluations,
Expand Down Expand Up @@ -433,6 +434,11 @@ class Meta:
description="Retrieve the set of permissions for the Dagster deployment.",
)

canBulkTerminate = graphene.Field(
graphene.NonNull(graphene.Boolean),
description="Returns whether the user has permission to terminate runs in the deployment",
)

assetsLatestInfo = graphene.Field(
non_null_list(GrapheneAssetLatestInfo),
assetKeys=graphene.Argument(non_null_list(GrapheneAssetKeyInput)),
Expand Down Expand Up @@ -944,6 +950,9 @@ def resolve_permissions(self, graphene_info: ResolveInfo):
permissions = graphene_info.context.permissions
return [GraphenePermission(permission, value) for permission, value in permissions.items()]

def resolve_canBulkTerminate(self, graphene_info: ResolveInfo) -> bool:
    """Resolve ``canBulkTerminate`` from the request context's permission check.

    Returns the result of asking the request context whether it holds
    ``Permissions.TERMINATE_PIPELINE_EXECUTION``.
    """
    request_context = graphene_info.context
    return request_context.has_permission(Permissions.TERMINATE_PIPELINE_EXECUTION)

def resolve_assetsLatestInfo(
self, graphene_info: ResolveInfo, assetKeys: Sequence[GrapheneAssetKeyInput]
):
Expand Down
Loading

1 comment on commit c9cbe64

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for dagit-core-storybook ready!

✅ Preview
https://dagit-core-storybook-9npkcy31g-elementl.vercel.app

Built with commit c9cbe64.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.