Skip to content

Commit

Permalink
Add a ResumeFlowRun automations action (#15269)
Browse files Browse the repository at this point in the history
Co-authored-by: Chris White <[email protected]>
  • Loading branch information
abrookins and cicdw authored Sep 9, 2024
1 parent 9390684 commit 789d083
Show file tree
Hide file tree
Showing 7 changed files with 377 additions and 6 deletions.
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,6 @@ repos:
.pre-commit-config.yaml|
src/prefect/server/api/.*|
src/prefect/server/schemas/.*|
src/prefect/server/events/.*|
scripts/generate_mintlify_openapi_docs.py
)$
43 changes: 43 additions & 0 deletions docs/3.0/api-ref/rest-api/server/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -10664,6 +10664,9 @@
{
"$ref": "#/components/schemas/SuspendFlowRun"
},
{
"$ref": "#/components/schemas/ResumeFlowRun"
},
{
"$ref": "#/components/schemas/PauseWorkPool"
},
Expand Down Expand Up @@ -10718,6 +10721,9 @@
{
"$ref": "#/components/schemas/SuspendFlowRun"
},
{
"$ref": "#/components/schemas/ResumeFlowRun"
},
{
"$ref": "#/components/schemas/PauseWorkPool"
},
Expand Down Expand Up @@ -10772,6 +10778,9 @@
{
"$ref": "#/components/schemas/SuspendFlowRun"
},
{
"$ref": "#/components/schemas/ResumeFlowRun"
},
{
"$ref": "#/components/schemas/PauseWorkPool"
},
Expand Down Expand Up @@ -10898,6 +10907,9 @@
{
"$ref": "#/components/schemas/SuspendFlowRun"
},
{
"$ref": "#/components/schemas/ResumeFlowRun"
},
{
"$ref": "#/components/schemas/PauseWorkPool"
},
Expand Down Expand Up @@ -10952,6 +10964,9 @@
{
"$ref": "#/components/schemas/SuspendFlowRun"
},
{
"$ref": "#/components/schemas/ResumeFlowRun"
},
{
"$ref": "#/components/schemas/PauseWorkPool"
},
Expand Down Expand Up @@ -11006,6 +11021,9 @@
{
"$ref": "#/components/schemas/SuspendFlowRun"
},
{
"$ref": "#/components/schemas/ResumeFlowRun"
},
{
"$ref": "#/components/schemas/PauseWorkPool"
},
Expand Down Expand Up @@ -11218,6 +11236,9 @@
{
"$ref": "#/components/schemas/SuspendFlowRun"
},
{
"$ref": "#/components/schemas/ResumeFlowRun"
},
{
"$ref": "#/components/schemas/PauseWorkPool"
},
Expand Down Expand Up @@ -11272,6 +11293,9 @@
{
"$ref": "#/components/schemas/SuspendFlowRun"
},
{
"$ref": "#/components/schemas/ResumeFlowRun"
},
{
"$ref": "#/components/schemas/PauseWorkPool"
},
Expand Down Expand Up @@ -11326,6 +11350,9 @@
{
"$ref": "#/components/schemas/SuspendFlowRun"
},
{
"$ref": "#/components/schemas/ResumeFlowRun"
},
{
"$ref": "#/components/schemas/PauseWorkPool"
},
Expand Down Expand Up @@ -20828,6 +20855,22 @@
"title": "ResumeDeployment",
"description": "Resumes the given Deployment"
},
"ResumeFlowRun": {
"properties": {
"type": {
"type": "string",
"enum": [
"resume-flow-run"
],
"const": "resume-flow-run",
"title": "Type",
"default": "resume-flow-run"
}
},
"type": "object",
"title": "ResumeFlowRun",
"description": "Resumes a paused or suspended flow run associated with the trigger"
},
"ResumeWorkPool": {
"properties": {
"type": {
Expand Down
6 changes: 6 additions & 0 deletions src/prefect/events/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ class CancelFlowRun(Action):
type: Literal["cancel-flow-run"] = "cancel-flow-run"


class ResumeFlowRun(Action):
"""Resumes a flow run associated with the trigger"""

type: Literal["resume-flow-run"] = "resume-flow-run"


class SuspendFlowRun(Action):
"""Suspends a flow run associated with the trigger"""

Expand Down
9 changes: 8 additions & 1 deletion src/prefect/server/api/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from prefect.server.schemas.actions import DeploymentFlowRunCreate, StateCreate
from prefect.server.schemas.core import WorkPool
from prefect.server.schemas.filters import VariableFilter, VariableFilterName
from prefect.server.schemas.responses import DeploymentResponse
from prefect.server.schemas.responses import DeploymentResponse, OrchestrationResult
from prefect.types import StrictVariableValue

logger = get_logger(__name__)
Expand Down Expand Up @@ -79,6 +79,13 @@ async def read_flow_run_raw(self, flow_run_id: UUID) -> Response:
async def read_task_run_raw(self, task_run_id: UUID) -> Response:
return await self._http_client.get(f"/task_runs/{task_run_id}")

async def resume_flow_run(self, flow_run_id: UUID) -> OrchestrationResult:
response = await self._http_client.post(
f"/flow_runs/{flow_run_id}/resume",
)
response.raise_for_status()
return OrchestrationResult.model_validate(response.json())

async def pause_deployment(self, deployment_id: UUID) -> Response:
return await self._http_client.post(
f"/deployments/{deployment_id}/pause_deployment",
Expand Down
47 changes: 43 additions & 4 deletions src/prefect/server/events/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -965,10 +965,10 @@ async def command(
return await orchestration.resume_deployment(deployment_id)


class FlowRunStateChangeAction(ExternalDataAction):
"""Changes the state of a flow run associated with the trigger"""
class FlowRunAction(ExternalDataAction):
"""An action that operates on a flow run"""

async def flow_run_to_change(self, triggered_action: "TriggeredAction") -> UUID:
async def flow_run(self, triggered_action: "TriggeredAction") -> UUID:
# Proactive triggers won't have an event, but they might be tracking
# buckets per-resource, so check for that first
labels = triggered_action.triggering_labels
Expand All @@ -983,12 +983,16 @@ async def flow_run_to_change(self, triggered_action: "TriggeredAction") -> UUID:

raise ActionFailed("No flow run could be inferred")


class FlowRunStateChangeAction(FlowRunAction):
"""Changes the state of a flow run associated with the trigger"""

@abc.abstractmethod
async def new_state(self, triggered_action: "TriggeredAction") -> StateCreate:
"""Return the new state for the flow run"""

async def act(self, triggered_action: "TriggeredAction") -> None:
flow_run_id = await self.flow_run_to_change(triggered_action)
flow_run_id = await self.flow_run(triggered_action)

self._resulting_related_resources.append(
RelatedResource.model_validate(
Expand Down Expand Up @@ -1083,6 +1087,40 @@ async def new_state(self, triggered_action: "TriggeredAction") -> StateCreate:
)


class ResumeFlowRun(FlowRunAction):
"""Resumes a paused or suspended flow run associated with the trigger"""

type: Literal["resume-flow-run"] = "resume-flow-run"

async def act(self, triggered_action: "TriggeredAction") -> None:
flow_run_id = await self.flow_run(triggered_action)

self._resulting_related_resources.append(
RelatedResource.model_validate(
{
"prefect.resource.id": f"prefect.flow-run.{flow_run_id}",
"prefect.resource.role": "target",
}
)
)

logger.debug(
"Resuming flow run",
extra={
"flow_run_id": str(flow_run_id),
**self.logging_context(triggered_action),
},
)

async with await self.orchestration_client(triggered_action) as orchestration:
result = await orchestration.resume_flow_run(flow_run_id)

if not isinstance(result.details, StateAcceptDetails):
raise ActionFailed(
f"Failed to resume flow run: {result.details.reason}"
)


class CallWebhook(JinjaTemplateAction):
"""Call a webhook when an Automation is triggered."""

Expand Down Expand Up @@ -1636,6 +1674,7 @@ async def command(
PauseAutomation,
ResumeAutomation,
SuspendFlowRun,
ResumeFlowRun,
PauseWorkPool,
ResumeWorkPool,
]
Expand Down
Loading

0 comments on commit 789d083

Please sign in to comment.