From 422db1ba64c41f660f803ebab3274b7a52ea58bc Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Fri, 4 Oct 2024 11:49:13 -0400 Subject: [PATCH] Test --- .../workflows.py | 16 ++- .../workflow_test.py | 112 ++++++++++-------- 2 files changed, 75 insertions(+), 53 deletions(-) diff --git a/message_passing/waiting_for_handlers_and_compensation/workflows.py b/message_passing/waiting_for_handlers_and_compensation/workflows.py index 8b37989..3cd6f2b 100644 --- a/message_passing/waiting_for_handlers_and_compensation/workflows.py +++ b/message_passing/waiting_for_handlers_and_compensation/workflows.py @@ -33,6 +33,7 @@ def __init__(self) -> None: # application logic; if this future completes before the message handler # task then the handler should abort and perform compensation. self.workflow_exit = asyncio.Future() + self._update_compensation_done = False @workflow.run async def run(self, input: WorkflowInput) -> str: @@ -64,12 +65,12 @@ async def run(self, input: WorkflowInput) -> str: @workflow.update async def my_update(self) -> str: """ - An update handler that handles exceptions in itself and in the main - workflow method. + An update handler that handles exceptions raised in its own execution + and in that of the main workflow method. - It ensures that: - - Compensation/cleanup is always performed when appropriate - - The update caller gets the update result, or WorkflowUpdateFailedError + It ensures that: - Compensation/cleanup is always performed when + appropriate - The update caller gets the update result, or + WorkflowUpdateFailedError """ # 👉 As with the main workflow method, the update application logic is # implemented in a separate method in order to separate "platform-level" @@ -117,6 +118,11 @@ async def my_update_compensation(self): activity_executed_by_update_handler_to_perform_compensation, start_to_close_timeout=timedelta(seconds=10), ) + self._update_compensation_done = True + + @workflow.query + def update_compensation_done(self) -> bool: + return self._update_compensation_done # The following two methods are placeholders for the actual application # logic that you would perform in your main workflow method or update diff --git a/tests/message_passing/waiting_for_handlers_and_compensation/workflow_test.py b/tests/message_passing/waiting_for_handlers_and_compensation/workflow_test.py index db92dbe..efa990f 100644 --- a/tests/message_passing/waiting_for_handlers_and_compensation/workflow_test.py +++ b/tests/message_passing/waiting_for_handlers_and_compensation/workflow_test.py @@ -1,14 +1,18 @@ import uuid +from enum import Enum import pytest -from temporalio.client import Client, WorkflowHandle, WorkflowUpdateStage +from temporalio import client, worker from temporalio.testing import WorkflowEnvironment -from temporalio.worker import Worker from message_passing.waiting_for_handlers_and_compensation import ( WorkflowExitType, WorkflowInput, ) +from message_passing.waiting_for_handlers_and_compensation.activities import ( + activity_executed_by_update_handler, + activity_executed_by_update_handler_to_perform_compensation, +) from message_passing.waiting_for_handlers_and_compensation.starter import ( TASK_QUEUE, ) @@ -17,66 +21,78 @@ ) +class UpdateExpect(Enum): + SUCCESS = "success" + FAILURE = "failure" + + +class WorkflowExpect(Enum): + SUCCESS = "success" + FAILURE = "failure" + + +@pytest.mark.parametrize( + ["exit_type_name", "update_expect", "workflow_expect"], + [ + (WorkflowExitType.SUCCESS.name, UpdateExpect.SUCCESS, WorkflowExpect.SUCCESS), + (WorkflowExitType.FAILURE.name, UpdateExpect.FAILURE, WorkflowExpect.FAILURE), + ( + WorkflowExitType.CANCELLATION.name, + UpdateExpect.FAILURE, + WorkflowExpect.FAILURE, + ), + ], +) async def test_waiting_for_handlers_and_compensation( - client: Client, env: WorkflowEnvironment + env: WorkflowEnvironment, + exit_type_name: str, + update_expect: UpdateExpect, + workflow_expect: WorkflowExpect, ): + [exit_type] = [t for t in WorkflowExitType if t.name == exit_type_name] if env.supports_time_skipping: pytest.skip( "Java test server: https://github.com/temporalio/sdk-java/issues/1903" ) - async with Worker( - client, + async with worker.Worker( + env.client, task_queue=TASK_QUEUE, workflows=[WaitingForHandlersAndCompensationWorkflow], + activities=[ + activity_executed_by_update_handler, + activity_executed_by_update_handler_to_perform_compensation, + ], ): - await starter( - WorkflowExitType.SUCCESS, - client, + wf_handle = await env.client.start_workflow( + WaitingForHandlersAndCompensationWorkflow.run, + WorkflowInput(exit_type=exit_type), + id=str(uuid.uuid4()), + task_queue=TASK_QUEUE, ) - - -async def starter(exit_type: WorkflowExitType, cl: Client): - wf_handle = await cl.start_workflow( - WaitingForHandlersAndCompensationWorkflow.run, - WorkflowInput(exit_type=exit_type), - id=str(uuid.uuid4()), - task_queue=TASK_QUEUE, - ) - await _check_run(wf_handle, exit_type) - - -async def _check_run( - wf_handle: WorkflowHandle, - exit_type: WorkflowExitType, -): - try: up_handle = await wf_handle.start_update( WaitingForHandlersAndCompensationWorkflow.my_update, - wait_for_stage=WorkflowUpdateStage.ACCEPTED, - ) - except Exception as e: - print( - f" 🔴 caught exception while starting update: {e}: {e.__cause__ or ''}" + wait_for_stage=client.WorkflowUpdateStage.ACCEPTED, ) - if exit_type == WorkflowExitType.CANCELLATION: - await wf_handle.cancel() + if exit_type == WorkflowExitType.CANCELLATION: + await wf_handle.cancel() - try: - await up_handle.result() - print(" 🟢 caller received update result") - except Exception as e: - print( - f" 🔴 caught exception while waiting for update result: {e}: {e.__cause__ or ''}" - ) + if update_expect == UpdateExpect.SUCCESS: + await up_handle.result() + assert not ( + await wf_handle.query( + WaitingForHandlersAndCompensationWorkflow.update_compensation_done + ) + ) + else: + with pytest.raises(client.WorkflowUpdateFailedError): + await up_handle.result() + assert await wf_handle.query( + WaitingForHandlersAndCompensationWorkflow.update_compensation_done + ) - if exit_type == WorkflowExitType.CONTINUE_AS_NEW: - await _check_run(wf_handle, WorkflowExitType.SUCCESS) - else: - try: + if workflow_expect == WorkflowExpect.SUCCESS: await wf_handle.result() - print(" 🟢 caller received workflow result") - except Exception as e: - print( - f" 🔴 caught exception while waiting for workflow result: {e}: {e.__cause__ or ''}" - ) + else: + with pytest.raises(client.WorkflowFailureError): + await wf_handle.result()