From c566b62d916b2b8555136b470031b7b98c09931b Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Wed, 2 Oct 2024 07:35:34 -0400 Subject: [PATCH] message_handler_waiting_compensation_cleanup --- .../README.md | 71 ++++++++++ .../__init__.py | 27 ++++ .../activities.py | 8 ++ .../starter.py | 84 ++++++++++++ .../worker.py | 40 ++++++ .../workflows.py | 121 ++++++++++++++++++ 6 files changed, 351 insertions(+) create mode 100644 message_passing/message_handler_waiting_compensation_cleanup/README.md create mode 100644 message_passing/message_handler_waiting_compensation_cleanup/__init__.py create mode 100644 message_passing/message_handler_waiting_compensation_cleanup/activities.py create mode 100644 message_passing/message_handler_waiting_compensation_cleanup/starter.py create mode 100644 message_passing/message_handler_waiting_compensation_cleanup/worker.py create mode 100644 message_passing/message_handler_waiting_compensation_cleanup/workflows.py diff --git a/message_passing/message_handler_waiting_compensation_cleanup/README.md b/message_passing/message_handler_waiting_compensation_cleanup/README.md new file mode 100644 index 0000000..c112b4f --- /dev/null +++ b/message_passing/message_handler_waiting_compensation_cleanup/README.md @@ -0,0 +1,71 @@ +# Waiting for message handlers, and performing compensation and cleanup in message handlers + +This sample demonstrates the following recommended practices: + +1. Ensuring that all signal and update handlers are finished before a successful + workflow return, and on workflow failure, cancellation, and continue-as-new. +2. Performs any necessary compensation/cleanup in an update handler when the + workflow is cancelled, fails, or continues-as-new. + + +To run, open two terminals and `cd` to this directory in them. + +Run the worker in one terminal: + + poetry run python worker.py + +And run the workflow-starter code in the other terminal: + + poetry run python starter.py + + +Here's the output you'll see, along with some explanation: + +``` +workflow exit type: success + update action on premature workflow exit: continue + 👇 [Caller gets a successful update response because main workflow method waits for handlers to finish] + 🟢 caller received update result + 🟢 caller received workflow result + update action on premature workflow exit: abort_with_compensation + 👇 [Same as above: the workflow is successful for action-on-premature exit is irrelevant] + 🟢 caller received update result + 🟢 caller received workflow result + + +workflow exit type: failure + update action on premature workflow exit: continue + 👇 [update does not abort and main workflow method waits for handlers to finish => caller gets successful update result prior to workflow failure] + 🟢 caller received update result + 🔴 caught exception while waiting for workflow result: Workflow execution failed: deliberately failing workflow + update action on premature workflow exit: abort_with_compensation + 👇 [update aborts, compensates and raises => caller gets failed update result] + 🔴 caught exception while waiting for update result: Workflow update failed: The update failed because the workflow run exited: deliberately failing workflow + 🔴 caught exception while waiting for workflow result: Workflow execution failed: deliberately failing workflow + + +workflow exit type: cancellation + update action on premature workflow exit: continue + 👇 [update does not abort and main workflow method waits for handlers to finish => caller gets successful update result prior to workflow cancellation] + 🟢 caller received update result + 🔴 caught exception while waiting for workflow result: Workflow execution failed: Workflow cancelled + update action on premature workflow exit: abort_with_compensation + 👇 [update aborts, compensates and raises => caller gets failed update result] + 🔴 caught exception while waiting for update result: Workflow update failed: The update failed because the workflow run exited: + 🔴 caught exception while waiting for workflow result: Workflow execution failed: Workflow cancelled + + +workflow exit type: continue_as_new + update action on premature workflow exit: continue + 👇 [update does not abort and main workflow method waits for handlers to finish => caller gets successful update result prior to continue-as-new] + 🟢 caller received update result + 👇 [a second update is sent to the post-CAN run, which run succeeds, hence update succeeds] + 🟢 caller received update result + 🟢 caller received workflow result + update action on premature workflow exit: abort_with_compensation + 👇 [update aborts, compensates and raises => caller gets failed update result] + 🔴 caught exception while waiting for update result: update "50cd58dc-2db7-4a70-9204-bf5922203203" not found: + 👇 [a second update is sent to the post-CAN run, which run succeeds, hence update succeeds] + 🟢 caller received update result + 🟢 caller received workflow result +``` \ No newline at end of file diff --git a/message_passing/message_handler_waiting_compensation_cleanup/__init__.py b/message_passing/message_handler_waiting_compensation_cleanup/__init__.py new file mode 100644 index 0000000..b255147 --- /dev/null +++ b/message_passing/message_handler_waiting_compensation_cleanup/__init__.py @@ -0,0 +1,27 @@ +from dataclasses import dataclass +from enum import StrEnum + +TASK_QUEUE = "my-task-queue" +WORKFLOW_ID = "my-workflow-id" + + +class WorkflowExitType(StrEnum): + SUCCESS = "success" + FAILURE = "failure" + CONTINUE_AS_NEW = "continue_as_new" + CANCELLATION = "cancellation" + + +@dataclass +class WorkflowInput: + exit_type: WorkflowExitType + + +class OnWorkflowExitAction(StrEnum): + CONTINUE = "continue" + ABORT_WITH_COMPENSATION = "abort_with_compensation" + + +@dataclass +class UpdateInput: + on_premature_workflow_exit: OnWorkflowExitAction diff --git a/message_passing/message_handler_waiting_compensation_cleanup/activities.py b/message_passing/message_handler_waiting_compensation_cleanup/activities.py new file mode 100644 index 0000000..1a4a16d --- /dev/null +++ b/message_passing/message_handler_waiting_compensation_cleanup/activities.py @@ -0,0 +1,8 @@ +import asyncio + +from temporalio import activity + + +@activity.defn +async def my_activity(): + await asyncio.sleep(1) diff --git a/message_passing/message_handler_waiting_compensation_cleanup/starter.py b/message_passing/message_handler_waiting_compensation_cleanup/starter.py new file mode 100644 index 0000000..e436881 --- /dev/null +++ b/message_passing/message_handler_waiting_compensation_cleanup/starter.py @@ -0,0 +1,84 @@ +import asyncio +from contextlib import contextmanager + +from temporalio import client, common + +from message_passing.message_handler_waiting_compensation_cleanup import ( + TASK_QUEUE, + WORKFLOW_ID, + OnWorkflowExitAction, + UpdateInput, + WorkflowExitType, + WorkflowInput, +) +from message_passing.message_handler_waiting_compensation_cleanup.workflows import ( + MyWorkflow, +) + + +async def starter(exit_type: WorkflowExitType, update_action: OnWorkflowExitAction): + cl = await client.Client.connect("localhost:7233") + wf_handle = await cl.start_workflow( + MyWorkflow.run, + WorkflowInput(exit_type=exit_type), + id=WORKFLOW_ID, + task_queue=TASK_QUEUE, + id_reuse_policy=common.WorkflowIDReusePolicy.TERMINATE_IF_RUNNING, + ) + await _check_run(wf_handle, exit_type, update_action) + + +async def _check_run( + wf_handle: client.WorkflowHandle, + exit_type: WorkflowExitType, + update_action: OnWorkflowExitAction, +): + with catch("starting update"): + up_handle = await wf_handle.start_update( + MyWorkflow.my_update, + UpdateInput(on_premature_workflow_exit=update_action), + wait_for_stage=client.WorkflowUpdateStage.ACCEPTED, + ) + + if exit_type == WorkflowExitType.CANCELLATION: + await wf_handle.cancel() + + with catch("waiting for update result"): + await up_handle.result() + print(" 🟢 caller received update result") + + if exit_type == WorkflowExitType.CONTINUE_AS_NEW: + await _check_run(wf_handle, WorkflowExitType.SUCCESS, update_action) + else: + with catch("waiting for workflow result"): + await wf_handle.result() + print(" 🟢 caller received workflow result") + + +@contextmanager +def catch(operation: str): + try: + yield + except Exception as e: + cause = getattr(e, "cause", None) + print(f" 🔴 caught exception while {operation}: {e}: {cause or ''}") + + +async def main(): + for exit_type in [ + WorkflowExitType.SUCCESS, + WorkflowExitType.FAILURE, + WorkflowExitType.CANCELLATION, + WorkflowExitType.CONTINUE_AS_NEW, + ]: + print(f"\n\nworkflow exit type: {exit_type}") + for update_action in [ + OnWorkflowExitAction.CONTINUE, + OnWorkflowExitAction.ABORT_WITH_COMPENSATION, + ]: + print(f" update action on premature workflow exit: {update_action}") + await starter(exit_type, update_action) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/message_passing/message_handler_waiting_compensation_cleanup/worker.py b/message_passing/message_handler_waiting_compensation_cleanup/worker.py new file mode 100644 index 0000000..c53cc44 --- /dev/null +++ b/message_passing/message_handler_waiting_compensation_cleanup/worker.py @@ -0,0 +1,40 @@ +import asyncio +import logging + +from temporalio.client import Client +from temporalio.worker import Worker + +from message_passing.message_handler_waiting_compensation_cleanup import TASK_QUEUE +from message_passing.message_handler_waiting_compensation_cleanup.activities import ( + my_activity, +) +from message_passing.message_handler_waiting_compensation_cleanup.workflows import ( + MyWorkflow, +) + +interrupt_event = asyncio.Event() + + +async def main(): + logging.basicConfig(level=logging.INFO) + + client = await Client.connect("localhost:7233") + + async with Worker( + client, + task_queue=TASK_QUEUE, + workflows=[MyWorkflow], + activities=[my_activity], + ): + logging.info("Worker started, ctrl+c to exit") + await interrupt_event.wait() + logging.info("Shutting down") + + +if __name__ == "__main__": + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(main()) + except KeyboardInterrupt: + interrupt_event.set() + loop.run_until_complete(loop.shutdown_asyncgens()) diff --git a/message_passing/message_handler_waiting_compensation_cleanup/workflows.py b/message_passing/message_handler_waiting_compensation_cleanup/workflows.py new file mode 100644 index 0000000..0adafbc --- /dev/null +++ b/message_passing/message_handler_waiting_compensation_cleanup/workflows.py @@ -0,0 +1,121 @@ +import asyncio +from datetime import timedelta + +from temporalio import exceptions, workflow + +from message_passing.message_handler_waiting_compensation_cleanup import ( + OnWorkflowExitAction, + UpdateInput, + WorkflowExitType, + WorkflowInput, +) +from message_passing.message_handler_waiting_compensation_cleanup.activities import ( + my_activity, +) + + +@workflow.defn +class MyWorkflow: + """ + This Workflow upholds the following recommended practices: + + 1. The main workflow method ensures that all signal and update handlers are + finished before a successful return, and on failure, cancellation, and + continue-as-new. + 2. The update handler performs any necessary compensation/cleanup when the + workflow is cancelled, fails, or continues-as-new. + """ + + def __init__(self) -> None: + self.workflow_exit_exception: asyncio.Future[BaseException] = asyncio.Future() + + @workflow.run + async def run(self, input: WorkflowInput) -> str: + try: + # 👉 Use this `try...except` style, instead of waiting for message + # handlers to finish in a `finally` block. The reason is that other + # exception types will cause a Workflow Task failure, in which case + # we do *not* want to wait for message handlers to finish. + result = await self._run(input) + await workflow.wait_condition(workflow.all_handlers_finished) + return result + except ( + asyncio.CancelledError, + workflow.ContinueAsNewError, + exceptions.FailureError, + ) as exc: + self.workflow_exit_exception.set_result(exc) + await workflow.wait_condition(workflow.all_handlers_finished) + raise exc + + @workflow.update + async def my_update(self, input: UpdateInput) -> str: + """ + This update handler demonstrates how to handle the situation where the + main Workflow method exits prematurely. In that case we perform + compensation/cleanup, and fail the Update. The Update caller will get a + WorkflowUpdateFailedError. + """ + # Coroutines must be wrapped in tasks in order to use workflow.wait. + update_task = asyncio.Task(self._my_update()) + # 👉 Always use `workflow.wait` instead of `asyncio.wait` in Workflow + # code: asyncio's version is non-deterministic. + first_completed, _ = await workflow.wait( + [update_task, self.workflow_exit_exception], + return_when=asyncio.FIRST_COMPLETED, + ) + # 👉 It's possible that the update completed and the workflow exited + # prematurely in the same tick of the event loop. If the Update has + # completed, return the Update result to the caller, whether or not the + # Workflow is exiting. + if ( + update_task in first_completed + or input.on_premature_workflow_exit == OnWorkflowExitAction.CONTINUE + ): + return await update_task + else: + await self._my_update_compensation_and_cleanup() + raise exceptions.ApplicationError( + f"The update failed because the workflow run exited: {await self.workflow_exit_exception}" + ) + + async def _my_update(self) -> str: + """ + This handler calls a slow activity, so + + (1) In the case where the workflow finishes successfully, the worker + would get an UnfinishedUpdateHandlersWarning (TMPRL1102) if the main + workflow task didn't wait for it to finish. + + (2) In the other cases (failure, cancellation, and continue-as-new), the + premature workflow exit will occur before the update is finished. + """ + # Ignore: implementation detail specific to this sample + self._update_started = True + + await workflow.execute_activity( + my_activity, start_to_close_timeout=timedelta(seconds=10) + ) + return "update-result" + + async def _my_update_compensation_and_cleanup(self): + workflow.logger.info( + "performing update handler compensation and cleanup operations" + ) + + async def _run(self, input: WorkflowInput) -> str: + # Ignore this method unless you are interested in the implementation + # details of this sample. + + # Wait until handlers started, so that we are demonstrating that we wait for them to finish. + await workflow.wait_condition(lambda: getattr(self, "_update_started", False)) + if input.exit_type == WorkflowExitType.SUCCESS: + return "workflow-result" + elif input.exit_type == WorkflowExitType.CONTINUE_AS_NEW: + workflow.continue_as_new(WorkflowInput(exit_type=WorkflowExitType.SUCCESS)) + elif input.exit_type == WorkflowExitType.FAILURE: + raise exceptions.ApplicationError("deliberately failing workflow") + elif input.exit_type == WorkflowExitType.CANCELLATION: + # Block forever; the starter will send a workflow cancellation request. + await asyncio.Future() + raise AssertionError("unreachable")