From 2c1ac543cd12c990ce047432aaef5e6ae937684b Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Wed, 26 Jun 2024 18:57:02 +0100 Subject: [PATCH] New API to wait for handler executions to complete and warnings on unfinished handler executions (#556) * Implement warning on unfinished signals and updates * Implement all_handlers_finished --- pyproject.toml | 7 +- temporalio/worker/_workflow_instance.py | 112 +++++++- temporalio/workflow.py | 136 ++++++++-- tests/worker/test_workflow.py | 342 +++++++++++++++++++++++- 4 files changed, 570 insertions(+), 27 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 021fb0d7..e80f4fcf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -99,7 +99,6 @@ env = { TEMPORAL_INTEGRATION_TEST = "1" } cmd = "pip uninstall temporalio -y" [tool.pytest.ini_options] -addopts = "-p no:warnings" asyncio_mode = "auto" log_cli = true log_cli_level = "INFO" @@ -107,6 +106,12 @@ log_cli_format = "%(asctime)s [%(levelname)8s] %(message)s (%(filename)s:%(linen testpaths = ["tests"] timeout = 600 timeout_func_only = true +filterwarnings = [ + "error::temporalio.workflow.UnfinishedUpdateHandlersWarning", + "error::temporalio.workflow.UnfinishedSignalHandlersWarning", + "ignore::pytest.PytestDeprecationWarning", + "ignore::DeprecationWarning", +] [tool.cibuildwheel] # We only want the 3.8 64-bit build of each type. However, due to diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index 13923b2b..d29b727e 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -6,11 +6,11 @@ import collections import contextvars import inspect +import json import logging import random import sys import traceback -import typing import warnings from abc import ABC, abstractmethod from contextlib import contextmanager @@ -25,6 +25,7 @@ Dict, Generator, Generic, + Iterable, Iterator, List, Mapping, @@ -240,6 +241,14 @@ def __init__(self, det: WorkflowInstanceDetails) -> None: self._queries = dict(self._defn.queries) self._updates = dict(self._defn.updates) + # We record in-progress signals and updates in order to support waiting for handlers to + # finish, and issuing warnings when the workflow exits with unfinished handlers. Since + # signals lack a unique per-invocation identifier, we introduce a sequence number for the + # purpose. + self._handled_signals_seq = 0 + self._in_progress_signals: Dict[int, HandlerExecution] = {} + self._in_progress_updates: Dict[str, HandlerExecution] = {} + # Add stack trace handler # TODO(cretz): Is it ok that this can be forcefully overridden by the # workflow author? They could technically override in interceptor @@ -406,12 +415,15 @@ def activate( command.HasField("complete_workflow_execution") or command.HasField("continue_as_new_workflow_execution") or command.HasField("fail_workflow_execution") + or command.HasField("cancel_workflow_execution") ) elif not command.HasField("respond_to_query"): del self._current_completion.successful.commands[i] continue i += 1 + if seen_completion: + self._warn_if_unfinished_handlers() return self._current_completion def _apply( @@ -490,6 +502,9 @@ async def run_update() -> None: f"Update handler for '{job.name}' expected but not found, and there is no dynamic handler. " f"known updates: [{' '.join(known_updates)}]" ) + self._in_progress_updates[job.id] = HandlerExecution( + job.name, defn.unfinished_policy, job.id + ) args = self._process_handler_args( job.name, job.input, @@ -572,6 +587,8 @@ async def run_update() -> None: ) return raise + finally: + self._in_progress_updates.pop(job.id, None) self.create_task( run_update(), @@ -869,6 +886,9 @@ def _apply_update_random_seed( #### _Runtime direct workflow call overrides #### # These are in alphabetical order and all start with "workflow_". + def workflow_all_handlers_finished(self) -> bool: + return not self._in_progress_updates and not self._in_progress_signals + def workflow_continue_as_new( self, *args: Any, @@ -1596,6 +1616,31 @@ def _is_workflow_failure_exception(self, err: BaseException) -> bool: ) ) + def _warn_if_unfinished_handlers(self) -> None: + def warnable(handler_executions: Iterable[HandlerExecution]): + return [ + ex + for ex in handler_executions + if ex.unfinished_policy + == temporalio.workflow.HandlerUnfinishedPolicy.WARN_AND_ABANDON + ] + + warnable_updates = warnable(self._in_progress_updates.values()) + if warnable_updates: + warnings.warn( + temporalio.workflow.UnfinishedUpdateHandlersWarning( + _make_unfinished_update_handler_message(warnable_updates) + ) + ) + + warnable_signals = warnable(self._in_progress_signals.values()) + if warnable_signals: + warnings.warn( + temporalio.workflow.UnfinishedSignalHandlersWarning( + _make_unfinished_signal_handler_message(warnable_signals) + ) + ) + def _next_seq(self, type: str) -> int: seq = self._curr_seqs.get(type, 0) + 1 self._curr_seqs[type] = seq @@ -1646,10 +1691,21 @@ def _process_signal_job( input = HandleSignalInput( signal=job.signal_name, args=args, headers=job.headers ) - self.create_task( + + self._handled_signals_seq += 1 + id = self._handled_signals_seq + self._in_progress_signals[id] = HandlerExecution( + job.signal_name, defn.unfinished_policy + ) + + def done_callback(f): + self._in_progress_signals.pop(id, None) + + task = self.create_task( self._run_top_level_workflow_function(self._inbound.handle_signal(input)), name=f"signal: {job.signal_name}", ) + task.add_done_callback(done_callback) def _register_task( self, @@ -2686,3 +2742,55 @@ def set( class _WorkflowBeingEvictedError(BaseException): pass + + +@dataclass +class HandlerExecution: + """Information about an execution of a signal or update handler.""" + + name: str + unfinished_policy: temporalio.workflow.HandlerUnfinishedPolicy + id: Optional[str] = None + + +def _make_unfinished_update_handler_message( + handler_executions: List[HandlerExecution], +) -> str: + message = """ +Workflow finished while update handlers are still running. This may have interrupted work that the +update handler was doing, and the client that sent the update will receive a 'workflow execution +already completed' RPCError instead of the update result. You can wait for all update and signal +handlers to complete by using `await workflow.wait_condition(lambda: +workflow.all_handlers_finished())`. Alternatively, if both you and the clients sending the update +are okay with interrupting running handlers when the workflow finishes, and causing clients to +receive errors, then you can disable this warning via the update handler decorator: +`@workflow.update(unfinished_policy=workflow.HandlerUnfinishedPolicy.ABANDON)`. +""".replace( + "\n", " " + ).strip() + return ( + f"{message} The following updates were unfinished (and warnings were not disabled for their handler): " + + json.dumps([{"name": ex.name, "id": ex.id} for ex in handler_executions]) + ) + + +def _make_unfinished_signal_handler_message( + handler_executions: List[HandlerExecution], +) -> str: + message = """ +Workflow finished while signal handlers are still running. This may have interrupted work that the +signal handler was doing. You can wait for all update and signal handlers to complete by using +`await workflow.wait_condition(lambda: workflow.all_handlers_finished())`. Alternatively, if both +you and the clients sending the signal are okay with interrupting running handlers when the workflow +finishes, and causing clients to receive errors, then you can disable this warning via the signal +handler decorator: `@workflow.signal(unfinished_policy=workflow.HandlerUnfinishedPolicy.ABANDON)`. +""".replace( + "\n", " " + ).strip() + names = collections.Counter(ex.name for ex in handler_executions) + return ( + f"{message} The following signals were unfinished (and warnings were not disabled for their handler): " + + json.dumps( + [{"name": name, "count": count} for name, count in names.most_common()] + ) + ) diff --git a/temporalio/workflow.py b/temporalio/workflow.py index 64173266..3d98df9f 100644 --- a/temporalio/workflow.py +++ b/temporalio/workflow.py @@ -173,6 +173,30 @@ def run(fn: CallableAsyncType) -> CallableAsyncType: return fn # type: ignore[return-value] +class HandlerUnfinishedPolicy(Enum): + """Actions taken if a workflow terminates with running handlers. + + Policy defining actions taken when a workflow exits while update or signal handlers are running. + The workflow exit may be due to successful return, failure, cancellation, or continue-as-new. + """ + + WARN_AND_ABANDON = 1 + """Issue a warning in addition to abandoning.""" + ABANDON = 2 + """Abandon the handler. + + In the case of an update handler this means that the client will receive an error rather than + the update result.""" + + +class UnfinishedUpdateHandlersWarning(RuntimeWarning): + """The workflow exited before all update handlers had finished executing.""" + + +class UnfinishedSignalHandlersWarning(RuntimeWarning): + """The workflow exited before all signal handlers had finished executing.""" + + @overload def signal(fn: CallableSyncOrAsyncReturnNoneType) -> CallableSyncOrAsyncReturnNoneType: ... @@ -180,14 +204,26 @@ def signal(fn: CallableSyncOrAsyncReturnNoneType) -> CallableSyncOrAsyncReturnNo @overload def signal( - *, name: str + *, + unfinished_policy: HandlerUnfinishedPolicy = HandlerUnfinishedPolicy.WARN_AND_ABANDON, ) -> Callable[[CallableSyncOrAsyncReturnNoneType], CallableSyncOrAsyncReturnNoneType]: ... @overload def signal( - *, dynamic: Literal[True] + *, + name: str, + unfinished_policy: HandlerUnfinishedPolicy = HandlerUnfinishedPolicy.WARN_AND_ABANDON, +) -> Callable[[CallableSyncOrAsyncReturnNoneType], CallableSyncOrAsyncReturnNoneType]: + ... + + +@overload +def signal( + *, + dynamic: Literal[True], + unfinished_policy: HandlerUnfinishedPolicy = HandlerUnfinishedPolicy.WARN_AND_ABANDON, ) -> Callable[[CallableSyncOrAsyncReturnNoneType], CallableSyncOrAsyncReturnNoneType]: ... @@ -197,6 +233,7 @@ def signal( *, name: Optional[str] = None, dynamic: Optional[bool] = False, + unfinished_policy: HandlerUnfinishedPolicy = HandlerUnfinishedPolicy.WARN_AND_ABANDON, ): """Decorator for a workflow signal method. @@ -217,12 +254,23 @@ def signal( parameters of the method must be self, a string name, and a ``*args`` positional varargs. Cannot be present when ``name`` is present. + unfinished_policy: Actions taken if a workflow terminates with + a running instance of this handler. """ - def with_name( - name: Optional[str], fn: CallableSyncOrAsyncReturnNoneType + def decorator( + name: Optional[str], + unfinished_policy: HandlerUnfinishedPolicy, + fn: CallableSyncOrAsyncReturnNoneType, ) -> CallableSyncOrAsyncReturnNoneType: - defn = _SignalDefinition(name=name, fn=fn, is_method=True) + if not name and not dynamic: + name = fn.__name__ + defn = _SignalDefinition( + name=name, + fn=fn, + is_method=True, + unfinished_policy=unfinished_policy, + ) setattr(fn, "__temporal_signal_definition", defn) if defn.dynamic_vararg: warnings.warn( @@ -232,13 +280,12 @@ def with_name( ) return fn - if name is not None or dynamic: + if not fn: if name is not None and dynamic: raise RuntimeError("Cannot provide name and dynamic boolean") - return partial(with_name, name) - if fn is None: - raise RuntimeError("Cannot create signal without function or name or dynamic") - return with_name(fn.__name__, fn) + return partial(decorator, name, unfinished_policy) + else: + return decorator(fn.__name__, unfinished_policy, fn) @overload @@ -467,6 +514,10 @@ def logger_details(self) -> Mapping[str, Any]: self._logger_details = self.workflow_info()._logger_details() return self._logger_details + @abstractmethod + def workflow_all_handlers_finished(self) -> bool: + ... + @abstractmethod def workflow_continue_as_new( self, @@ -923,7 +974,20 @@ def update( @overload def update( - *, name: str + *, + unfinished_policy: HandlerUnfinishedPolicy = HandlerUnfinishedPolicy.WARN_AND_ABANDON, +) -> Callable[ + [Callable[MultiParamSpec, ReturnType]], + UpdateMethodMultiParam[MultiParamSpec, ReturnType], +]: + ... + + +@overload +def update( + *, + name: str, + unfinished_policy: HandlerUnfinishedPolicy = HandlerUnfinishedPolicy.WARN_AND_ABANDON, ) -> Callable[ [Callable[MultiParamSpec, ReturnType]], UpdateMethodMultiParam[MultiParamSpec, ReturnType], @@ -933,7 +997,9 @@ def update( @overload def update( - *, dynamic: Literal[True] + *, + dynamic: Literal[True], + unfinished_policy: HandlerUnfinishedPolicy = HandlerUnfinishedPolicy.WARN_AND_ABANDON, ) -> Callable[ [Callable[MultiParamSpec, ReturnType]], UpdateMethodMultiParam[MultiParamSpec, ReturnType], @@ -946,6 +1012,7 @@ def update( *, name: Optional[str] = None, dynamic: Optional[bool] = False, + unfinished_policy: HandlerUnfinishedPolicy = HandlerUnfinishedPolicy.WARN_AND_ABANDON, ): """Decorator for a workflow update handler method. @@ -973,12 +1040,23 @@ def update( parameters of the method must be self, a string name, and a ``*args`` positional varargs. Cannot be present when ``name`` is present. + unfinished_policy: Actions taken if a workflow terminates with + a running instance of this handler. """ - def with_name( - name: Optional[str], fn: CallableSyncOrAsyncType + def decorator( + name: Optional[str], + unfinished_policy: HandlerUnfinishedPolicy, + fn: CallableSyncOrAsyncType, ) -> CallableSyncOrAsyncType: - defn = _UpdateDefinition(name=name, fn=fn, is_method=True) + if not name and not dynamic: + name = fn.__name__ + defn = _UpdateDefinition( + name=name, + fn=fn, + is_method=True, + unfinished_policy=unfinished_policy, + ) if defn.dynamic_vararg: raise RuntimeError( "Dynamic updates do not support a vararg third param, use Sequence[RawValue]", @@ -987,13 +1065,12 @@ def with_name( setattr(fn, "validator", partial(_update_validator, defn)) return fn - if name is not None or dynamic: + if not fn: if name is not None and dynamic: raise RuntimeError("Cannot provide name and dynamic boolean") - return partial(with_name, name) - if fn is None: - raise RuntimeError("Cannot create update without function or name or dynamic") - return with_name(fn.__name__, fn) + return partial(decorator, name, unfinished_policy) + else: + return decorator(fn.__name__, unfinished_policy, fn) def _update_validator( @@ -1450,6 +1527,9 @@ class _SignalDefinition: name: Optional[str] fn: Callable[..., Union[None, Awaitable[None]]] is_method: bool + unfinished_policy: HandlerUnfinishedPolicy = ( + HandlerUnfinishedPolicy.WARN_AND_ABANDON + ) # Types loaded on post init if None arg_types: Optional[List[Type]] = None dynamic_vararg: bool = False @@ -1531,6 +1611,9 @@ class _UpdateDefinition: name: Optional[str] fn: Callable[..., Union[Any, Awaitable[Any]]] is_method: bool + unfinished_policy: HandlerUnfinishedPolicy = ( + HandlerUnfinishedPolicy.WARN_AND_ABANDON + ) # Types loaded on post init if None arg_types: Optional[List[Type]] = None ret_type: Optional[Type] = None @@ -4400,6 +4483,19 @@ def set_dynamic_update_handler( _Runtime.current().workflow_set_update_handler(None, handler, validator) +def all_handlers_finished() -> bool: + """Whether update and signal handlers have finished executing. + + Consider waiting on this condition before workflow return or continue-as-new, to prevent + interruption of in-progress handlers by workflow exit: + ``await workflow.wait_condition(lambda: workflow.all_handlers_finished())`` + + Returns: + True if there are no in-progress update or signal handler executions. + """ + return _Runtime.current().workflow_all_handlers_finished() + + def as_completed( fs: Iterable[Awaitable[AnyType]], *, timeout: Optional[float] = None ) -> Iterator[Awaitable[AnyType]]: diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index f51f7721..c63d2de9 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import asyncio import dataclasses import json @@ -13,6 +15,7 @@ from dataclasses import dataclass from datetime import datetime, timedelta, timezone from enum import IntEnum +from functools import partial from typing import ( Any, Awaitable, @@ -31,7 +34,7 @@ import pytest from google.protobuf.timestamp_pb2 import Timestamp -from typing_extensions import Protocol, runtime_checkable +from typing_extensions import Literal, Protocol, runtime_checkable import temporalio.worker from temporalio import activity, workflow @@ -39,7 +42,6 @@ from temporalio.api.enums.v1 import EventType from temporalio.api.failure.v1 import Failure from temporalio.api.sdk.v1 import EnhancedStackTrace -from temporalio.api.update.v1 import UpdateRef from temporalio.api.workflowservice.v1 import ( GetWorkflowExecutionHistoryRequest, ResetStickyTaskQueueRequest, @@ -109,10 +111,8 @@ new_worker, workflow_update_exists, ) -from tests.helpers.external_coroutine import wait_on_timer from tests.helpers.external_stack_trace import ( ExternalStackTraceWorkflow, - MultiFileStackTraceWorkflow, external_wait_cancel, ) @@ -5176,3 +5176,337 @@ async def test_workflow_current_update(client: Client, env: WorkflowEnvironment) assert {"update1", "update2", "update3", "update4", "update5"} == set( await handle.result() ) + + +@workflow.defn +class UnfinishedHandlersWorkflow: + def __init__(self): + self.started_handler = False + self.handler_may_return = False + self.handler_finished = False + + @workflow.run + async def run(self, wait_all_handlers_finished: bool) -> bool: + await workflow.wait_condition(lambda: self.started_handler) + if wait_all_handlers_finished: + self.handler_may_return = True + await workflow.wait_condition(workflow.all_handlers_finished) + return self.handler_finished + + async def _do_update_or_signal(self) -> None: + self.started_handler = True + await workflow.wait_condition(lambda: self.handler_may_return) + self.handler_finished = True + + @workflow.update + async def my_update(self) -> None: + await self._do_update_or_signal() + + @workflow.update(unfinished_policy=workflow.HandlerUnfinishedPolicy.ABANDON) + async def my_update_ABANDON(self) -> None: + await self._do_update_or_signal() + + @workflow.update( + unfinished_policy=workflow.HandlerUnfinishedPolicy.WARN_AND_ABANDON + ) + async def my_update_WARN_AND_ABANDON(self) -> None: + await self._do_update_or_signal() + + @workflow.signal + async def my_signal(self): + await self._do_update_or_signal() + + @workflow.signal(unfinished_policy=workflow.HandlerUnfinishedPolicy.ABANDON) + async def my_signal_ABANDON(self): + await self._do_update_or_signal() + + @workflow.signal( + unfinished_policy=workflow.HandlerUnfinishedPolicy.WARN_AND_ABANDON + ) + async def my_signal_WARN_AND_ABANDON(self): + await self._do_update_or_signal() + + +async def test_unfinished_update_handler(client: Client, env: WorkflowEnvironment): + if env.supports_time_skipping: + pytest.skip( + "Java test server: https://github.com/temporalio/sdk-java/issues/1903" + ) + async with new_worker(client, UnfinishedHandlersWorkflow) as worker: + test = _UnfinishedHandlersTest(client, worker, "update") + await test.test_wait_all_handlers_finished_and_unfinished_handlers_warning() + await test.test_unfinished_handlers_cause_exceptions_in_test_suite() + + +async def test_unfinished_signal_handler(client: Client): + async with new_worker(client, UnfinishedHandlersWorkflow) as worker: + test = _UnfinishedHandlersTest(client, worker, "signal") + await test.test_wait_all_handlers_finished_and_unfinished_handlers_warning() + await test.test_unfinished_handlers_cause_exceptions_in_test_suite() + + +@dataclass +class _UnfinishedHandlersTest: + client: Client + worker: Worker + handler_type: Literal["update", "signal"] + + async def test_wait_all_handlers_finished_and_unfinished_handlers_warning(self): + # The unfinished handler warning is issued by default, + handler_finished, warning = await self._get_workflow_result_and_warning( + wait_all_handlers_finished=False, + ) + assert not handler_finished and warning + # and when the workflow sets the unfinished_policy to WARN_AND_ABANDON, + handler_finished, warning = await self._get_workflow_result_and_warning( + wait_all_handlers_finished=False, + unfinished_policy=workflow.HandlerUnfinishedPolicy.WARN_AND_ABANDON, + ) + assert not handler_finished and warning + # but not when the workflow waits for handlers to complete, + handler_finished, warning = await self._get_workflow_result_and_warning( + wait_all_handlers_finished=True, + ) + assert handler_finished and not warning + # nor when the silence-warnings policy is set on the handler. + handler_finished, warning = await self._get_workflow_result_and_warning( + wait_all_handlers_finished=False, + unfinished_policy=workflow.HandlerUnfinishedPolicy.ABANDON, + ) + assert not handler_finished and not warning + + async def test_unfinished_handlers_cause_exceptions_in_test_suite(self): + # If we don't capture warnings then -- since the unfinished handler warning is converted to + # an exception in the test suite -- we see WFT failures when we don't wait for handlers. + handle: asyncio.Future[WorkflowHandle] = asyncio.Future() + asyncio.create_task( + self._get_workflow_result( + wait_all_handlers_finished=False, handle_future=handle + ) + ) + await assert_eq_eventually( + True, + partial(self._workflow_task_failed, workflow_id=(await handle).id), + timeout=timedelta(seconds=20), + ) + + async def _workflow_task_failed(self, workflow_id: str) -> bool: + resp = await self.client.workflow_service.get_workflow_execution_history( + GetWorkflowExecutionHistoryRequest( + namespace=self.client.namespace, + execution=WorkflowExecution(workflow_id=workflow_id), + ), + ) + for event in reversed(resp.history.events): + if event.event_type == EventType.EVENT_TYPE_WORKFLOW_TASK_FAILED: + assert event.workflow_task_failed_event_attributes.failure.message.startswith( + f"Workflow finished while {self.handler_type} handlers are still running" + ) + return True + return False + + async def _get_workflow_result_and_warning( + self, + wait_all_handlers_finished: bool, + unfinished_policy: Optional[workflow.HandlerUnfinishedPolicy] = None, + ) -> Tuple[bool, bool]: + with pytest.WarningsRecorder() as warnings: + wf_result = await self._get_workflow_result( + wait_all_handlers_finished, unfinished_policy + ) + unfinished_handler_warning_emitted = any( + issubclass(w.category, self._unfinished_handler_warning_cls) + for w in warnings + ) + return wf_result, unfinished_handler_warning_emitted + + async def _get_workflow_result( + self, + wait_all_handlers_finished: bool, + unfinished_policy: Optional[workflow.HandlerUnfinishedPolicy] = None, + handle_future: Optional[asyncio.Future[WorkflowHandle]] = None, + ) -> bool: + handle = await self.client.start_workflow( + UnfinishedHandlersWorkflow.run, + arg=wait_all_handlers_finished, + id=f"wf-{uuid.uuid4()}", + task_queue=self.worker.task_queue, + ) + if handle_future: + handle_future.set_result(handle) + handler_name = f"my_{self.handler_type}" + if unfinished_policy: + handler_name += f"_{unfinished_policy.name}" + if self.handler_type == "signal": + await handle.signal(handler_name) + else: + if not wait_all_handlers_finished: + with pytest.raises(RPCError) as err: + await handle.execute_update(handler_name, id="my-update") + assert ( + err.value.status == RPCStatusCode.NOT_FOUND + and "workflow execution already completed" in str(err.value).lower() + ) + else: + await handle.execute_update(handler_name, id="my-update") + + return await handle.result() + + @property + def _unfinished_handler_warning_cls(self) -> Type: + return { + "update": workflow.UnfinishedUpdateHandlersWarning, + "signal": workflow.UnfinishedSignalHandlersWarning, + }[self.handler_type] + + +@workflow.defn +class UnfinishedHandlersWithCancellationOrFailureWorkflow: + @workflow.run + async def run( + self, workflow_termination_type: Literal["cancellation", "failure"] + ) -> NoReturn: + if workflow_termination_type == "failure": + raise ApplicationError( + "Deliberately failing workflow with an unfinished handler" + ) + await workflow.wait_condition(lambda: False) + raise AssertionError("unreachable") + + @workflow.update + async def my_update(self) -> NoReturn: + await workflow.wait_condition(lambda: False) + raise AssertionError("unreachable") + + @workflow.signal + async def my_signal(self) -> NoReturn: + await workflow.wait_condition(lambda: False) + raise AssertionError("unreachable") + + +async def test_unfinished_update_handler_with_workflow_cancellation( + client: Client, env: WorkflowEnvironment +): + if env.supports_time_skipping: + pytest.skip( + "Java test server: https://github.com/temporalio/sdk-java/issues/1903" + ) + await _UnfinishedHandlersWithCancellationOrFailureTest( + client, + "update", + "cancellation", + ).test_warning_is_issued_when_cancellation_or_failure_causes_exit_with_unfinished_handler() + + +async def test_unfinished_signal_handler_with_workflow_cancellation(client: Client): + await _UnfinishedHandlersWithCancellationOrFailureTest( + client, + "signal", + "cancellation", + ).test_warning_is_issued_when_cancellation_or_failure_causes_exit_with_unfinished_handler() + + +async def test_unfinished_update_handler_with_workflow_failure( + client: Client, env: WorkflowEnvironment +): + if env.supports_time_skipping: + pytest.skip( + "Java test server: https://github.com/temporalio/sdk-java/issues/1903" + ) + await _UnfinishedHandlersWithCancellationOrFailureTest( + client, + "update", + "failure", + ).test_warning_is_issued_when_cancellation_or_failure_causes_exit_with_unfinished_handler() + + +async def test_unfinished_signal_handler_with_workflow_failure( + client: Client, env: WorkflowEnvironment +): + if env.supports_time_skipping: + pytest.skip( + "Java test server: https://github.com/temporalio/sdk-java/issues/2127" + ) + await _UnfinishedHandlersWithCancellationOrFailureTest( + client, + "signal", + "failure", + ).test_warning_is_issued_when_cancellation_or_failure_causes_exit_with_unfinished_handler() + + +@dataclass +class _UnfinishedHandlersWithCancellationOrFailureTest: + client: Client + handler_type: Literal["update", "signal"] + workflow_termination_type: Literal["cancellation", "failure"] + + async def test_warning_is_issued_when_cancellation_or_failure_causes_exit_with_unfinished_handler( + self, + ): + assert await self._run_workflow_and_get_warning() + + async def _run_workflow_and_get_warning(self) -> bool: + workflow_id = f"wf-{uuid.uuid4()}" + update_id = "update-id" + task_queue = "tq" + + # We require a startWorkflow, an update, and maybe a cancellation request, to be delivered + # in the same WFT. To do this we start the worker after they've all been accepted by the + # server. + handle = await self.client.start_workflow( + UnfinishedHandlersWithCancellationOrFailureWorkflow.run, + self.workflow_termination_type, + id=workflow_id, + task_queue=task_queue, + ) + if self.workflow_termination_type == "cancellation": + await handle.cancel() + + if self.handler_type == "update": + update_task = asyncio.create_task( + handle.execute_update( + UnfinishedHandlersWithCancellationOrFailureWorkflow.my_update, + id=update_id, + ) + ) + await assert_eq_eventually( + True, + lambda: workflow_update_exists(self.client, workflow_id, update_id), + ) + else: + await handle.signal( + UnfinishedHandlersWithCancellationOrFailureWorkflow.my_signal + ) + + async with new_worker( + self.client, + UnfinishedHandlersWithCancellationOrFailureWorkflow, + task_queue=task_queue, + ): + with pytest.WarningsRecorder() as warnings: + if self.handler_type == "update": + assert update_task + with pytest.raises(RPCError) as update_err: + await update_task + assert ( + update_err.value.status == RPCStatusCode.NOT_FOUND + and "workflow execution already completed" + in str(update_err.value).lower() + ) + + with pytest.raises(WorkflowFailureError) as err: + await handle.result() + assert "workflow execution failed" in str(err.value).lower() + + unfinished_handler_warning_emitted = any( + issubclass(w.category, self._unfinished_handler_warning_cls) + for w in warnings + ) + return unfinished_handler_warning_emitted + + @property + def _unfinished_handler_warning_cls(self) -> Type: + return { + "update": workflow.UnfinishedUpdateHandlersWarning, + "signal": workflow.UnfinishedSignalHandlersWarning, + }[self.handler_type]