diff --git a/message_passing/waiting_for_handlers_and_compensation/README.md b/message_passing/waiting_for_handlers_and_compensation/README.md new file mode 100644 index 0000000..08a265b --- /dev/null +++ b/message_passing/waiting_for_handlers_and_compensation/README.md @@ -0,0 +1,39 @@ +# Waiting for message handlers, and performing compensation and cleanup in message handlers + +This sample demonstrates how to do the following: + +1. Ensure that all update/signal handlers are finished before a successful + workflow return, and on workflow cancellation and failure. +2. Perform compensation/cleanup in an update handler when the workflow is + cancelled or fails. + + + +To run, open two terminals and `cd` to this directory in them. + +Run the worker in one terminal: + + poetry run python worker.py + +And run the workflow-starter code in the other terminal: + + poetry run python starter.py + + +Here's the output you'll see: + +``` +workflow exit type: SUCCESS + 🟢 caller received update result + 🟢 caller received workflow result + + +workflow exit type: FAILURE + 🔴 caught exception while waiting for update result: Workflow update failed: The update failed because the workflow run exited + 🔴 caught exception while waiting for workflow result: Workflow execution failed: deliberately failing workflow + + +workflow exit type: CANCELLATION + 🔴 caught exception while waiting for update result: Workflow update failed: The update failed because the workflow run exited + 🔴 caught exception while waiting for workflow result: Workflow execution failed: Workflow cancelled +``` \ No newline at end of file diff --git a/message_passing/waiting_for_handlers_and_compensation/__init__.py b/message_passing/waiting_for_handlers_and_compensation/__init__.py new file mode 100644 index 0000000..d537520 --- /dev/null +++ b/message_passing/waiting_for_handlers_and_compensation/__init__.py @@ -0,0 +1,16 @@ +from dataclasses import dataclass +from enum import IntEnum + +TASK_QUEUE = "my-task-queue" +WORKFLOW_ID = "my-workflow-id" + + +class WorkflowExitType(IntEnum): + SUCCESS = 0 + FAILURE = 1 + CANCELLATION = 2 + + +@dataclass +class WorkflowInput: + exit_type: WorkflowExitType diff --git a/message_passing/waiting_for_handlers_and_compensation/activities.py b/message_passing/waiting_for_handlers_and_compensation/activities.py new file mode 100644 index 0000000..36c5a5c --- /dev/null +++ b/message_passing/waiting_for_handlers_and_compensation/activities.py @@ -0,0 +1,18 @@ +import asyncio + +from temporalio import activity + + +@activity.defn +async def activity_executed_to_perform_workflow_compensation(): + await asyncio.sleep(1) + + +@activity.defn +async def activity_executed_by_update_handler(): + await asyncio.sleep(1) + + +@activity.defn +async def activity_executed_by_update_handler_to_perform_compensation(): + await asyncio.sleep(1) diff --git a/message_passing/waiting_for_handlers_and_compensation/starter.py b/message_passing/waiting_for_handlers_and_compensation/starter.py new file mode 100644 index 0000000..fd57c4a --- /dev/null +++ b/message_passing/waiting_for_handlers_and_compensation/starter.py @@ -0,0 +1,73 @@ +import asyncio + +from temporalio import client, common + +from message_passing.waiting_for_handlers_and_compensation import ( + TASK_QUEUE, + WORKFLOW_ID, + WorkflowExitType, + WorkflowInput, +) +from message_passing.waiting_for_handlers_and_compensation.workflows import ( + WaitingForHandlersAndCompensationWorkflow, +) + + +async def starter(exit_type: WorkflowExitType): + cl = await client.Client.connect("localhost:7233") + wf_handle = await cl.start_workflow( + WaitingForHandlersAndCompensationWorkflow.run, + WorkflowInput(exit_type=exit_type), + id=WORKFLOW_ID, + task_queue=TASK_QUEUE, + id_conflict_policy=common.WorkflowIDConflictPolicy.TERMINATE_EXISTING, + ) + await _check_run(wf_handle, exit_type) + + +async def _check_run( + wf_handle: client.WorkflowHandle, + exit_type: WorkflowExitType, +): + try: + up_handle = await wf_handle.start_update( + WaitingForHandlersAndCompensationWorkflow.my_update, + wait_for_stage=client.WorkflowUpdateStage.ACCEPTED, + ) + except Exception as e: + print( + f" 🔴 caught exception while starting update: {e}: {e.__cause__ or ''}" + ) + + if exit_type == WorkflowExitType.CANCELLATION: + await wf_handle.cancel() + + try: + await up_handle.result() + print(" 🟢 caller received update result") + except Exception as e: + print( + f" 🔴 caught exception while waiting for update result: {e}: {e.__cause__ or ''}" + ) + + try: + await wf_handle.result() + print(" 🟢 caller received workflow result") + except Exception as e: + print( + f" 🔴 caught exception while waiting for workflow result: {e}: {e.__cause__ or ''}" + ) + + +async def main(): + for exit_type in [ + WorkflowExitType.SUCCESS, + WorkflowExitType.FAILURE, + WorkflowExitType.CANCELLATION, + ]: + print(f"\n\nworkflow exit type: {exit_type.name}") + await starter(exit_type) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/message_passing/waiting_for_handlers_and_compensation/worker.py b/message_passing/waiting_for_handlers_and_compensation/worker.py new file mode 100644 index 0000000..7daf768 --- /dev/null +++ b/message_passing/waiting_for_handlers_and_compensation/worker.py @@ -0,0 +1,46 @@ +import asyncio +import logging + +from temporalio.client import Client +from temporalio.worker import Worker + +from message_passing.waiting_for_handlers_and_compensation import TASK_QUEUE +from message_passing.waiting_for_handlers_and_compensation.activities import ( + activity_executed_by_update_handler, + activity_executed_by_update_handler_to_perform_compensation, + activity_executed_to_perform_workflow_compensation, +) +from message_passing.waiting_for_handlers_and_compensation.workflows import ( + WaitingForHandlersAndCompensationWorkflow, +) + +interrupt_event = asyncio.Event() + + +async def main(): + logging.basicConfig(level=logging.INFO) + + client = await Client.connect("localhost:7233") + + async with Worker( + client, + task_queue=TASK_QUEUE, + workflows=[WaitingForHandlersAndCompensationWorkflow], + activities=[ + activity_executed_by_update_handler, + activity_executed_by_update_handler_to_perform_compensation, + activity_executed_to_perform_workflow_compensation, + ], + ): + logging.info("Worker started, ctrl+c to exit") + await interrupt_event.wait() + logging.info("Shutting down") + + +if __name__ == "__main__": + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(main()) + except KeyboardInterrupt: + interrupt_event.set() + loop.run_until_complete(loop.shutdown_asyncgens()) diff --git a/message_passing/waiting_for_handlers_and_compensation/workflows.py b/message_passing/waiting_for_handlers_and_compensation/workflows.py new file mode 100644 index 0000000..c2dc5c0 --- /dev/null +++ b/message_passing/waiting_for_handlers_and_compensation/workflows.py @@ -0,0 +1,180 @@ +import asyncio +from datetime import timedelta +from typing import cast + +from temporalio import exceptions, workflow + +from message_passing.waiting_for_handlers_and_compensation import ( + WorkflowExitType, + WorkflowInput, +) +from message_passing.waiting_for_handlers_and_compensation.activities import ( + activity_executed_by_update_handler, + activity_executed_by_update_handler_to_perform_compensation, + activity_executed_to_perform_workflow_compensation, +) + + +@workflow.defn +class WaitingForHandlersAndCompensationWorkflow: + """ + This Workflow demonstrates how to wait for message handlers to finish and + perform compensation/cleanup: + + 1. It ensures that all signal and update handlers have finished before a + successful return, and on failure and cancellation. + 2. The update handler performs any necessary compensation/cleanup when the + workflow is cancelled or fails. + """ + + def __init__(self) -> None: + # 👉 If the workflow exits prematurely, this future will be completed + # with the associated exception as its value. Message handlers can then + # "race" this future against a task performing the message handler's own + # application logic; if this future completes before the message handler + # task then the handler should abort and perform compensation. + self.workflow_exit: asyncio.Future[None] = asyncio.Future() + + # The following two attributes are implementation detail of this sample + # and can be ignored + self._update_started = False + self._update_compensation_done = False + self._workflow_compensation_done = False + + @workflow.run + async def run(self, input: WorkflowInput) -> str: + try: + # 👉 Use this `try...except` style, instead of waiting for message + # handlers to finish in a `finally` block. The reason is that some + # exception types cause a workflow task failure as opposed to + # workflow exit, in which case we do *not* want to wait for message + # handlers to finish. + + # 👉 self._run contains your actual application logic. This is + # implemented in a separate method in order to separate + # "platform-level" concerns (waiting for handlers to finish and + # ensuring that compensation is performed when appropriate) from + # application logic. In this sample, its actual implementation is + # below but contains nothing relevant. + result = await self._run(input) + self.workflow_exit.set_result(None) + await workflow.wait_condition(workflow.all_handlers_finished) + return result + # 👉 Catch BaseException since asyncio.CancelledError does not inherit + # from Exception. + except BaseException as e: + if is_workflow_exit_exception(e): + self.workflow_exit.set_exception(e) + await workflow.wait_condition(workflow.all_handlers_finished) + await self.workflow_compensation() + self._workflow_compensation_done = True + raise + + async def workflow_compensation(self): + await workflow.execute_activity( + activity_executed_to_perform_workflow_compensation, + start_to_close_timeout=timedelta(seconds=10), + ) + self._update_compensation_done = True + + @workflow.update + async def my_update(self) -> str: + """ + An update handler that handles exceptions raised in its own execution + and in that of the main workflow method. + + It ensures that: - Compensation/cleanup is always performed when + appropriate - The update caller gets the update result, or + WorkflowUpdateFailedError + """ + # 👉 As with the main workflow method, the update application logic is + # implemented in a separate method in order to separate "platform-level" + # error-handling and compensation concerns from application logic. Note + # that coroutines must be wrapped in tasks in order to use + # workflow.wait. + update_task = asyncio.create_task(self._my_update()) + + # 👉 "Race" the workflow_exit future against the handler's own application + # logic. Always use `workflow.wait` instead of `asyncio.wait` in + # Workflow code: asyncio's version is non-deterministic. + await workflow.wait( # type: ignore + [update_task, self.workflow_exit], return_when=asyncio.FIRST_EXCEPTION + ) + try: + if update_task.done(): + # 👉 The update has finished (whether successfully or not). + # Regardless of whether the main workflow method is about to + # exit or not, the update caller should receive a response + # informing them of the outcome of the update. So return the + # result, or raise the exception that caused the update handler + # to exit. + return await update_task + else: + # 👉 The main workflow method exited prematurely due to an + # error, and this happened before the update finished. Fail the + # update with the workflow exception as cause. + raise exceptions.ApplicationError( + "The update failed because the workflow run exited" + ) from cast(BaseException, self.workflow_exit.exception()) + # 👉 Catch BaseException since asyncio.CancelledError does not inherit + # from Exception. + except BaseException as e: + if is_workflow_exit_exception(e): + try: + await self.my_update_compensation() + except BaseException as e: + raise exceptions.ApplicationError( + "Update compensation failed" + ) from e + raise + + async def my_update_compensation(self): + await workflow.execute_activity( + activity_executed_by_update_handler_to_perform_compensation, + start_to_close_timeout=timedelta(seconds=10), + ) + self._update_compensation_done = True + + @workflow.query + def workflow_compensation_done(self) -> bool: + return self._workflow_compensation_done + + @workflow.query + def update_compensation_done(self) -> bool: + return self._update_compensation_done + + # The following methods are placeholders for the actual application logic + # that you would perform in your main workflow method or update handler. + # Their implementation can be ignored. + + async def _my_update(self) -> str: + # Ignore this method unless you are interested in the implementation + # details of this sample. + self._update_started = True + await workflow.execute_activity( + activity_executed_by_update_handler, + start_to_close_timeout=timedelta(seconds=10), + ) + return "update-result" + + async def _run(self, input: WorkflowInput) -> str: + # Ignore this method unless you are interested in the implementation + # details of this sample. + + # Wait until handlers have started, so that we are demonstrating that we + # wait for them to finish. + await workflow.wait_condition(lambda: self._update_started) + if input.exit_type == WorkflowExitType.SUCCESS: + return "workflow-result" + elif input.exit_type == WorkflowExitType.FAILURE: + raise exceptions.ApplicationError("deliberately failing workflow") + elif input.exit_type == WorkflowExitType.CANCELLATION: + # Block forever; the starter will send a workflow cancellation request. + await asyncio.Future() + raise AssertionError("unreachable") + + +def is_workflow_exit_exception(e: BaseException) -> bool: + # 👉 If you have set additional failure_exception_types you should also + # check for these here. + return isinstance(e, (asyncio.CancelledError, exceptions.FailureError)) diff --git a/tests/message_passing/waiting_for_handlers_and_compensation/workflow_test.py b/tests/message_passing/waiting_for_handlers_and_compensation/workflow_test.py new file mode 100644 index 0000000..2b10d39 --- /dev/null +++ b/tests/message_passing/waiting_for_handlers_and_compensation/workflow_test.py @@ -0,0 +1,106 @@ +import uuid +from enum import Enum + +import pytest +from temporalio import client, worker +from temporalio.testing import WorkflowEnvironment + +from message_passing.waiting_for_handlers_and_compensation import ( + WorkflowExitType, + WorkflowInput, +) +from message_passing.waiting_for_handlers_and_compensation.activities import ( + activity_executed_by_update_handler, + activity_executed_by_update_handler_to_perform_compensation, + activity_executed_to_perform_workflow_compensation, +) +from message_passing.waiting_for_handlers_and_compensation.starter import TASK_QUEUE +from message_passing.waiting_for_handlers_and_compensation.workflows import ( + WaitingForHandlersAndCompensationWorkflow, +) + + +class UpdateExpect(Enum): + SUCCESS = "success" + FAILURE = "failure" + + +class WorkflowExpect(Enum): + SUCCESS = "success" + FAILURE = "failure" + + +@pytest.mark.parametrize( + ["exit_type_name", "update_expect", "workflow_expect"], + [ + (WorkflowExitType.SUCCESS.name, UpdateExpect.SUCCESS, WorkflowExpect.SUCCESS), + (WorkflowExitType.FAILURE.name, UpdateExpect.FAILURE, WorkflowExpect.FAILURE), + ( + WorkflowExitType.CANCELLATION.name, + UpdateExpect.FAILURE, + WorkflowExpect.FAILURE, + ), + ], +) +async def test_waiting_for_handlers_and_compensation( + env: WorkflowEnvironment, + exit_type_name: str, + update_expect: UpdateExpect, + workflow_expect: WorkflowExpect, +): + [exit_type] = [t for t in WorkflowExitType if t.name == exit_type_name] + if env.supports_time_skipping: + pytest.skip( + "Java test server: https://github.com/temporalio/sdk-java/issues/1903" + ) + async with worker.Worker( + env.client, + task_queue=TASK_QUEUE, + workflows=[WaitingForHandlersAndCompensationWorkflow], + activities=[ + activity_executed_by_update_handler, + activity_executed_by_update_handler_to_perform_compensation, + activity_executed_to_perform_workflow_compensation, + ], + ): + wf_handle = await env.client.start_workflow( + WaitingForHandlersAndCompensationWorkflow.run, + WorkflowInput(exit_type=exit_type), + id=str(uuid.uuid4()), + task_queue=TASK_QUEUE, + ) + up_handle = await wf_handle.start_update( + WaitingForHandlersAndCompensationWorkflow.my_update, + wait_for_stage=client.WorkflowUpdateStage.ACCEPTED, + ) + + if exit_type == WorkflowExitType.CANCELLATION: + await wf_handle.cancel() + + if update_expect == UpdateExpect.SUCCESS: + await up_handle.result() + assert not ( + await wf_handle.query( + WaitingForHandlersAndCompensationWorkflow.update_compensation_done + ) + ) + else: + with pytest.raises(client.WorkflowUpdateFailedError): + await up_handle.result() + assert await wf_handle.query( + WaitingForHandlersAndCompensationWorkflow.update_compensation_done + ) + + if workflow_expect == WorkflowExpect.SUCCESS: + await wf_handle.result() + assert not ( + await wf_handle.query( + WaitingForHandlersAndCompensationWorkflow.workflow_compensation_done + ) + ) + else: + with pytest.raises(client.WorkflowFailureError): + await wf_handle.result() + assert await wf_handle.query( + WaitingForHandlersAndCompensationWorkflow.workflow_compensation_done + )