Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Waiting for handlers to finish in all exit cases + abort and compensation in a message handler #144

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Waiting for message handlers, and performing compensation and cleanup in message handlers

This sample demonstrates the following recommended practices:

1. Ensuring that all signal and update handlers are finished before a successful
workflow return, and on workflow failure, cancellation, and continue-as-new.
2. Performing necessary compensation/cleanup in an update handler when the
workflow is cancelled, fails, or continues-as-new.


To run, open two terminals and `cd` to this directory in them.

Run the worker in one terminal:

poetry run python worker.py

And run the workflow-starter code in the other terminal:

poetry run python starter.py


Here's the output you'll see, along with some explanation:

```
workflow exit type: success
update action on premature workflow exit: continue
👇 [Caller gets a successful update response because main workflow method waits for handlers to finish]
🟢 caller received update result
🟢 caller received workflow result
update action on premature workflow exit: abort_with_compensation
👇 [Same as above: the workflow is successful for action-on-premature exit is irrelevant]
🟢 caller received update result
🟢 caller received workflow result


workflow exit type: failure
update action on premature workflow exit: continue
👇 [update does not abort and main workflow method waits for handlers to finish => caller gets successful update result prior to workflow failure]
🟢 caller received update result
🔴 caught exception while waiting for workflow result: Workflow execution failed: deliberately failing workflow
update action on premature workflow exit: abort_with_compensation
👇 [update aborts, compensates and raises => caller gets failed update result]
🔴 caught exception while waiting for update result: Workflow update failed: The update failed because the workflow run exited: deliberately failing workflow
🔴 caught exception while waiting for workflow result: Workflow execution failed: deliberately failing workflow


workflow exit type: cancellation
update action on premature workflow exit: continue
👇 [update does not abort and main workflow method waits for handlers to finish => caller gets successful update result prior to workflow cancellation]
🟢 caller received update result
🔴 caught exception while waiting for workflow result: Workflow execution failed: Workflow cancelled
update action on premature workflow exit: abort_with_compensation
👇 [update aborts, compensates and raises => caller gets failed update result]
🔴 caught exception while waiting for update result: Workflow update failed: The update failed because the workflow run exited:
🔴 caught exception while waiting for workflow result: Workflow execution failed: Workflow cancelled


workflow exit type: continue_as_new
update action on premature workflow exit: continue
👇 [update does not abort and main workflow method waits for handlers to finish => caller gets successful update result prior to continue-as-new]
🟢 caller received update result
👇 [a second update is sent to the post-CAN run, which run succeeds, hence update succeeds]
🟢 caller received update result
🟢 caller received workflow result
update action on premature workflow exit: abort_with_compensation
👇 [update aborts, compensates and raises => caller gets failed update result]
🔴 caught exception while waiting for update result: update "50cd58dc-2db7-4a70-9204-bf5922203203" not found:
👇 [a second update is sent to the post-CAN run, which run succeeds, hence update succeeds]
🟢 caller received update result
🟢 caller received workflow result
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from dataclasses import dataclass
from enum import IntEnum

TASK_QUEUE = "my-task-queue"
WORKFLOW_ID = "my-workflow-id"


class WorkflowExitType(IntEnum):
SUCCESS = 0
FAILURE = 1
CONTINUE_AS_NEW = 2
CANCELLATION = 3


@dataclass
class WorkflowInput:
exit_type: WorkflowExitType


class OnWorkflowExitAction(IntEnum):
CONTINUE = 0
ABORT_WITH_COMPENSATION = 1


@dataclass
class UpdateInput:
on_premature_workflow_exit: OnWorkflowExitAction
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import asyncio

from temporalio import activity


@activity.defn
async def my_activity():
await asyncio.sleep(1)
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import asyncio
from contextlib import contextmanager

from temporalio import client, common

from message_passing.message_handler_waiting_compensation_cleanup import (
TASK_QUEUE,
WORKFLOW_ID,
OnWorkflowExitAction,
UpdateInput,
WorkflowExitType,
WorkflowInput,
)
from message_passing.message_handler_waiting_compensation_cleanup.workflows import (
MyWorkflow,
)


async def starter(exit_type: WorkflowExitType, update_action: OnWorkflowExitAction):
cl = await client.Client.connect("localhost:7233")
wf_handle = await cl.start_workflow(
MyWorkflow.run,
WorkflowInput(exit_type=exit_type),
id=WORKFLOW_ID,
task_queue=TASK_QUEUE,
id_reuse_policy=common.WorkflowIDReusePolicy.TERMINATE_IF_RUNNING,
dandavison marked this conversation as resolved.
Show resolved Hide resolved
)
await _check_run(wf_handle, exit_type, update_action)


async def _check_run(
wf_handle: client.WorkflowHandle,
exit_type: WorkflowExitType,
update_action: OnWorkflowExitAction,
):
with catch("starting update"):
up_handle = await wf_handle.start_update(
MyWorkflow.my_update,
UpdateInput(on_premature_workflow_exit=update_action),
wait_for_stage=client.WorkflowUpdateStage.ACCEPTED,
)

if exit_type == WorkflowExitType.CANCELLATION:
await wf_handle.cancel()

with catch("waiting for update result"):
await up_handle.result()
print(" 🟢 caller received update result")

if exit_type == WorkflowExitType.CONTINUE_AS_NEW:
await _check_run(wf_handle, WorkflowExitType.SUCCESS, update_action)
else:
with catch("waiting for workflow result"):
await wf_handle.result()
print(" 🟢 caller received workflow result")


@contextmanager
def catch(operation: str):
dandavison marked this conversation as resolved.
Show resolved Hide resolved
try:
yield
except Exception as e:
cause = getattr(e, "cause", None)
print(f" 🔴 caught exception while {operation}: {e}: {cause or ''}")
dandavison marked this conversation as resolved.
Show resolved Hide resolved


async def main():
for exit_type in [
WorkflowExitType.SUCCESS,
WorkflowExitType.FAILURE,
WorkflowExitType.CANCELLATION,
WorkflowExitType.CONTINUE_AS_NEW,
]:
dandavison marked this conversation as resolved.
Show resolved Hide resolved
print(f"\n\nworkflow exit type: {exit_type}")
for update_action in [
OnWorkflowExitAction.CONTINUE,
OnWorkflowExitAction.ABORT_WITH_COMPENSATION,
]:
print(f" update action on premature workflow exit: {update_action}")
await starter(exit_type, update_action)


if __name__ == "__main__":
asyncio.run(main())
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import asyncio
import logging

from temporalio.client import Client
from temporalio.worker import Worker

from message_passing.message_handler_waiting_compensation_cleanup import TASK_QUEUE
from message_passing.message_handler_waiting_compensation_cleanup.activities import (
my_activity,
)
from message_passing.message_handler_waiting_compensation_cleanup.workflows import (
MyWorkflow,
)

interrupt_event = asyncio.Event()


async def main():
logging.basicConfig(level=logging.INFO)

client = await Client.connect("localhost:7233")

async with Worker(
client,
task_queue=TASK_QUEUE,
workflows=[MyWorkflow],
activities=[my_activity],
):
logging.info("Worker started, ctrl+c to exit")
await interrupt_event.wait()
logging.info("Shutting down")


if __name__ == "__main__":
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
interrupt_event.set()
loop.run_until_complete(loop.shutdown_asyncgens())
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import asyncio
from datetime import timedelta

from temporalio import exceptions, workflow

from message_passing.message_handler_waiting_compensation_cleanup import (
OnWorkflowExitAction,
UpdateInput,
WorkflowExitType,
WorkflowInput,
)
from message_passing.message_handler_waiting_compensation_cleanup.activities import (
my_activity,
)


@workflow.defn
class MyWorkflow:
"""
This Workflow upholds the following recommended practices:

1. The main workflow method ensures that all signal and update handlers are
finished before a successful return, and on failure, cancellation, and
continue-as-new.
2. The update handler performs any necessary compensation/cleanup when the
workflow is cancelled, fails, or continues-as-new.
"""

def __init__(self) -> None:
self.workflow_exit_exception: asyncio.Future[BaseException] = asyncio.Future()

@workflow.run
async def run(self, input: WorkflowInput) -> str:
try:
# 👉 Use this `try...except` style, instead of waiting for message
# handlers to finish in a `finally` block. The reason is that other
# exception types will cause a Workflow Task failure, in which case
# we do *not* want to wait for message handlers to finish.
result = await self._run(input)
dandavison marked this conversation as resolved.
Show resolved Hide resolved
await workflow.wait_condition(workflow.all_handlers_finished)
return result
except (
asyncio.CancelledError,
workflow.ContinueAsNewError,
exceptions.FailureError,
dandavison marked this conversation as resolved.
Show resolved Hide resolved
) as exc:
self.workflow_exit_exception.set_result(exc)
await workflow.wait_condition(workflow.all_handlers_finished)
raise exc

@workflow.update
async def my_update(self, input: UpdateInput) -> str:
"""
This update handler demonstrates how to handle the situation where the
main Workflow method exits prematurely. In that case we perform
compensation/cleanup, and fail the Update. The Update caller will get a
WorkflowUpdateFailedError.
"""
# Coroutines must be wrapped in tasks in order to use workflow.wait.
update_task = asyncio.Task(self._my_update())
dandavison marked this conversation as resolved.
Show resolved Hide resolved
# 👉 Always use `workflow.wait` instead of `asyncio.wait` in Workflow
# code: asyncio's version is non-deterministic.
first_completed, _ = await workflow.wait( # type: ignore
[update_task, self.workflow_exit_exception],
return_when=asyncio.FIRST_COMPLETED,
)
# 👉 It's possible that the update completed and the workflow exited
# prematurely in the same tick of the event loop. If the Update has
# completed, return the Update result to the caller, whether or not the
# Workflow is exiting.
if (
update_task in first_completed
or input.on_premature_workflow_exit == OnWorkflowExitAction.CONTINUE
):
return await update_task
else:
await self._my_update_compensation_and_cleanup()
dandavison marked this conversation as resolved.
Show resolved Hide resolved
dandavison marked this conversation as resolved.
Show resolved Hide resolved
raise exceptions.ApplicationError(
f"The update failed because the workflow run exited: {await self.workflow_exit_exception}"
)
dandavison marked this conversation as resolved.
Show resolved Hide resolved

async def _my_update(self) -> str:
"""
This handler calls a slow activity, so

(1) In the case where the workflow finishes successfully, the worker
would get an UnfinishedUpdateHandlersWarning (TMPRL1102) if the main
workflow task didn't wait for it to finish.

(2) In the other cases (failure, cancellation, and continue-as-new), the
premature workflow exit will occur before the update is finished.
"""
# Ignore: implementation detail specific to this sample
self._update_started = True
dandavison marked this conversation as resolved.
Show resolved Hide resolved

await workflow.execute_activity(
my_activity, start_to_close_timeout=timedelta(seconds=10)
)
return "update-result"

async def _my_update_compensation_and_cleanup(self):
workflow.logger.info(
"performing update handler compensation and cleanup operations"
dandavison marked this conversation as resolved.
Show resolved Hide resolved
)

async def _run(self, input: WorkflowInput) -> str:
# Ignore this method unless you are interested in the implementation
# details of this sample.

# Wait until handlers started, so that we are demonstrating that we wait for them to finish.
await workflow.wait_condition(lambda: getattr(self, "_update_started", False))
dandavison marked this conversation as resolved.
Show resolved Hide resolved
if input.exit_type == WorkflowExitType.SUCCESS:
return "workflow-result"
elif input.exit_type == WorkflowExitType.CONTINUE_AS_NEW:
workflow.continue_as_new(WorkflowInput(exit_type=WorkflowExitType.SUCCESS))
elif input.exit_type == WorkflowExitType.FAILURE:
raise exceptions.ApplicationError("deliberately failing workflow")
dandavison marked this conversation as resolved.
Show resolved Hide resolved
elif input.exit_type == WorkflowExitType.CANCELLATION:
# Block forever; the starter will send a workflow cancellation request.
await asyncio.Future()
raise AssertionError("unreachable")