From b3bebf882cfd84201bed3f1fb3d88bb4904571b9 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Wed, 2 Oct 2024 07:35:34 -0400 Subject: [PATCH 01/10] message_handler_waiting_compensation_cleanup --- .../README.md | 71 ++++++++++ .../__init__.py | 27 ++++ .../activities.py | 8 ++ .../starter.py | 84 ++++++++++++ .../worker.py | 40 ++++++ .../workflows.py | 121 ++++++++++++++++++ 6 files changed, 351 insertions(+) create mode 100644 message_passing/message_handler_waiting_compensation_cleanup/README.md create mode 100644 message_passing/message_handler_waiting_compensation_cleanup/__init__.py create mode 100644 message_passing/message_handler_waiting_compensation_cleanup/activities.py create mode 100644 message_passing/message_handler_waiting_compensation_cleanup/starter.py create mode 100644 message_passing/message_handler_waiting_compensation_cleanup/worker.py create mode 100644 message_passing/message_handler_waiting_compensation_cleanup/workflows.py diff --git a/message_passing/message_handler_waiting_compensation_cleanup/README.md b/message_passing/message_handler_waiting_compensation_cleanup/README.md new file mode 100644 index 0000000..0bbb1a9 --- /dev/null +++ b/message_passing/message_handler_waiting_compensation_cleanup/README.md @@ -0,0 +1,71 @@ +# Waiting for message handlers, and performing compensation and cleanup in message handlers + +This sample demonstrates the following recommended practices: + +1. Ensuring that all signal and update handlers are finished before a successful + workflow return, and on workflow failure, cancellation, and continue-as-new. +2. Performing necessary compensation/cleanup in an update handler when the + workflow is cancelled, fails, or continues-as-new. + + +To run, open two terminals and `cd` to this directory in them. + +Run the worker in one terminal: + + poetry run python worker.py + +And run the workflow-starter code in the other terminal: + + poetry run python starter.py + + +Here's the output you'll see, along with some explanation: + +``` +workflow exit type: success + update action on premature workflow exit: continue + 👇 [Caller gets a successful update response because main workflow method waits for handlers to finish] + 🟢 caller received update result + 🟢 caller received workflow result + update action on premature workflow exit: abort_with_compensation + 👇 [Same as above: the workflow is successful for action-on-premature exit is irrelevant] + 🟢 caller received update result + 🟢 caller received workflow result + + +workflow exit type: failure + update action on premature workflow exit: continue + 👇 [update does not abort and main workflow method waits for handlers to finish => caller gets successful update result prior to workflow failure] + 🟢 caller received update result + 🔴 caught exception while waiting for workflow result: Workflow execution failed: deliberately failing workflow + update action on premature workflow exit: abort_with_compensation + 👇 [update aborts, compensates and raises => caller gets failed update result] + 🔴 caught exception while waiting for update result: Workflow update failed: The update failed because the workflow run exited: deliberately failing workflow + 🔴 caught exception while waiting for workflow result: Workflow execution failed: deliberately failing workflow + + +workflow exit type: cancellation + update action on premature workflow exit: continue + 👇 [update does not abort and main workflow method waits for handlers to finish => caller gets successful update result prior to workflow cancellation] + 🟢 caller received update result + 🔴 caught exception while waiting for workflow result: Workflow execution failed: Workflow cancelled + update action on premature workflow exit: abort_with_compensation + 👇 [update aborts, compensates and raises => caller gets failed update result] + 🔴 caught exception while waiting for update result: Workflow update failed: The update failed because the workflow run exited: + 🔴 caught exception while waiting for workflow result: Workflow execution failed: Workflow cancelled + + +workflow exit type: continue_as_new + update action on premature workflow exit: continue + 👇 [update does not abort and main workflow method waits for handlers to finish => caller gets successful update result prior to continue-as-new] + 🟢 caller received update result + 👇 [a second update is sent to the post-CAN run, which run succeeds, hence update succeeds] + 🟢 caller received update result + 🟢 caller received workflow result + update action on premature workflow exit: abort_with_compensation + 👇 [update aborts, compensates and raises => caller gets failed update result] + 🔴 caught exception while waiting for update result: update "50cd58dc-2db7-4a70-9204-bf5922203203" not found: + 👇 [a second update is sent to the post-CAN run, which run succeeds, hence update succeeds] + 🟢 caller received update result + 🟢 caller received workflow result +``` \ No newline at end of file diff --git a/message_passing/message_handler_waiting_compensation_cleanup/__init__.py b/message_passing/message_handler_waiting_compensation_cleanup/__init__.py new file mode 100644 index 0000000..b255147 --- /dev/null +++ b/message_passing/message_handler_waiting_compensation_cleanup/__init__.py @@ -0,0 +1,27 @@ +from dataclasses import dataclass +from enum import StrEnum + +TASK_QUEUE = "my-task-queue" +WORKFLOW_ID = "my-workflow-id" + + +class WorkflowExitType(StrEnum): + SUCCESS = "success" + FAILURE = "failure" + CONTINUE_AS_NEW = "continue_as_new" + CANCELLATION = "cancellation" + + +@dataclass +class WorkflowInput: + exit_type: WorkflowExitType + + +class OnWorkflowExitAction(StrEnum): + CONTINUE = "continue" + ABORT_WITH_COMPENSATION = "abort_with_compensation" + + +@dataclass +class UpdateInput: + on_premature_workflow_exit: OnWorkflowExitAction diff --git a/message_passing/message_handler_waiting_compensation_cleanup/activities.py b/message_passing/message_handler_waiting_compensation_cleanup/activities.py new file mode 100644 index 0000000..1a4a16d --- /dev/null +++ b/message_passing/message_handler_waiting_compensation_cleanup/activities.py @@ -0,0 +1,8 @@ +import asyncio + +from temporalio import activity + + +@activity.defn +async def my_activity(): + await asyncio.sleep(1) diff --git a/message_passing/message_handler_waiting_compensation_cleanup/starter.py b/message_passing/message_handler_waiting_compensation_cleanup/starter.py new file mode 100644 index 0000000..e436881 --- /dev/null +++ b/message_passing/message_handler_waiting_compensation_cleanup/starter.py @@ -0,0 +1,84 @@ +import asyncio +from contextlib import contextmanager + +from temporalio import client, common + +from message_passing.message_handler_waiting_compensation_cleanup import ( + TASK_QUEUE, + WORKFLOW_ID, + OnWorkflowExitAction, + UpdateInput, + WorkflowExitType, + WorkflowInput, +) +from message_passing.message_handler_waiting_compensation_cleanup.workflows import ( + MyWorkflow, +) + + +async def starter(exit_type: WorkflowExitType, update_action: OnWorkflowExitAction): + cl = await client.Client.connect("localhost:7233") + wf_handle = await cl.start_workflow( + MyWorkflow.run, + WorkflowInput(exit_type=exit_type), + id=WORKFLOW_ID, + task_queue=TASK_QUEUE, + id_reuse_policy=common.WorkflowIDReusePolicy.TERMINATE_IF_RUNNING, + ) + await _check_run(wf_handle, exit_type, update_action) + + +async def _check_run( + wf_handle: client.WorkflowHandle, + exit_type: WorkflowExitType, + update_action: OnWorkflowExitAction, +): + with catch("starting update"): + up_handle = await wf_handle.start_update( + MyWorkflow.my_update, + UpdateInput(on_premature_workflow_exit=update_action), + wait_for_stage=client.WorkflowUpdateStage.ACCEPTED, + ) + + if exit_type == WorkflowExitType.CANCELLATION: + await wf_handle.cancel() + + with catch("waiting for update result"): + await up_handle.result() + print(" 🟢 caller received update result") + + if exit_type == WorkflowExitType.CONTINUE_AS_NEW: + await _check_run(wf_handle, WorkflowExitType.SUCCESS, update_action) + else: + with catch("waiting for workflow result"): + await wf_handle.result() + print(" 🟢 caller received workflow result") + + +@contextmanager +def catch(operation: str): + try: + yield + except Exception as e: + cause = getattr(e, "cause", None) + print(f" 🔴 caught exception while {operation}: {e}: {cause or ''}") + + +async def main(): + for exit_type in [ + WorkflowExitType.SUCCESS, + WorkflowExitType.FAILURE, + WorkflowExitType.CANCELLATION, + WorkflowExitType.CONTINUE_AS_NEW, + ]: + print(f"\n\nworkflow exit type: {exit_type}") + for update_action in [ + OnWorkflowExitAction.CONTINUE, + OnWorkflowExitAction.ABORT_WITH_COMPENSATION, + ]: + print(f" update action on premature workflow exit: {update_action}") + await starter(exit_type, update_action) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/message_passing/message_handler_waiting_compensation_cleanup/worker.py b/message_passing/message_handler_waiting_compensation_cleanup/worker.py new file mode 100644 index 0000000..c53cc44 --- /dev/null +++ b/message_passing/message_handler_waiting_compensation_cleanup/worker.py @@ -0,0 +1,40 @@ +import asyncio +import logging + +from temporalio.client import Client +from temporalio.worker import Worker + +from message_passing.message_handler_waiting_compensation_cleanup import TASK_QUEUE +from message_passing.message_handler_waiting_compensation_cleanup.activities import ( + my_activity, +) +from message_passing.message_handler_waiting_compensation_cleanup.workflows import ( + MyWorkflow, +) + +interrupt_event = asyncio.Event() + + +async def main(): + logging.basicConfig(level=logging.INFO) + + client = await Client.connect("localhost:7233") + + async with Worker( + client, + task_queue=TASK_QUEUE, + workflows=[MyWorkflow], + activities=[my_activity], + ): + logging.info("Worker started, ctrl+c to exit") + await interrupt_event.wait() + logging.info("Shutting down") + + +if __name__ == "__main__": + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(main()) + except KeyboardInterrupt: + interrupt_event.set() + loop.run_until_complete(loop.shutdown_asyncgens()) diff --git a/message_passing/message_handler_waiting_compensation_cleanup/workflows.py b/message_passing/message_handler_waiting_compensation_cleanup/workflows.py new file mode 100644 index 0000000..0adafbc --- /dev/null +++ b/message_passing/message_handler_waiting_compensation_cleanup/workflows.py @@ -0,0 +1,121 @@ +import asyncio +from datetime import timedelta + +from temporalio import exceptions, workflow + +from message_passing.message_handler_waiting_compensation_cleanup import ( + OnWorkflowExitAction, + UpdateInput, + WorkflowExitType, + WorkflowInput, +) +from message_passing.message_handler_waiting_compensation_cleanup.activities import ( + my_activity, +) + + +@workflow.defn +class MyWorkflow: + """ + This Workflow upholds the following recommended practices: + + 1. The main workflow method ensures that all signal and update handlers are + finished before a successful return, and on failure, cancellation, and + continue-as-new. + 2. The update handler performs any necessary compensation/cleanup when the + workflow is cancelled, fails, or continues-as-new. + """ + + def __init__(self) -> None: + self.workflow_exit_exception: asyncio.Future[BaseException] = asyncio.Future() + + @workflow.run + async def run(self, input: WorkflowInput) -> str: + try: + # 👉 Use this `try...except` style, instead of waiting for message + # handlers to finish in a `finally` block. The reason is that other + # exception types will cause a Workflow Task failure, in which case + # we do *not* want to wait for message handlers to finish. + result = await self._run(input) + await workflow.wait_condition(workflow.all_handlers_finished) + return result + except ( + asyncio.CancelledError, + workflow.ContinueAsNewError, + exceptions.FailureError, + ) as exc: + self.workflow_exit_exception.set_result(exc) + await workflow.wait_condition(workflow.all_handlers_finished) + raise exc + + @workflow.update + async def my_update(self, input: UpdateInput) -> str: + """ + This update handler demonstrates how to handle the situation where the + main Workflow method exits prematurely. In that case we perform + compensation/cleanup, and fail the Update. The Update caller will get a + WorkflowUpdateFailedError. + """ + # Coroutines must be wrapped in tasks in order to use workflow.wait. + update_task = asyncio.Task(self._my_update()) + # 👉 Always use `workflow.wait` instead of `asyncio.wait` in Workflow + # code: asyncio's version is non-deterministic. + first_completed, _ = await workflow.wait( + [update_task, self.workflow_exit_exception], + return_when=asyncio.FIRST_COMPLETED, + ) + # 👉 It's possible that the update completed and the workflow exited + # prematurely in the same tick of the event loop. If the Update has + # completed, return the Update result to the caller, whether or not the + # Workflow is exiting. + if ( + update_task in first_completed + or input.on_premature_workflow_exit == OnWorkflowExitAction.CONTINUE + ): + return await update_task + else: + await self._my_update_compensation_and_cleanup() + raise exceptions.ApplicationError( + f"The update failed because the workflow run exited: {await self.workflow_exit_exception}" + ) + + async def _my_update(self) -> str: + """ + This handler calls a slow activity, so + + (1) In the case where the workflow finishes successfully, the worker + would get an UnfinishedUpdateHandlersWarning (TMPRL1102) if the main + workflow task didn't wait for it to finish. + + (2) In the other cases (failure, cancellation, and continue-as-new), the + premature workflow exit will occur before the update is finished. + """ + # Ignore: implementation detail specific to this sample + self._update_started = True + + await workflow.execute_activity( + my_activity, start_to_close_timeout=timedelta(seconds=10) + ) + return "update-result" + + async def _my_update_compensation_and_cleanup(self): + workflow.logger.info( + "performing update handler compensation and cleanup operations" + ) + + async def _run(self, input: WorkflowInput) -> str: + # Ignore this method unless you are interested in the implementation + # details of this sample. + + # Wait until handlers started, so that we are demonstrating that we wait for them to finish. + await workflow.wait_condition(lambda: getattr(self, "_update_started", False)) + if input.exit_type == WorkflowExitType.SUCCESS: + return "workflow-result" + elif input.exit_type == WorkflowExitType.CONTINUE_AS_NEW: + workflow.continue_as_new(WorkflowInput(exit_type=WorkflowExitType.SUCCESS)) + elif input.exit_type == WorkflowExitType.FAILURE: + raise exceptions.ApplicationError("deliberately failing workflow") + elif input.exit_type == WorkflowExitType.CANCELLATION: + # Block forever; the starter will send a workflow cancellation request. + await asyncio.Future() + raise AssertionError("unreachable") From 9355bc289609cf6d52a1a69b460d84afe5a5d755 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Wed, 2 Oct 2024 13:23:33 -0400 Subject: [PATCH 02/10] 3.8 compat --- .../__init__.py | 18 +++++++++--------- .../workflows.py | 2 +- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/message_passing/message_handler_waiting_compensation_cleanup/__init__.py b/message_passing/message_handler_waiting_compensation_cleanup/__init__.py index b255147..a27233c 100644 --- a/message_passing/message_handler_waiting_compensation_cleanup/__init__.py +++ b/message_passing/message_handler_waiting_compensation_cleanup/__init__.py @@ -1,15 +1,15 @@ from dataclasses import dataclass -from enum import StrEnum +from enum import IntEnum TASK_QUEUE = "my-task-queue" WORKFLOW_ID = "my-workflow-id" -class WorkflowExitType(StrEnum): - SUCCESS = "success" - FAILURE = "failure" - CONTINUE_AS_NEW = "continue_as_new" - CANCELLATION = "cancellation" +class WorkflowExitType(IntEnum): + SUCCESS = 0 + FAILURE = 1 + CONTINUE_AS_NEW = 2 + CANCELLATION = 3 @dataclass @@ -17,9 +17,9 @@ class WorkflowInput: exit_type: WorkflowExitType -class OnWorkflowExitAction(StrEnum): - CONTINUE = "continue" - ABORT_WITH_COMPENSATION = "abort_with_compensation" +class OnWorkflowExitAction(IntEnum): + CONTINUE = 0 + ABORT_WITH_COMPENSATION = 1 @dataclass diff --git a/message_passing/message_handler_waiting_compensation_cleanup/workflows.py b/message_passing/message_handler_waiting_compensation_cleanup/workflows.py index 0adafbc..aac04b0 100644 --- a/message_passing/message_handler_waiting_compensation_cleanup/workflows.py +++ b/message_passing/message_handler_waiting_compensation_cleanup/workflows.py @@ -60,7 +60,7 @@ async def my_update(self, input: UpdateInput) -> str: update_task = asyncio.Task(self._my_update()) # 👉 Always use `workflow.wait` instead of `asyncio.wait` in Workflow # code: asyncio's version is non-deterministic. - first_completed, _ = await workflow.wait( + first_completed, _ = await workflow.wait( # type: ignore [update_task, self.workflow_exit_exception], return_when=asyncio.FIRST_COMPLETED, ) From 9192133738168cfa97c1cc7ff84cae7efa703544 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Thu, 3 Oct 2024 12:10:01 -0400 Subject: [PATCH 03/10] Improvements from code review --- .../__init__.py | 4 +- .../starter.py | 40 +++++++++---------- .../workflows.py | 7 +++- 3 files changed, 25 insertions(+), 26 deletions(-) diff --git a/message_passing/message_handler_waiting_compensation_cleanup/__init__.py b/message_passing/message_handler_waiting_compensation_cleanup/__init__.py index a27233c..b3a49b4 100644 --- a/message_passing/message_handler_waiting_compensation_cleanup/__init__.py +++ b/message_passing/message_handler_waiting_compensation_cleanup/__init__.py @@ -8,8 +8,8 @@ class WorkflowExitType(IntEnum): SUCCESS = 0 FAILURE = 1 - CONTINUE_AS_NEW = 2 - CANCELLATION = 3 + CANCELLATION = 2 + CONTINUE_AS_NEW = 3 @dataclass diff --git a/message_passing/message_handler_waiting_compensation_cleanup/starter.py b/message_passing/message_handler_waiting_compensation_cleanup/starter.py index e436881..ebf4803 100644 --- a/message_passing/message_handler_waiting_compensation_cleanup/starter.py +++ b/message_passing/message_handler_waiting_compensation_cleanup/starter.py @@ -1,7 +1,6 @@ import asyncio -from contextlib import contextmanager -from temporalio import client, common +from temporalio import client from message_passing.message_handler_waiting_compensation_cleanup import ( TASK_QUEUE, @@ -23,7 +22,6 @@ async def starter(exit_type: WorkflowExitType, update_action: OnWorkflowExitActi WorkflowInput(exit_type=exit_type), id=WORKFLOW_ID, task_queue=TASK_QUEUE, - id_reuse_policy=common.WorkflowIDReusePolicy.TERMINATE_IF_RUNNING, ) await _check_run(wf_handle, exit_type, update_action) @@ -33,45 +31,43 @@ async def _check_run( exit_type: WorkflowExitType, update_action: OnWorkflowExitAction, ): - with catch("starting update"): + try: up_handle = await wf_handle.start_update( MyWorkflow.my_update, UpdateInput(on_premature_workflow_exit=update_action), wait_for_stage=client.WorkflowUpdateStage.ACCEPTED, ) + except Exception as e: + print( + f" 🔴 caught exception while starting update: {e}: {e.__cause__ or ''}" + ) if exit_type == WorkflowExitType.CANCELLATION: await wf_handle.cancel() - with catch("waiting for update result"): + try: await up_handle.result() print(" 🟢 caller received update result") + except Exception as e: + print( + f" 🔴 caught exception while waiting for update result: {e}: {e.__cause__ or ''}" + ) if exit_type == WorkflowExitType.CONTINUE_AS_NEW: await _check_run(wf_handle, WorkflowExitType.SUCCESS, update_action) else: - with catch("waiting for workflow result"): + try: await wf_handle.result() print(" 🟢 caller received workflow result") - - -@contextmanager -def catch(operation: str): - try: - yield - except Exception as e: - cause = getattr(e, "cause", None) - print(f" 🔴 caught exception while {operation}: {e}: {cause or ''}") + except Exception as e: + print( + f" 🔴 caught exception while waiting for workflow result: {e}: {e.__cause__ or ''}" + ) async def main(): - for exit_type in [ - WorkflowExitType.SUCCESS, - WorkflowExitType.FAILURE, - WorkflowExitType.CANCELLATION, - WorkflowExitType.CONTINUE_AS_NEW, - ]: - print(f"\n\nworkflow exit type: {exit_type}") + for exit_type in WorkflowExitType: + print(f"\n\nworkflow exit type: {exit_type.name}") for update_action in [ OnWorkflowExitAction.CONTINUE, OnWorkflowExitAction.ABORT_WITH_COMPENSATION, diff --git a/message_passing/message_handler_waiting_compensation_cleanup/workflows.py b/message_passing/message_handler_waiting_compensation_cleanup/workflows.py index aac04b0..528575b 100644 --- a/message_passing/message_handler_waiting_compensation_cleanup/workflows.py +++ b/message_passing/message_handler_waiting_compensation_cleanup/workflows.py @@ -36,6 +36,9 @@ async def run(self, input: WorkflowInput) -> str: # handlers to finish in a `finally` block. The reason is that other # exception types will cause a Workflow Task failure, in which case # we do *not* want to wait for message handlers to finish. + + # self._run would contain your actual workflow business logic. In + # this sample, its actual implementation contains nothing relevant. result = await self._run(input) await workflow.wait_condition(workflow.all_handlers_finished) return result @@ -57,7 +60,7 @@ async def my_update(self, input: UpdateInput) -> str: WorkflowUpdateFailedError. """ # Coroutines must be wrapped in tasks in order to use workflow.wait. - update_task = asyncio.Task(self._my_update()) + update_task = asyncio.create_task(self._my_update()) # 👉 Always use `workflow.wait` instead of `asyncio.wait` in Workflow # code: asyncio's version is non-deterministic. first_completed, _ = await workflow.wait( # type: ignore @@ -100,7 +103,7 @@ async def _my_update(self) -> str: async def _my_update_compensation_and_cleanup(self): workflow.logger.info( - "performing update handler compensation and cleanup operations" + "Performing update handler compensation and cleanup operations" ) async def _run(self, input: WorkflowInput) -> str: From 60384af701a8839db724f189c256cca1ff3adb21 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Thu, 3 Oct 2024 19:39:05 -0400 Subject: [PATCH 04/10] Rewrite / cleanup --- .../activities.py | 8 - .../workflows.py | 124 -------------- .../README.md | 0 .../__init__.py | 10 -- .../activities.py | 13 ++ .../starter.py | 41 ++--- .../worker.py | 18 +- .../workflows.py | 160 ++++++++++++++++++ .../workflow_test.py | 82 +++++++++ 9 files changed, 284 insertions(+), 172 deletions(-) delete mode 100644 message_passing/message_handler_waiting_compensation_cleanup/activities.py delete mode 100644 message_passing/message_handler_waiting_compensation_cleanup/workflows.py rename message_passing/{message_handler_waiting_compensation_cleanup => waiting_for_handlers_and_compensation}/README.md (100%) rename message_passing/{message_handler_waiting_compensation_cleanup => waiting_for_handlers_and_compensation}/__init__.py (63%) create mode 100644 message_passing/waiting_for_handlers_and_compensation/activities.py rename message_passing/{message_handler_waiting_compensation_cleanup => waiting_for_handlers_and_compensation}/starter.py (56%) rename message_passing/{message_handler_waiting_compensation_cleanup => waiting_for_handlers_and_compensation}/worker.py (53%) create mode 100644 message_passing/waiting_for_handlers_and_compensation/workflows.py create mode 100644 tests/message_passing/waiting_for_handlers_and_compensation/workflow_test.py diff --git a/message_passing/message_handler_waiting_compensation_cleanup/activities.py b/message_passing/message_handler_waiting_compensation_cleanup/activities.py deleted file mode 100644 index 1a4a16d..0000000 --- a/message_passing/message_handler_waiting_compensation_cleanup/activities.py +++ /dev/null @@ -1,8 +0,0 @@ -import asyncio - -from temporalio import activity - - -@activity.defn -async def my_activity(): - await asyncio.sleep(1) diff --git a/message_passing/message_handler_waiting_compensation_cleanup/workflows.py b/message_passing/message_handler_waiting_compensation_cleanup/workflows.py deleted file mode 100644 index 528575b..0000000 --- a/message_passing/message_handler_waiting_compensation_cleanup/workflows.py +++ /dev/null @@ -1,124 +0,0 @@ -import asyncio -from datetime import timedelta - -from temporalio import exceptions, workflow - -from message_passing.message_handler_waiting_compensation_cleanup import ( - OnWorkflowExitAction, - UpdateInput, - WorkflowExitType, - WorkflowInput, -) -from message_passing.message_handler_waiting_compensation_cleanup.activities import ( - my_activity, -) - - -@workflow.defn -class MyWorkflow: - """ - This Workflow upholds the following recommended practices: - - 1. The main workflow method ensures that all signal and update handlers are - finished before a successful return, and on failure, cancellation, and - continue-as-new. - 2. The update handler performs any necessary compensation/cleanup when the - workflow is cancelled, fails, or continues-as-new. - """ - - def __init__(self) -> None: - self.workflow_exit_exception: asyncio.Future[BaseException] = asyncio.Future() - - @workflow.run - async def run(self, input: WorkflowInput) -> str: - try: - # 👉 Use this `try...except` style, instead of waiting for message - # handlers to finish in a `finally` block. The reason is that other - # exception types will cause a Workflow Task failure, in which case - # we do *not* want to wait for message handlers to finish. - - # self._run would contain your actual workflow business logic. In - # this sample, its actual implementation contains nothing relevant. - result = await self._run(input) - await workflow.wait_condition(workflow.all_handlers_finished) - return result - except ( - asyncio.CancelledError, - workflow.ContinueAsNewError, - exceptions.FailureError, - ) as exc: - self.workflow_exit_exception.set_result(exc) - await workflow.wait_condition(workflow.all_handlers_finished) - raise exc - - @workflow.update - async def my_update(self, input: UpdateInput) -> str: - """ - This update handler demonstrates how to handle the situation where the - main Workflow method exits prematurely. In that case we perform - compensation/cleanup, and fail the Update. The Update caller will get a - WorkflowUpdateFailedError. - """ - # Coroutines must be wrapped in tasks in order to use workflow.wait. - update_task = asyncio.create_task(self._my_update()) - # 👉 Always use `workflow.wait` instead of `asyncio.wait` in Workflow - # code: asyncio's version is non-deterministic. - first_completed, _ = await workflow.wait( # type: ignore - [update_task, self.workflow_exit_exception], - return_when=asyncio.FIRST_COMPLETED, - ) - # 👉 It's possible that the update completed and the workflow exited - # prematurely in the same tick of the event loop. If the Update has - # completed, return the Update result to the caller, whether or not the - # Workflow is exiting. - if ( - update_task in first_completed - or input.on_premature_workflow_exit == OnWorkflowExitAction.CONTINUE - ): - return await update_task - else: - await self._my_update_compensation_and_cleanup() - raise exceptions.ApplicationError( - f"The update failed because the workflow run exited: {await self.workflow_exit_exception}" - ) - - async def _my_update(self) -> str: - """ - This handler calls a slow activity, so - - (1) In the case where the workflow finishes successfully, the worker - would get an UnfinishedUpdateHandlersWarning (TMPRL1102) if the main - workflow task didn't wait for it to finish. - - (2) In the other cases (failure, cancellation, and continue-as-new), the - premature workflow exit will occur before the update is finished. - """ - # Ignore: implementation detail specific to this sample - self._update_started = True - - await workflow.execute_activity( - my_activity, start_to_close_timeout=timedelta(seconds=10) - ) - return "update-result" - - async def _my_update_compensation_and_cleanup(self): - workflow.logger.info( - "Performing update handler compensation and cleanup operations" - ) - - async def _run(self, input: WorkflowInput) -> str: - # Ignore this method unless you are interested in the implementation - # details of this sample. - - # Wait until handlers started, so that we are demonstrating that we wait for them to finish. - await workflow.wait_condition(lambda: getattr(self, "_update_started", False)) - if input.exit_type == WorkflowExitType.SUCCESS: - return "workflow-result" - elif input.exit_type == WorkflowExitType.CONTINUE_AS_NEW: - workflow.continue_as_new(WorkflowInput(exit_type=WorkflowExitType.SUCCESS)) - elif input.exit_type == WorkflowExitType.FAILURE: - raise exceptions.ApplicationError("deliberately failing workflow") - elif input.exit_type == WorkflowExitType.CANCELLATION: - # Block forever; the starter will send a workflow cancellation request. - await asyncio.Future() - raise AssertionError("unreachable") diff --git a/message_passing/message_handler_waiting_compensation_cleanup/README.md b/message_passing/waiting_for_handlers_and_compensation/README.md similarity index 100% rename from message_passing/message_handler_waiting_compensation_cleanup/README.md rename to message_passing/waiting_for_handlers_and_compensation/README.md diff --git a/message_passing/message_handler_waiting_compensation_cleanup/__init__.py b/message_passing/waiting_for_handlers_and_compensation/__init__.py similarity index 63% rename from message_passing/message_handler_waiting_compensation_cleanup/__init__.py rename to message_passing/waiting_for_handlers_and_compensation/__init__.py index b3a49b4..35eb78c 100644 --- a/message_passing/message_handler_waiting_compensation_cleanup/__init__.py +++ b/message_passing/waiting_for_handlers_and_compensation/__init__.py @@ -15,13 +15,3 @@ class WorkflowExitType(IntEnum): @dataclass class WorkflowInput: exit_type: WorkflowExitType - - -class OnWorkflowExitAction(IntEnum): - CONTINUE = 0 - ABORT_WITH_COMPENSATION = 1 - - -@dataclass -class UpdateInput: - on_premature_workflow_exit: OnWorkflowExitAction diff --git a/message_passing/waiting_for_handlers_and_compensation/activities.py b/message_passing/waiting_for_handlers_and_compensation/activities.py new file mode 100644 index 0000000..cec6372 --- /dev/null +++ b/message_passing/waiting_for_handlers_and_compensation/activities.py @@ -0,0 +1,13 @@ +import asyncio + +from temporalio import activity + + +@activity.defn +async def activity_executed_by_update_handler(): + await asyncio.sleep(1) + + +@activity.defn +async def activity_executed_by_update_handler_to_perform_compensation(): + await asyncio.sleep(1) diff --git a/message_passing/message_handler_waiting_compensation_cleanup/starter.py b/message_passing/waiting_for_handlers_and_compensation/starter.py similarity index 56% rename from message_passing/message_handler_waiting_compensation_cleanup/starter.py rename to message_passing/waiting_for_handlers_and_compensation/starter.py index ebf4803..757f357 100644 --- a/message_passing/message_handler_waiting_compensation_cleanup/starter.py +++ b/message_passing/waiting_for_handlers_and_compensation/starter.py @@ -1,46 +1,41 @@ import asyncio -from temporalio import client +from temporalio import client, common -from message_passing.message_handler_waiting_compensation_cleanup import ( +from message_passing.waiting_for_handlers_and_compensation import ( TASK_QUEUE, WORKFLOW_ID, - OnWorkflowExitAction, - UpdateInput, WorkflowExitType, WorkflowInput, ) -from message_passing.message_handler_waiting_compensation_cleanup.workflows import ( - MyWorkflow, +from message_passing.waiting_for_handlers_and_compensation.workflows import ( + WaitingForHandlersAndCompensationWorkflow, ) -async def starter(exit_type: WorkflowExitType, update_action: OnWorkflowExitAction): +async def starter(exit_type: WorkflowExitType): cl = await client.Client.connect("localhost:7233") wf_handle = await cl.start_workflow( - MyWorkflow.run, + WaitingForHandlersAndCompensationWorkflow.run, WorkflowInput(exit_type=exit_type), id=WORKFLOW_ID, task_queue=TASK_QUEUE, + id_conflict_policy=common.WorkflowIDConflictPolicy.TERMINATE_EXISTING, ) - await _check_run(wf_handle, exit_type, update_action) + await _check_run(wf_handle, exit_type) async def _check_run( wf_handle: client.WorkflowHandle, exit_type: WorkflowExitType, - update_action: OnWorkflowExitAction, ): try: up_handle = await wf_handle.start_update( - MyWorkflow.my_update, - UpdateInput(on_premature_workflow_exit=update_action), + WaitingForHandlersAndCompensationWorkflow.my_update, wait_for_stage=client.WorkflowUpdateStage.ACCEPTED, ) except Exception as e: - print( - f" 🔴 caught exception while starting update: {e}: {e.__cause__ or ''}" - ) + print(f" 🔴 caught exception while starting update: {e}: {e.__cause__ or ''}") if exit_type == WorkflowExitType.CANCELLATION: await wf_handle.cancel() @@ -54,7 +49,7 @@ async def _check_run( ) if exit_type == WorkflowExitType.CONTINUE_AS_NEW: - await _check_run(wf_handle, WorkflowExitType.SUCCESS, update_action) + await _check_run(wf_handle, WorkflowExitType.SUCCESS) else: try: await wf_handle.result() @@ -66,14 +61,14 @@ async def _check_run( async def main(): - for exit_type in WorkflowExitType: + for exit_type in [ + WorkflowExitType.SUCCESS, + WorkflowExitType.FAILURE, + WorkflowExitType.CANCELLATION, + WorkflowExitType.CONTINUE_AS_NEW, + ]: print(f"\n\nworkflow exit type: {exit_type.name}") - for update_action in [ - OnWorkflowExitAction.CONTINUE, - OnWorkflowExitAction.ABORT_WITH_COMPENSATION, - ]: - print(f" update action on premature workflow exit: {update_action}") - await starter(exit_type, update_action) + await starter(exit_type) if __name__ == "__main__": diff --git a/message_passing/message_handler_waiting_compensation_cleanup/worker.py b/message_passing/waiting_for_handlers_and_compensation/worker.py similarity index 53% rename from message_passing/message_handler_waiting_compensation_cleanup/worker.py rename to message_passing/waiting_for_handlers_and_compensation/worker.py index c53cc44..6402069 100644 --- a/message_passing/message_handler_waiting_compensation_cleanup/worker.py +++ b/message_passing/waiting_for_handlers_and_compensation/worker.py @@ -4,12 +4,13 @@ from temporalio.client import Client from temporalio.worker import Worker -from message_passing.message_handler_waiting_compensation_cleanup import TASK_QUEUE -from message_passing.message_handler_waiting_compensation_cleanup.activities import ( - my_activity, +from message_passing.waiting_for_handlers_and_compensation import TASK_QUEUE +from message_passing.waiting_for_handlers_and_compensation.activities import ( + activity_executed_by_update_handler, + activity_executed_by_update_handler_to_perform_compensation, ) -from message_passing.message_handler_waiting_compensation_cleanup.workflows import ( - MyWorkflow, +from message_passing.waiting_for_handlers_and_compensation.workflows import ( + WaitingForHandlersAndCompensationWorkflow, ) interrupt_event = asyncio.Event() @@ -23,8 +24,11 @@ async def main(): async with Worker( client, task_queue=TASK_QUEUE, - workflows=[MyWorkflow], - activities=[my_activity], + workflows=[WaitingForHandlersAndCompensationWorkflow], + activities=[ + activity_executed_by_update_handler, + activity_executed_by_update_handler_to_perform_compensation, + ], ): logging.info("Worker started, ctrl+c to exit") await interrupt_event.wait() diff --git a/message_passing/waiting_for_handlers_and_compensation/workflows.py b/message_passing/waiting_for_handlers_and_compensation/workflows.py new file mode 100644 index 0000000..8b37989 --- /dev/null +++ b/message_passing/waiting_for_handlers_and_compensation/workflows.py @@ -0,0 +1,160 @@ +import asyncio +from datetime import timedelta +from typing import cast + +from temporalio import exceptions, workflow + +from message_passing.waiting_for_handlers_and_compensation import ( + WorkflowExitType, + WorkflowInput, +) +from message_passing.waiting_for_handlers_and_compensation.activities import ( + activity_executed_by_update_handler, + activity_executed_by_update_handler_to_perform_compensation, +) + + +@workflow.defn +class WaitingForHandlersAndCompensationWorkflow: + """ + This Workflow demonstrates how to wait for message handlers to finish and + perform compensation/cleanup: + + 1. It ensures that all signal and update handlers have finished before a + successful return, and on failure, cancellation, and continue-as-new. + 2. The update handler performs any necessary compensation/cleanup when the + workflow is cancelled, fails, or continues-as-new. + """ + + def __init__(self) -> None: + # 👉 If the workflow exits prematurely, this future will be completed + # with the associated exception as its value. Message handlers can then + # "race" this future against a task performing the message handler's own + # application logic; if this future completes before the message handler + # task then the handler should abort and perform compensation. + self.workflow_exit = asyncio.Future() + + @workflow.run + async def run(self, input: WorkflowInput) -> str: + try: + # 👉 Use this `try...except` style, instead of waiting for message + # handlers to finish in a `finally` block. The reason is that some + # exception types cause a workflow task failure as opposed to + # workflow exit, in which case we do *not* want to wait for message + # handlers to finish. + + # 👉 self._run contains your actual application logic. This is + # implemented in a separate method in order to separate + # "platform-level" concerns (waiting for handlers to finish and + # ensuring that compensation is performed when appropriate) from + # application logic. In this sample, its actual implementation is + # below but contains nothing relevant. + result = await self._run(input) + self.workflow_exit.set_result(None) + await workflow.wait_condition(workflow.all_handlers_finished) + return result + # 👉 Catch BaseException since asyncio.CancelledError does not inherit + # from Exception. + except BaseException as e: + if is_workflow_exit_exception(e): + self.workflow_exit.set_exception(e) + await workflow.wait_condition(workflow.all_handlers_finished) + raise + + @workflow.update + async def my_update(self) -> str: + """ + An update handler that handles exceptions in itself and in the main + workflow method. + + It ensures that: + - Compensation/cleanup is always performed when appropriate + - The update caller gets the update result, or WorkflowUpdateFailedError + """ + # 👉 As with the main workflow method, the update application logic is + # implemented in a separate method in order to separate "platform-level" + # error-handling and compensation concerns from application logic. Note + # that coroutines must be wrapped in tasks in order to use + # workflow.wait. + update_task = asyncio.create_task(self._my_update()) + + # 👉 "Race" the workflow_exit future against the handler's own application + # logic. Always use `workflow.wait` instead of `asyncio.wait` in + # Workflow code: asyncio's version is non-deterministic. + await workflow.wait( + [update_task, self.workflow_exit], return_when=asyncio.FIRST_EXCEPTION + ) + try: + if update_task.done(): + # 👉 The update has finished (whether successfully or not). + # Regardless of whether the main workflow method is about to + # exit or not, the update caller should receive a response + # informing them of the outcome of the update. So return the + # result, or raise the exception that caused the update handler + # to exit. + return await update_task + else: + # 👉 The main workflow method exited prematurely due to an + # error, and this happened before the update finished. Fail the + # update with the workflow exception as cause. + raise exceptions.ApplicationError( + "The update failed because the workflow run exited" + ) from cast(BaseException, self.workflow_exit.exception()) + # 👉 Catch BaseException since asyncio.CancelledError does not inherit + # from Exception. + except BaseException as e: + if is_workflow_exit_exception(e): + try: + await self.my_update_compensation() + except BaseException as e: + raise exceptions.ApplicationError( + "Update compensation failed" + ) from e + raise + + async def my_update_compensation(self): + await workflow.execute_activity( + activity_executed_by_update_handler_to_perform_compensation, + start_to_close_timeout=timedelta(seconds=10), + ) + + # The following two methods are placeholders for the actual application + # logic that you would perform in your main workflow method or update + # handler. Their implementation can be ignored. + + async def _my_update(self) -> str: + # Ignore this method unless you are interested in the implementation + # details of this sample. + self._update_started = True + await workflow.execute_activity( + activity_executed_by_update_handler, + start_to_close_timeout=timedelta(seconds=10), + ) + return "update-result" + + async def _run(self, input: WorkflowInput) -> str: + # Ignore this method unless you are interested in the implementation + # details of this sample. + + # Wait until handlers have started, so that we are demonstrating that we + # wait for them to finish. + await workflow.wait_condition(lambda: getattr(self, "_update_started", False)) + if input.exit_type == WorkflowExitType.SUCCESS: + return "workflow-result" + elif input.exit_type == WorkflowExitType.CONTINUE_AS_NEW: + workflow.continue_as_new(WorkflowInput(exit_type=WorkflowExitType.SUCCESS)) + elif input.exit_type == WorkflowExitType.FAILURE: + raise exceptions.ApplicationError("deliberately failing workflow") + elif input.exit_type == WorkflowExitType.CANCELLATION: + # Block forever; the starter will send a workflow cancellation request. + await asyncio.Future() + raise AssertionError("unreachable") + + +def is_workflow_exit_exception(e: BaseException) -> bool: + # 👉 If you have set additional failure_exception_types you should also + # check for these here. + return isinstance( + e, + (asyncio.CancelledError, workflow.ContinueAsNewError, exceptions.FailureError), + ) diff --git a/tests/message_passing/waiting_for_handlers_and_compensation/workflow_test.py b/tests/message_passing/waiting_for_handlers_and_compensation/workflow_test.py new file mode 100644 index 0000000..db92dbe --- /dev/null +++ b/tests/message_passing/waiting_for_handlers_and_compensation/workflow_test.py @@ -0,0 +1,82 @@ +import uuid + +import pytest +from temporalio.client import Client, WorkflowHandle, WorkflowUpdateStage +from temporalio.testing import WorkflowEnvironment +from temporalio.worker import Worker + +from message_passing.waiting_for_handlers_and_compensation import ( + WorkflowExitType, + WorkflowInput, +) +from message_passing.waiting_for_handlers_and_compensation.starter import ( + TASK_QUEUE, +) +from message_passing.waiting_for_handlers_and_compensation.workflows import ( + WaitingForHandlersAndCompensationWorkflow, +) + + +async def test_waiting_for_handlers_and_compensation( + client: Client, env: WorkflowEnvironment +): + if env.supports_time_skipping: + pytest.skip( + "Java test server: https://github.com/temporalio/sdk-java/issues/1903" + ) + async with Worker( + client, + task_queue=TASK_QUEUE, + workflows=[WaitingForHandlersAndCompensationWorkflow], + ): + await starter( + WorkflowExitType.SUCCESS, + client, + ) + + +async def starter(exit_type: WorkflowExitType, cl: Client): + wf_handle = await cl.start_workflow( + WaitingForHandlersAndCompensationWorkflow.run, + WorkflowInput(exit_type=exit_type), + id=str(uuid.uuid4()), + task_queue=TASK_QUEUE, + ) + await _check_run(wf_handle, exit_type) + + +async def _check_run( + wf_handle: WorkflowHandle, + exit_type: WorkflowExitType, +): + try: + up_handle = await wf_handle.start_update( + WaitingForHandlersAndCompensationWorkflow.my_update, + wait_for_stage=WorkflowUpdateStage.ACCEPTED, + ) + except Exception as e: + print( + f" 🔴 caught exception while starting update: {e}: {e.__cause__ or ''}" + ) + + if exit_type == WorkflowExitType.CANCELLATION: + await wf_handle.cancel() + + try: + await up_handle.result() + print(" 🟢 caller received update result") + except Exception as e: + print( + f" 🔴 caught exception while waiting for update result: {e}: {e.__cause__ or ''}" + ) + + if exit_type == WorkflowExitType.CONTINUE_AS_NEW: + await _check_run(wf_handle, WorkflowExitType.SUCCESS) + else: + try: + await wf_handle.result() + print(" 🟢 caller received workflow result") + except Exception as e: + print( + f" 🔴 caught exception while waiting for workflow result: {e}: {e.__cause__ or ''}" + ) From 9a30e77baf4115976f6babcc2108dbcd1a4c6941 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Fri, 4 Oct 2024 11:49:13 -0400 Subject: [PATCH 05/10] Test --- .../workflows.py | 16 ++- .../workflow_test.py | 114 ++++++++++-------- 2 files changed, 75 insertions(+), 55 deletions(-) diff --git a/message_passing/waiting_for_handlers_and_compensation/workflows.py b/message_passing/waiting_for_handlers_and_compensation/workflows.py index 8b37989..3cd6f2b 100644 --- a/message_passing/waiting_for_handlers_and_compensation/workflows.py +++ b/message_passing/waiting_for_handlers_and_compensation/workflows.py @@ -33,6 +33,7 @@ def __init__(self) -> None: # application logic; if this future completes before the message handler # task then the handler should abort and perform compensation. self.workflow_exit = asyncio.Future() + self._update_compensation_done = False @workflow.run async def run(self, input: WorkflowInput) -> str: @@ -64,12 +65,12 @@ async def run(self, input: WorkflowInput) -> str: @workflow.update async def my_update(self) -> str: """ - An update handler that handles exceptions in itself and in the main - workflow method. + An update handler that handles exceptions raised in its own execution + and in that of the main workflow method. - It ensures that: - - Compensation/cleanup is always performed when appropriate - - The update caller gets the update result, or WorkflowUpdateFailedError + It ensures that: - Compensation/cleanup is always performed when + appropriate - The update caller gets the update result, or + WorkflowUpdateFailedError """ # 👉 As with the main workflow method, the update application logic is # implemented in a separate method in order to separate "platform-level" @@ -117,6 +118,11 @@ async def my_update_compensation(self): activity_executed_by_update_handler_to_perform_compensation, start_to_close_timeout=timedelta(seconds=10), ) + self._update_compensation_done = True + + @workflow.query + def update_compensation_done(self) -> bool: + return self._update_compensation_done # The following two methods are placeholders for the actual application # logic that you would perform in your main workflow method or update diff --git a/tests/message_passing/waiting_for_handlers_and_compensation/workflow_test.py b/tests/message_passing/waiting_for_handlers_and_compensation/workflow_test.py index db92dbe..1dfe624 100644 --- a/tests/message_passing/waiting_for_handlers_and_compensation/workflow_test.py +++ b/tests/message_passing/waiting_for_handlers_and_compensation/workflow_test.py @@ -1,82 +1,96 @@ import uuid +from enum import Enum import pytest -from temporalio.client import Client, WorkflowHandle, WorkflowUpdateStage +from temporalio import client, worker from temporalio.testing import WorkflowEnvironment -from temporalio.worker import Worker from message_passing.waiting_for_handlers_and_compensation import ( WorkflowExitType, WorkflowInput, ) -from message_passing.waiting_for_handlers_and_compensation.starter import ( - TASK_QUEUE, +from message_passing.waiting_for_handlers_and_compensation.activities import ( + activity_executed_by_update_handler, + activity_executed_by_update_handler_to_perform_compensation, ) +from message_passing.waiting_for_handlers_and_compensation.starter import TASK_QUEUE from message_passing.waiting_for_handlers_and_compensation.workflows import ( WaitingForHandlersAndCompensationWorkflow, ) +class UpdateExpect(Enum): + SUCCESS = "success" + FAILURE = "failure" + + +class WorkflowExpect(Enum): + SUCCESS = "success" + FAILURE = "failure" + + +@pytest.mark.parametrize( + ["exit_type_name", "update_expect", "workflow_expect"], + [ + (WorkflowExitType.SUCCESS.name, UpdateExpect.SUCCESS, WorkflowExpect.SUCCESS), + (WorkflowExitType.FAILURE.name, UpdateExpect.FAILURE, WorkflowExpect.FAILURE), + ( + WorkflowExitType.CANCELLATION.name, + UpdateExpect.FAILURE, + WorkflowExpect.FAILURE, + ), + ], +) async def test_waiting_for_handlers_and_compensation( - client: Client, env: WorkflowEnvironment + env: WorkflowEnvironment, + exit_type_name: str, + update_expect: UpdateExpect, + workflow_expect: WorkflowExpect, ): + [exit_type] = [t for t in WorkflowExitType if t.name == exit_type_name] if env.supports_time_skipping: pytest.skip( "Java test server: https://github.com/temporalio/sdk-java/issues/1903" ) - async with Worker( - client, + async with worker.Worker( + env.client, task_queue=TASK_QUEUE, workflows=[WaitingForHandlersAndCompensationWorkflow], + activities=[ + activity_executed_by_update_handler, + activity_executed_by_update_handler_to_perform_compensation, + ], ): - await starter( - WorkflowExitType.SUCCESS, - client, + wf_handle = await env.client.start_workflow( + WaitingForHandlersAndCompensationWorkflow.run, + WorkflowInput(exit_type=exit_type), + id=str(uuid.uuid4()), + task_queue=TASK_QUEUE, ) - - -async def starter(exit_type: WorkflowExitType, cl: Client): - wf_handle = await cl.start_workflow( - WaitingForHandlersAndCompensationWorkflow.run, - WorkflowInput(exit_type=exit_type), - id=str(uuid.uuid4()), - task_queue=TASK_QUEUE, - ) - await _check_run(wf_handle, exit_type) - - -async def _check_run( - wf_handle: WorkflowHandle, - exit_type: WorkflowExitType, -): - try: up_handle = await wf_handle.start_update( WaitingForHandlersAndCompensationWorkflow.my_update, - wait_for_stage=WorkflowUpdateStage.ACCEPTED, - ) - except Exception as e: - print( - f" 🔴 caught exception while starting update: {e}: {e.__cause__ or ''}" + wait_for_stage=client.WorkflowUpdateStage.ACCEPTED, ) - if exit_type == WorkflowExitType.CANCELLATION: - await wf_handle.cancel() + if exit_type == WorkflowExitType.CANCELLATION: + await wf_handle.cancel() - try: - await up_handle.result() - print(" 🟢 caller received update result") - except Exception as e: - print( - f" 🔴 caught exception while waiting for update result: {e}: {e.__cause__ or ''}" - ) + if update_expect == UpdateExpect.SUCCESS: + await up_handle.result() + assert not ( + await wf_handle.query( + WaitingForHandlersAndCompensationWorkflow.update_compensation_done + ) + ) + else: + with pytest.raises(client.WorkflowUpdateFailedError): + await up_handle.result() + assert await wf_handle.query( + WaitingForHandlersAndCompensationWorkflow.update_compensation_done + ) - if exit_type == WorkflowExitType.CONTINUE_AS_NEW: - await _check_run(wf_handle, WorkflowExitType.SUCCESS) - else: - try: + if workflow_expect == WorkflowExpect.SUCCESS: await wf_handle.result() - print(" 🟢 caller received workflow result") - except Exception as e: - print( - f" 🔴 caught exception while waiting for workflow result: {e}: {e.__cause__ or ''}" - ) + else: + with pytest.raises(client.WorkflowFailureError): + await wf_handle.result() From ef9bb60f4998ca5d7fa2571ac09483813a1fdc3e Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Fri, 4 Oct 2024 11:59:02 -0400 Subject: [PATCH 06/10] Satisfy mypy --- .../waiting_for_handlers_and_compensation/workflows.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/message_passing/waiting_for_handlers_and_compensation/workflows.py b/message_passing/waiting_for_handlers_and_compensation/workflows.py index 3cd6f2b..8b1e80a 100644 --- a/message_passing/waiting_for_handlers_and_compensation/workflows.py +++ b/message_passing/waiting_for_handlers_and_compensation/workflows.py @@ -32,7 +32,7 @@ def __init__(self) -> None: # "race" this future against a task performing the message handler's own # application logic; if this future completes before the message handler # task then the handler should abort and perform compensation. - self.workflow_exit = asyncio.Future() + self.workflow_exit: asyncio.Future[None] = asyncio.Future() self._update_compensation_done = False @workflow.run @@ -82,7 +82,7 @@ async def my_update(self) -> str: # 👉 "Race" the workflow_exit future against the handler's own application # logic. Always use `workflow.wait` instead of `asyncio.wait` in # Workflow code: asyncio's version is non-deterministic. - await workflow.wait( + await workflow.wait( # type: ignore [update_task, self.workflow_exit], return_when=asyncio.FIRST_EXCEPTION ) try: From c68fc7857890c0c8526854c9c9cea78af00c3f46 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Fri, 4 Oct 2024 12:02:29 -0400 Subject: [PATCH 07/10] Stop using ad-hoc attributes --- .../waiting_for_handlers_and_compensation/workflows.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/message_passing/waiting_for_handlers_and_compensation/workflows.py b/message_passing/waiting_for_handlers_and_compensation/workflows.py index 8b1e80a..4a63e1a 100644 --- a/message_passing/waiting_for_handlers_and_compensation/workflows.py +++ b/message_passing/waiting_for_handlers_and_compensation/workflows.py @@ -33,6 +33,10 @@ def __init__(self) -> None: # application logic; if this future completes before the message handler # task then the handler should abort and perform compensation. self.workflow_exit: asyncio.Future[None] = asyncio.Future() + + # The following two attributes are implementation detail of this sample + # and can be ignored + self._update_started = False self._update_compensation_done = False @workflow.run @@ -144,7 +148,7 @@ async def _run(self, input: WorkflowInput) -> str: # Wait until handlers have started, so that we are demonstrating that we # wait for them to finish. - await workflow.wait_condition(lambda: getattr(self, "_update_started", False)) + await workflow.wait_condition(lambda: self._update_started) if input.exit_type == WorkflowExitType.SUCCESS: return "workflow-result" elif input.exit_type == WorkflowExitType.CONTINUE_AS_NEW: From 1ea0905abb0be37edf301cb4cdf7bfef6698f1ee Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Fri, 4 Oct 2024 12:08:18 -0400 Subject: [PATCH 08/10] Remove Continue-As-New from the sample --- .../README.md | 11 +++++----- .../__init__.py | 1 - .../starter.py | 22 +++++++++---------- .../workflows.py | 11 +++------- 4 files changed, 19 insertions(+), 26 deletions(-) diff --git a/message_passing/waiting_for_handlers_and_compensation/README.md b/message_passing/waiting_for_handlers_and_compensation/README.md index 0bbb1a9..dfbac6e 100644 --- a/message_passing/waiting_for_handlers_and_compensation/README.md +++ b/message_passing/waiting_for_handlers_and_compensation/README.md @@ -1,11 +1,12 @@ # Waiting for message handlers, and performing compensation and cleanup in message handlers -This sample demonstrates the following recommended practices: +This sample demonstrates how to do the following: + +1. Ensure that all update/signal handlers are finished before a successful + workflow return, and on workflow cancellation and failure. +2. Perform compensation/cleanup in an update handler when the workflow is + cancelled or fails. -1. Ensuring that all signal and update handlers are finished before a successful - workflow return, and on workflow failure, cancellation, and continue-as-new. -2. Performing necessary compensation/cleanup in an update handler when the - workflow is cancelled, fails, or continues-as-new. To run, open two terminals and `cd` to this directory in them. diff --git a/message_passing/waiting_for_handlers_and_compensation/__init__.py b/message_passing/waiting_for_handlers_and_compensation/__init__.py index 35eb78c..d537520 100644 --- a/message_passing/waiting_for_handlers_and_compensation/__init__.py +++ b/message_passing/waiting_for_handlers_and_compensation/__init__.py @@ -9,7 +9,6 @@ class WorkflowExitType(IntEnum): SUCCESS = 0 FAILURE = 1 CANCELLATION = 2 - CONTINUE_AS_NEW = 3 @dataclass diff --git a/message_passing/waiting_for_handlers_and_compensation/starter.py b/message_passing/waiting_for_handlers_and_compensation/starter.py index 757f357..fd57c4a 100644 --- a/message_passing/waiting_for_handlers_and_compensation/starter.py +++ b/message_passing/waiting_for_handlers_and_compensation/starter.py @@ -35,7 +35,9 @@ async def _check_run( wait_for_stage=client.WorkflowUpdateStage.ACCEPTED, ) except Exception as e: - print(f" 🔴 caught exception while starting update: {e}: {e.__cause__ or ''}") + print( + f" 🔴 caught exception while starting update: {e}: {e.__cause__ or ''}" + ) if exit_type == WorkflowExitType.CANCELLATION: await wf_handle.cancel() @@ -48,16 +50,13 @@ async def _check_run( f" 🔴 caught exception while waiting for update result: {e}: {e.__cause__ or ''}" ) - if exit_type == WorkflowExitType.CONTINUE_AS_NEW: - await _check_run(wf_handle, WorkflowExitType.SUCCESS) - else: - try: - await wf_handle.result() - print(" 🟢 caller received workflow result") - except Exception as e: - print( - f" 🔴 caught exception while waiting for workflow result: {e}: {e.__cause__ or ''}" - ) + try: + await wf_handle.result() + print(" 🟢 caller received workflow result") + except Exception as e: + print( + f" 🔴 caught exception while waiting for workflow result: {e}: {e.__cause__ or ''}" + ) async def main(): @@ -65,7 +64,6 @@ async def main(): WorkflowExitType.SUCCESS, WorkflowExitType.FAILURE, WorkflowExitType.CANCELLATION, - WorkflowExitType.CONTINUE_AS_NEW, ]: print(f"\n\nworkflow exit type: {exit_type.name}") await starter(exit_type) diff --git a/message_passing/waiting_for_handlers_and_compensation/workflows.py b/message_passing/waiting_for_handlers_and_compensation/workflows.py index 4a63e1a..d67f7d1 100644 --- a/message_passing/waiting_for_handlers_and_compensation/workflows.py +++ b/message_passing/waiting_for_handlers_and_compensation/workflows.py @@ -21,9 +21,9 @@ class WaitingForHandlersAndCompensationWorkflow: perform compensation/cleanup: 1. It ensures that all signal and update handlers have finished before a - successful return, and on failure, cancellation, and continue-as-new. + successful return, and on failure and cancellation. 2. The update handler performs any necessary compensation/cleanup when the - workflow is cancelled, fails, or continues-as-new. + workflow is cancelled or fails. """ def __init__(self) -> None: @@ -151,8 +151,6 @@ async def _run(self, input: WorkflowInput) -> str: await workflow.wait_condition(lambda: self._update_started) if input.exit_type == WorkflowExitType.SUCCESS: return "workflow-result" - elif input.exit_type == WorkflowExitType.CONTINUE_AS_NEW: - workflow.continue_as_new(WorkflowInput(exit_type=WorkflowExitType.SUCCESS)) elif input.exit_type == WorkflowExitType.FAILURE: raise exceptions.ApplicationError("deliberately failing workflow") elif input.exit_type == WorkflowExitType.CANCELLATION: @@ -164,7 +162,4 @@ async def _run(self, input: WorkflowInput) -> str: def is_workflow_exit_exception(e: BaseException) -> bool: # 👉 If you have set additional failure_exception_types you should also # check for these here. - return isinstance( - e, - (asyncio.CancelledError, workflow.ContinueAsNewError, exceptions.FailureError), - ) + return isinstance(e, (asyncio.CancelledError, exceptions.FailureError)) From af43ea4b5deda3076c7756a2c7b618f3c8e1aef6 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Fri, 4 Oct 2024 12:40:31 -0400 Subject: [PATCH 09/10] Add workflow compensation --- .../activities.py | 5 +++++ .../worker.py | 2 ++ .../workflows.py | 21 ++++++++++++++++--- .../workflow_test.py | 10 +++++++++ 4 files changed, 35 insertions(+), 3 deletions(-) diff --git a/message_passing/waiting_for_handlers_and_compensation/activities.py b/message_passing/waiting_for_handlers_and_compensation/activities.py index cec6372..36c5a5c 100644 --- a/message_passing/waiting_for_handlers_and_compensation/activities.py +++ b/message_passing/waiting_for_handlers_and_compensation/activities.py @@ -3,6 +3,11 @@ from temporalio import activity +@activity.defn +async def activity_executed_to_perform_workflow_compensation(): + await asyncio.sleep(1) + + @activity.defn async def activity_executed_by_update_handler(): await asyncio.sleep(1) diff --git a/message_passing/waiting_for_handlers_and_compensation/worker.py b/message_passing/waiting_for_handlers_and_compensation/worker.py index 6402069..7daf768 100644 --- a/message_passing/waiting_for_handlers_and_compensation/worker.py +++ b/message_passing/waiting_for_handlers_and_compensation/worker.py @@ -8,6 +8,7 @@ from message_passing.waiting_for_handlers_and_compensation.activities import ( activity_executed_by_update_handler, activity_executed_by_update_handler_to_perform_compensation, + activity_executed_to_perform_workflow_compensation, ) from message_passing.waiting_for_handlers_and_compensation.workflows import ( WaitingForHandlersAndCompensationWorkflow, @@ -28,6 +29,7 @@ async def main(): activities=[ activity_executed_by_update_handler, activity_executed_by_update_handler_to_perform_compensation, + activity_executed_to_perform_workflow_compensation, ], ): logging.info("Worker started, ctrl+c to exit") diff --git a/message_passing/waiting_for_handlers_and_compensation/workflows.py b/message_passing/waiting_for_handlers_and_compensation/workflows.py index d67f7d1..c2dc5c0 100644 --- a/message_passing/waiting_for_handlers_and_compensation/workflows.py +++ b/message_passing/waiting_for_handlers_and_compensation/workflows.py @@ -11,6 +11,7 @@ from message_passing.waiting_for_handlers_and_compensation.activities import ( activity_executed_by_update_handler, activity_executed_by_update_handler_to_perform_compensation, + activity_executed_to_perform_workflow_compensation, ) @@ -38,6 +39,7 @@ def __init__(self) -> None: # and can be ignored self._update_started = False self._update_compensation_done = False + self._workflow_compensation_done = False @workflow.run async def run(self, input: WorkflowInput) -> str: @@ -64,8 +66,17 @@ async def run(self, input: WorkflowInput) -> str: if is_workflow_exit_exception(e): self.workflow_exit.set_exception(e) await workflow.wait_condition(workflow.all_handlers_finished) + await self.workflow_compensation() + self._workflow_compensation_done = True raise + async def workflow_compensation(self): + await workflow.execute_activity( + activity_executed_to_perform_workflow_compensation, + start_to_close_timeout=timedelta(seconds=10), + ) + self._update_compensation_done = True + @workflow.update async def my_update(self) -> str: """ @@ -124,13 +135,17 @@ async def my_update_compensation(self): ) self._update_compensation_done = True + @workflow.query + def workflow_compensation_done(self) -> bool: + return self._workflow_compensation_done + @workflow.query def update_compensation_done(self) -> bool: return self._update_compensation_done - # The following two methods are placeholders for the actual application - # logic that you would perform in your main workflow method or update - # handler. Their implementation can be ignored. + # The following methods are placeholders for the actual application logic + # that you would perform in your main workflow method or update handler. + # Their implementation can be ignored. async def _my_update(self) -> str: # Ignore this method unless you are interested in the implementation diff --git a/tests/message_passing/waiting_for_handlers_and_compensation/workflow_test.py b/tests/message_passing/waiting_for_handlers_and_compensation/workflow_test.py index 1dfe624..2b10d39 100644 --- a/tests/message_passing/waiting_for_handlers_and_compensation/workflow_test.py +++ b/tests/message_passing/waiting_for_handlers_and_compensation/workflow_test.py @@ -12,6 +12,7 @@ from message_passing.waiting_for_handlers_and_compensation.activities import ( activity_executed_by_update_handler, activity_executed_by_update_handler_to_perform_compensation, + activity_executed_to_perform_workflow_compensation, ) from message_passing.waiting_for_handlers_and_compensation.starter import TASK_QUEUE from message_passing.waiting_for_handlers_and_compensation.workflows import ( @@ -59,6 +60,7 @@ async def test_waiting_for_handlers_and_compensation( activities=[ activity_executed_by_update_handler, activity_executed_by_update_handler_to_perform_compensation, + activity_executed_to_perform_workflow_compensation, ], ): wf_handle = await env.client.start_workflow( @@ -91,6 +93,14 @@ async def test_waiting_for_handlers_and_compensation( if workflow_expect == WorkflowExpect.SUCCESS: await wf_handle.result() + assert not ( + await wf_handle.query( + WaitingForHandlersAndCompensationWorkflow.workflow_compensation_done + ) + ) else: with pytest.raises(client.WorkflowFailureError): await wf_handle.result() + assert await wf_handle.query( + WaitingForHandlersAndCompensationWorkflow.workflow_compensation_done + ) From af64650ee58d7a4d2c756189d79b7727295a74b8 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Fri, 4 Oct 2024 13:12:12 -0400 Subject: [PATCH 10/10] Update output in README now that there is no "CONTINUE" case --- .../README.md | 45 +++---------------- 1 file changed, 6 insertions(+), 39 deletions(-) diff --git a/message_passing/waiting_for_handlers_and_compensation/README.md b/message_passing/waiting_for_handlers_and_compensation/README.md index dfbac6e..08a265b 100644 --- a/message_passing/waiting_for_handlers_and_compensation/README.md +++ b/message_passing/waiting_for_handlers_and_compensation/README.md @@ -20,53 +20,20 @@ And run the workflow-starter code in the other terminal: poetry run python starter.py -Here's the output you'll see, along with some explanation: +Here's the output you'll see: ``` -workflow exit type: success - update action on premature workflow exit: continue - 👇 [Caller gets a successful update response because main workflow method waits for handlers to finish] - 🟢 caller received update result - 🟢 caller received workflow result - update action on premature workflow exit: abort_with_compensation - 👇 [Same as above: the workflow is successful for action-on-premature exit is irrelevant] +workflow exit type: SUCCESS 🟢 caller received update result 🟢 caller received workflow result -workflow exit type: failure - update action on premature workflow exit: continue - 👇 [update does not abort and main workflow method waits for handlers to finish => caller gets successful update result prior to workflow failure] - 🟢 caller received update result - 🔴 caught exception while waiting for workflow result: Workflow execution failed: deliberately failing workflow - update action on premature workflow exit: abort_with_compensation - 👇 [update aborts, compensates and raises => caller gets failed update result] - 🔴 caught exception while waiting for update result: Workflow update failed: The update failed because the workflow run exited: deliberately failing workflow +workflow exit type: FAILURE + 🔴 caught exception while waiting for update result: Workflow update failed: The update failed because the workflow run exited 🔴 caught exception while waiting for workflow result: Workflow execution failed: deliberately failing workflow -workflow exit type: cancellation - update action on premature workflow exit: continue - 👇 [update does not abort and main workflow method waits for handlers to finish => caller gets successful update result prior to workflow cancellation] - 🟢 caller received update result - 🔴 caught exception while waiting for workflow result: Workflow execution failed: Workflow cancelled - update action on premature workflow exit: abort_with_compensation - 👇 [update aborts, compensates and raises => caller gets failed update result] - 🔴 caught exception while waiting for update result: Workflow update failed: The update failed because the workflow run exited: +workflow exit type: CANCELLATION + 🔴 caught exception while waiting for update result: Workflow update failed: The update failed because the workflow run exited 🔴 caught exception while waiting for workflow result: Workflow execution failed: Workflow cancelled - - -workflow exit type: continue_as_new - update action on premature workflow exit: continue - 👇 [update does not abort and main workflow method waits for handlers to finish => caller gets successful update result prior to continue-as-new] - 🟢 caller received update result - 👇 [a second update is sent to the post-CAN run, which run succeeds, hence update succeeds] - 🟢 caller received update result - 🟢 caller received workflow result - update action on premature workflow exit: abort_with_compensation - 👇 [update aborts, compensates and raises => caller gets failed update result] - 🔴 caught exception while waiting for update result: update "50cd58dc-2db7-4a70-9204-bf5922203203" not found: - 👇 [a second update is sent to the post-CAN run, which run succeeds, hence update succeeds] - 🟢 caller received update result - 🟢 caller received workflow result ``` \ No newline at end of file