Skip to content

Commit

Permalink
Test
Browse files Browse the repository at this point in the history
  • Loading branch information
dandavison committed Oct 4, 2024
1 parent 60384af commit 422db1b
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 53 deletions.
16 changes: 11 additions & 5 deletions message_passing/waiting_for_handlers_and_compensation/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def __init__(self) -> None:
# application logic; if this future completes before the message handler
# task then the handler should abort and perform compensation.
self.workflow_exit = asyncio.Future()
self._update_compensation_done = False

@workflow.run
async def run(self, input: WorkflowInput) -> str:
Expand Down Expand Up @@ -64,12 +65,12 @@ async def run(self, input: WorkflowInput) -> str:
@workflow.update
async def my_update(self) -> str:
"""
An update handler that handles exceptions in itself and in the main
workflow method.
An update handler that handles exceptions raised in its own execution
and in that of the main workflow method.
It ensures that:
- Compensation/cleanup is always performed when appropriate
- The update caller gets the update result, or WorkflowUpdateFailedError
It ensures that: - Compensation/cleanup is always performed when
appropriate - The update caller gets the update result, or
WorkflowUpdateFailedError
"""
# 👉 As with the main workflow method, the update application logic is
# implemented in a separate method in order to separate "platform-level"
Expand Down Expand Up @@ -117,6 +118,11 @@ async def my_update_compensation(self):
activity_executed_by_update_handler_to_perform_compensation,
start_to_close_timeout=timedelta(seconds=10),
)
self._update_compensation_done = True

@workflow.query
def update_compensation_done(self) -> bool:
return self._update_compensation_done

# The following two methods are placeholders for the actual application
# logic that you would perform in your main workflow method or update
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
import uuid
from enum import Enum

import pytest
from temporalio.client import Client, WorkflowHandle, WorkflowUpdateStage
from temporalio import client, worker
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

from message_passing.waiting_for_handlers_and_compensation import (
WorkflowExitType,
WorkflowInput,
)
from message_passing.waiting_for_handlers_and_compensation.activities import (
activity_executed_by_update_handler,
activity_executed_by_update_handler_to_perform_compensation,
)
from message_passing.waiting_for_handlers_and_compensation.starter import (
TASK_QUEUE,
)
Expand All @@ -17,66 +21,78 @@
)


class UpdateExpect(Enum):
SUCCESS = "success"
FAILURE = "failure"


class WorkflowExpect(Enum):
SUCCESS = "success"
FAILURE = "failure"


@pytest.mark.parametrize(
["exit_type_name", "update_expect", "workflow_expect"],
[
(WorkflowExitType.SUCCESS.name, UpdateExpect.SUCCESS, WorkflowExpect.SUCCESS),
(WorkflowExitType.FAILURE.name, UpdateExpect.FAILURE, WorkflowExpect.FAILURE),
(
WorkflowExitType.CANCELLATION.name,
UpdateExpect.FAILURE,
WorkflowExpect.FAILURE,
),
],
)
async def test_waiting_for_handlers_and_compensation(
client: Client, env: WorkflowEnvironment
env: WorkflowEnvironment,
exit_type_name: str,
update_expect: UpdateExpect,
workflow_expect: WorkflowExpect,
):
[exit_type] = [t for t in WorkflowExitType if t.name == exit_type_name]
if env.supports_time_skipping:
pytest.skip(
"Java test server: https://github.com/temporalio/sdk-java/issues/1903"
)
async with Worker(
client,
async with worker.Worker(
env.client,
task_queue=TASK_QUEUE,
workflows=[WaitingForHandlersAndCompensationWorkflow],
activities=[
activity_executed_by_update_handler,
activity_executed_by_update_handler_to_perform_compensation,
],
):
await starter(
WorkflowExitType.SUCCESS,
client,
wf_handle = await env.client.start_workflow(
WaitingForHandlersAndCompensationWorkflow.run,
WorkflowInput(exit_type=exit_type),
id=str(uuid.uuid4()),
task_queue=TASK_QUEUE,
)


async def starter(exit_type: WorkflowExitType, cl: Client):
wf_handle = await cl.start_workflow(
WaitingForHandlersAndCompensationWorkflow.run,
WorkflowInput(exit_type=exit_type),
id=str(uuid.uuid4()),
task_queue=TASK_QUEUE,
)
await _check_run(wf_handle, exit_type)


async def _check_run(
wf_handle: WorkflowHandle,
exit_type: WorkflowExitType,
):
try:
up_handle = await wf_handle.start_update(
WaitingForHandlersAndCompensationWorkflow.my_update,
wait_for_stage=WorkflowUpdateStage.ACCEPTED,
)
except Exception as e:
print(
f" 🔴 caught exception while starting update: {e}: {e.__cause__ or ''}"
wait_for_stage=client.WorkflowUpdateStage.ACCEPTED,
)

if exit_type == WorkflowExitType.CANCELLATION:
await wf_handle.cancel()
if exit_type == WorkflowExitType.CANCELLATION:
await wf_handle.cancel()

try:
await up_handle.result()
print(" 🟢 caller received update result")
except Exception as e:
print(
f" 🔴 caught exception while waiting for update result: {e}: {e.__cause__ or ''}"
)
if update_expect == UpdateExpect.SUCCESS:
await up_handle.result()
assert not (
await wf_handle.query(
WaitingForHandlersAndCompensationWorkflow.update_compensation_done
)
)
else:
with pytest.raises(client.WorkflowUpdateFailedError):
await up_handle.result()
assert await wf_handle.query(
WaitingForHandlersAndCompensationWorkflow.update_compensation_done
)

if exit_type == WorkflowExitType.CONTINUE_AS_NEW:
await _check_run(wf_handle, WorkflowExitType.SUCCESS)
else:
try:
if workflow_expect == WorkflowExpect.SUCCESS:
await wf_handle.result()
print(" 🟢 caller received workflow result")
except Exception as e:
print(
f" 🔴 caught exception while waiting for workflow result: {e}: {e.__cause__ or ''}"
)
else:
with pytest.raises(client.WorkflowFailureError):
await wf_handle.result()

0 comments on commit 422db1b

Please sign in to comment.