-
Notifications
You must be signed in to change notification settings - Fork 55
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
message_handler_waiting_compensation_cleanup
- Loading branch information
1 parent
8903768
commit c566b62
Showing
6 changed files
with
351 additions
and
0 deletions.
There are no files selected for viewing
71 changes: 71 additions & 0 deletions
71
message_passing/message_handler_waiting_compensation_cleanup/README.md
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
# Waiting for message handlers, and performing compensation and cleanup in message handlers | ||
|
||
This sample demonstrates the following recommended practices: | ||
|
||
1. Ensuring that all signal and update handlers are finished before a successful | ||
workflow return, and on workflow failure, cancellation, and continue-as-new. | ||
2. Performs any necessary compensation/cleanup in an update handler when the | ||
workflow is cancelled, fails, or continues-as-new. | ||
|
||
|
||
To run, open two terminals and `cd` to this directory in them. | ||
|
||
Run the worker in one terminal: | ||
|
||
poetry run python worker.py | ||
|
||
And run the workflow-starter code in the other terminal: | ||
|
||
poetry run python starter.py | ||
|
||
|
||
Here's the output you'll see, along with some explanation: | ||
|
||
``` | ||
workflow exit type: success | ||
update action on premature workflow exit: continue | ||
👇 [Caller gets a successful update response because main workflow method waits for handlers to finish] | ||
🟢 caller received update result | ||
🟢 caller received workflow result | ||
update action on premature workflow exit: abort_with_compensation | ||
👇 [Same as above: the workflow is successful for action-on-premature exit is irrelevant] | ||
🟢 caller received update result | ||
🟢 caller received workflow result | ||
workflow exit type: failure | ||
update action on premature workflow exit: continue | ||
👇 [update does not abort and main workflow method waits for handlers to finish => caller gets successful update result prior to workflow failure] | ||
🟢 caller received update result | ||
🔴 caught exception while waiting for workflow result: Workflow execution failed: deliberately failing workflow | ||
update action on premature workflow exit: abort_with_compensation | ||
👇 [update aborts, compensates and raises => caller gets failed update result] | ||
🔴 caught exception while waiting for update result: Workflow update failed: The update failed because the workflow run exited: deliberately failing workflow | ||
🔴 caught exception while waiting for workflow result: Workflow execution failed: deliberately failing workflow | ||
workflow exit type: cancellation | ||
update action on premature workflow exit: continue | ||
👇 [update does not abort and main workflow method waits for handlers to finish => caller gets successful update result prior to workflow cancellation] | ||
🟢 caller received update result | ||
🔴 caught exception while waiting for workflow result: Workflow execution failed: Workflow cancelled | ||
update action on premature workflow exit: abort_with_compensation | ||
👇 [update aborts, compensates and raises => caller gets failed update result] | ||
🔴 caught exception while waiting for update result: Workflow update failed: The update failed because the workflow run exited: | ||
🔴 caught exception while waiting for workflow result: Workflow execution failed: Workflow cancelled | ||
workflow exit type: continue_as_new | ||
update action on premature workflow exit: continue | ||
👇 [update does not abort and main workflow method waits for handlers to finish => caller gets successful update result prior to continue-as-new] | ||
🟢 caller received update result | ||
👇 [a second update is sent to the post-CAN run, which run succeeds, hence update succeeds] | ||
🟢 caller received update result | ||
🟢 caller received workflow result | ||
update action on premature workflow exit: abort_with_compensation | ||
👇 [update aborts, compensates and raises => caller gets failed update result] | ||
🔴 caught exception while waiting for update result: update "50cd58dc-2db7-4a70-9204-bf5922203203" not found: | ||
👇 [a second update is sent to the post-CAN run, which run succeeds, hence update succeeds] | ||
🟢 caller received update result | ||
🟢 caller received workflow result | ||
``` |
27 changes: 27 additions & 0 deletions
27
message_passing/message_handler_waiting_compensation_cleanup/__init__.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
from dataclasses import dataclass | ||
from enum import StrEnum | ||
|
||
TASK_QUEUE = "my-task-queue" | ||
WORKFLOW_ID = "my-workflow-id" | ||
|
||
|
||
class WorkflowExitType(StrEnum): | ||
SUCCESS = "success" | ||
FAILURE = "failure" | ||
CONTINUE_AS_NEW = "continue_as_new" | ||
CANCELLATION = "cancellation" | ||
|
||
|
||
@dataclass | ||
class WorkflowInput: | ||
exit_type: WorkflowExitType | ||
|
||
|
||
class OnWorkflowExitAction(StrEnum): | ||
CONTINUE = "continue" | ||
ABORT_WITH_COMPENSATION = "abort_with_compensation" | ||
|
||
|
||
@dataclass | ||
class UpdateInput: | ||
on_premature_workflow_exit: OnWorkflowExitAction |
8 changes: 8 additions & 0 deletions
8
message_passing/message_handler_waiting_compensation_cleanup/activities.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
import asyncio | ||
|
||
from temporalio import activity | ||
|
||
|
||
@activity.defn | ||
async def my_activity(): | ||
await asyncio.sleep(1) |
84 changes: 84 additions & 0 deletions
84
message_passing/message_handler_waiting_compensation_cleanup/starter.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
import asyncio | ||
from contextlib import contextmanager | ||
|
||
from temporalio import client, common | ||
|
||
from message_passing.message_handler_waiting_compensation_cleanup import ( | ||
TASK_QUEUE, | ||
WORKFLOW_ID, | ||
OnWorkflowExitAction, | ||
UpdateInput, | ||
WorkflowExitType, | ||
WorkflowInput, | ||
) | ||
from message_passing.message_handler_waiting_compensation_cleanup.workflows import ( | ||
MyWorkflow, | ||
) | ||
|
||
|
||
async def starter(exit_type: WorkflowExitType, update_action: OnWorkflowExitAction): | ||
cl = await client.Client.connect("localhost:7233") | ||
wf_handle = await cl.start_workflow( | ||
MyWorkflow.run, | ||
WorkflowInput(exit_type=exit_type), | ||
id=WORKFLOW_ID, | ||
task_queue=TASK_QUEUE, | ||
id_reuse_policy=common.WorkflowIDReusePolicy.TERMINATE_IF_RUNNING, | ||
) | ||
await _check_run(wf_handle, exit_type, update_action) | ||
|
||
|
||
async def _check_run( | ||
wf_handle: client.WorkflowHandle, | ||
exit_type: WorkflowExitType, | ||
update_action: OnWorkflowExitAction, | ||
): | ||
with catch("starting update"): | ||
up_handle = await wf_handle.start_update( | ||
MyWorkflow.my_update, | ||
UpdateInput(on_premature_workflow_exit=update_action), | ||
wait_for_stage=client.WorkflowUpdateStage.ACCEPTED, | ||
) | ||
|
||
if exit_type == WorkflowExitType.CANCELLATION: | ||
await wf_handle.cancel() | ||
|
||
with catch("waiting for update result"): | ||
await up_handle.result() | ||
print(" 🟢 caller received update result") | ||
|
||
if exit_type == WorkflowExitType.CONTINUE_AS_NEW: | ||
await _check_run(wf_handle, WorkflowExitType.SUCCESS, update_action) | ||
else: | ||
with catch("waiting for workflow result"): | ||
await wf_handle.result() | ||
print(" 🟢 caller received workflow result") | ||
|
||
|
||
@contextmanager | ||
def catch(operation: str): | ||
try: | ||
yield | ||
except Exception as e: | ||
cause = getattr(e, "cause", None) | ||
print(f" 🔴 caught exception while {operation}: {e}: {cause or ''}") | ||
|
||
|
||
async def main(): | ||
for exit_type in [ | ||
WorkflowExitType.SUCCESS, | ||
WorkflowExitType.FAILURE, | ||
WorkflowExitType.CANCELLATION, | ||
WorkflowExitType.CONTINUE_AS_NEW, | ||
]: | ||
print(f"\n\nworkflow exit type: {exit_type}") | ||
for update_action in [ | ||
OnWorkflowExitAction.CONTINUE, | ||
OnWorkflowExitAction.ABORT_WITH_COMPENSATION, | ||
]: | ||
print(f" update action on premature workflow exit: {update_action}") | ||
await starter(exit_type, update_action) | ||
|
||
|
||
if __name__ == "__main__": | ||
asyncio.run(main()) |
40 changes: 40 additions & 0 deletions
40
message_passing/message_handler_waiting_compensation_cleanup/worker.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
import asyncio | ||
import logging | ||
|
||
from temporalio.client import Client | ||
from temporalio.worker import Worker | ||
|
||
from message_passing.message_handler_waiting_compensation_cleanup import TASK_QUEUE | ||
from message_passing.message_handler_waiting_compensation_cleanup.activities import ( | ||
my_activity, | ||
) | ||
from message_passing.message_handler_waiting_compensation_cleanup.workflows import ( | ||
MyWorkflow, | ||
) | ||
|
||
interrupt_event = asyncio.Event() | ||
|
||
|
||
async def main(): | ||
logging.basicConfig(level=logging.INFO) | ||
|
||
client = await Client.connect("localhost:7233") | ||
|
||
async with Worker( | ||
client, | ||
task_queue=TASK_QUEUE, | ||
workflows=[MyWorkflow], | ||
activities=[my_activity], | ||
): | ||
logging.info("Worker started, ctrl+c to exit") | ||
await interrupt_event.wait() | ||
logging.info("Shutting down") | ||
|
||
|
||
if __name__ == "__main__": | ||
loop = asyncio.new_event_loop() | ||
try: | ||
loop.run_until_complete(main()) | ||
except KeyboardInterrupt: | ||
interrupt_event.set() | ||
loop.run_until_complete(loop.shutdown_asyncgens()) |
121 changes: 121 additions & 0 deletions
121
message_passing/message_handler_waiting_compensation_cleanup/workflows.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
import asyncio | ||
from datetime import timedelta | ||
|
||
from temporalio import exceptions, workflow | ||
|
||
from message_passing.message_handler_waiting_compensation_cleanup import ( | ||
OnWorkflowExitAction, | ||
UpdateInput, | ||
WorkflowExitType, | ||
WorkflowInput, | ||
) | ||
from message_passing.message_handler_waiting_compensation_cleanup.activities import ( | ||
my_activity, | ||
) | ||
|
||
|
||
@workflow.defn | ||
class MyWorkflow: | ||
""" | ||
This Workflow upholds the following recommended practices: | ||
1. The main workflow method ensures that all signal and update handlers are | ||
finished before a successful return, and on failure, cancellation, and | ||
continue-as-new. | ||
2. The update handler performs any necessary compensation/cleanup when the | ||
workflow is cancelled, fails, or continues-as-new. | ||
""" | ||
|
||
def __init__(self) -> None: | ||
self.workflow_exit_exception: asyncio.Future[BaseException] = asyncio.Future() | ||
|
||
@workflow.run | ||
async def run(self, input: WorkflowInput) -> str: | ||
try: | ||
# 👉 Use this `try...except` style, instead of waiting for message | ||
# handlers to finish in a `finally` block. The reason is that other | ||
# exception types will cause a Workflow Task failure, in which case | ||
# we do *not* want to wait for message handlers to finish. | ||
result = await self._run(input) | ||
await workflow.wait_condition(workflow.all_handlers_finished) | ||
return result | ||
except ( | ||
asyncio.CancelledError, | ||
workflow.ContinueAsNewError, | ||
exceptions.FailureError, | ||
) as exc: | ||
self.workflow_exit_exception.set_result(exc) | ||
await workflow.wait_condition(workflow.all_handlers_finished) | ||
raise exc | ||
|
||
@workflow.update | ||
async def my_update(self, input: UpdateInput) -> str: | ||
""" | ||
This update handler demonstrates how to handle the situation where the | ||
main Workflow method exits prematurely. In that case we perform | ||
compensation/cleanup, and fail the Update. The Update caller will get a | ||
WorkflowUpdateFailedError. | ||
""" | ||
# Coroutines must be wrapped in tasks in order to use workflow.wait. | ||
update_task = asyncio.Task(self._my_update()) | ||
# 👉 Always use `workflow.wait` instead of `asyncio.wait` in Workflow | ||
# code: asyncio's version is non-deterministic. | ||
first_completed, _ = await workflow.wait( | ||
[update_task, self.workflow_exit_exception], | ||
return_when=asyncio.FIRST_COMPLETED, | ||
) | ||
# 👉 It's possible that the update completed and the workflow exited | ||
# prematurely in the same tick of the event loop. If the Update has | ||
# completed, return the Update result to the caller, whether or not the | ||
# Workflow is exiting. | ||
if ( | ||
update_task in first_completed | ||
or input.on_premature_workflow_exit == OnWorkflowExitAction.CONTINUE | ||
): | ||
return await update_task | ||
else: | ||
await self._my_update_compensation_and_cleanup() | ||
raise exceptions.ApplicationError( | ||
f"The update failed because the workflow run exited: {await self.workflow_exit_exception}" | ||
) | ||
|
||
async def _my_update(self) -> str: | ||
""" | ||
This handler calls a slow activity, so | ||
(1) In the case where the workflow finishes successfully, the worker | ||
would get an UnfinishedUpdateHandlersWarning (TMPRL1102) if the main | ||
workflow task didn't wait for it to finish. | ||
(2) In the other cases (failure, cancellation, and continue-as-new), the | ||
premature workflow exit will occur before the update is finished. | ||
""" | ||
# Ignore: implementation detail specific to this sample | ||
self._update_started = True | ||
|
||
await workflow.execute_activity( | ||
my_activity, start_to_close_timeout=timedelta(seconds=10) | ||
) | ||
return "update-result" | ||
|
||
async def _my_update_compensation_and_cleanup(self): | ||
workflow.logger.info( | ||
"performing update handler compensation and cleanup operations" | ||
) | ||
|
||
async def _run(self, input: WorkflowInput) -> str: | ||
# Ignore this method unless you are interested in the implementation | ||
# details of this sample. | ||
|
||
# Wait until handlers started, so that we are demonstrating that we wait for them to finish. | ||
await workflow.wait_condition(lambda: getattr(self, "_update_started", False)) | ||
if input.exit_type == WorkflowExitType.SUCCESS: | ||
return "workflow-result" | ||
elif input.exit_type == WorkflowExitType.CONTINUE_AS_NEW: | ||
workflow.continue_as_new(WorkflowInput(exit_type=WorkflowExitType.SUCCESS)) | ||
elif input.exit_type == WorkflowExitType.FAILURE: | ||
raise exceptions.ApplicationError("deliberately failing workflow") | ||
elif input.exit_type == WorkflowExitType.CANCELLATION: | ||
# Block forever; the starter will send a workflow cancellation request. | ||
await asyncio.Future() | ||
raise AssertionError("unreachable") |