diff --git a/README.md b/README.md index 34eed9c3..6eb13e50 100644 --- a/README.md +++ b/README.md @@ -65,6 +65,7 @@ informal introduction to the features and their implementation. - [Asyncio Cancellation](#asyncio-cancellation) - [Workflow Utilities](#workflow-utilities) - [Exceptions](#exceptions) + - [Signal and update handlers](#signal-and-update-handlers) - [External Workflows](#external-workflows) - [Testing](#testing) - [Automatic Time Skipping](#automatic-time-skipping) @@ -581,28 +582,35 @@ Here are the decorators that can be applied: * The purpose of this decorator is to allow operations involving workflow arguments to be performed in the `__init__` method, before any signal or update handler has a chance to execute. * `@workflow.signal` - Defines a method as a signal - * Can be defined on an `async` or non-`async` function at any hierarchy depth, but if decorated method is overridden, - the override must also be decorated - * The method's arguments are the signal's arguments - * Can have a `name` param to customize the signal name, otherwise it defaults to the unqualified method name + * Can be defined on an `async` or non-`async` method at any point in the class hierarchy, but if the decorated method + is overridden, then the override must also be decorated. + * The method's arguments are the signal's arguments. + * Return value is ignored. + * May mutate workflow state, and make calls to other workflow APIs like starting activities, etc. + * Can have a `name` param to customize the signal name, otherwise it defaults to the unqualified method name. * Can have `dynamic=True` which means all otherwise unhandled signals fall through to this. If present, cannot have `name` argument, and method parameters must be `self`, a string signal name, and a `Sequence[temporalio.common.RawValue]`. * Non-dynamic method can only have positional arguments. Best practice is to only take a single argument that is an object/dataclass of fields that can be added to as needed. - * Return value is ignored -* `@workflow.query` - Defines a method as a query - * All the same constraints as `@workflow.signal` but should return a value - * Should not be `async` - * Temporal queries should never mutate anything in the workflow or call any calls that would mutate the workflow + * See [Signal and update handlers](#signal-and-update-handlers) below * `@workflow.update` - Defines a method as an update - * May both accept as input and return a value + * Can be defined on an `async` or non-`async` method at any point in the class hierarchy, but if the decorated method + is overridden, then the override must also be decorated. + * May accept input and return a value + * The method's arguments are the update's arguments. * May be `async` or non-`async` * May mutate workflow state, and make calls to other workflow APIs like starting activities, etc. - * Also accepts the `name` and `dynamic` parameters like signals and queries, with the same semantics. + * Also accepts the `name` and `dynamic` parameters like signal, with the same semantics. * Update handlers may optionally define a validator method by decorating it with `@update_handler_method.validator`. To reject an update before any events are written to history, throw an exception in a validator. Validators cannot be `async`, cannot mutate workflow state, and return nothing. + * See [Signal and update handlers](#signal-and-update-handlers) below +* `@workflow.query` - Defines a method as a query + * Should return a value + * Should not be `async` + * Temporal queries should never mutate anything in the workflow or call any calls that would mutate the workflow + * Also accepts the `name` and `dynamic` parameters like signal and update, with the same semantics. #### Running @@ -705,9 +713,15 @@ deterministic: #### Asyncio Cancellation -Cancellation is done the same way as `asyncio`. Specifically, a task can be requested to be cancelled but does not -necessarily have to respect that cancellation immediately. This also means that `asyncio.shield()` can be used to -protect against cancellation. The following tasks, when cancelled, perform a Temporal cancellation: +Cancellation is done using `asyncio` [task cancellation](https://docs.python.org/3/library/asyncio-task.html#task-cancellation). +This means that tasks are requested to be cancelled but can catch the +[`asyncio.CancelledError`](https://docs.python.org/3/library/asyncio-exceptions.html#asyncio.CancelledError), thus +allowing them to perform some cleanup before allowing the cancellation to proceed (i.e. re-raising the error), or to +deny the cancellation entirely. It also means that +[`asyncio.shield()`](https://docs.python.org/3/library/asyncio-task.html#shielding-from-cancellation) can be used to +protect tasks against cancellation. + +The following tasks, when cancelled, perform a Temporal cancellation: * Activities - when the task executing an activity is cancelled, a cancellation request is sent to the activity * Child workflows - when the task starting or executing a child workflow is cancelled, a cancellation request is sent to @@ -746,17 +760,43 @@ While running in a workflow, in addition to features documented elsewhere, the f be marked non-retryable or include details as needed. * Other exceptions that come from activity execution, child execution, cancellation, etc are already instances of `FailureError` and will fail the workflow when uncaught. +* Update handlers are special: an instance of `temporalio.exceptions.FailureError` raised in an update handler will fail + the update instead of failing the workflow. * All other exceptions fail the "workflow task" which means the workflow will continually retry until the workflow is fixed. This is helpful for bad code or other non-predictable exceptions. To actually fail the workflow, use an `ApplicationError` as mentioned above. This default can be changed by providing a list of exception types to `workflow_failure_exception_types` when creating a `Worker` or `failure_exception_types` on the `@workflow.defn` decorator. If a workflow-thrown exception is an instance -of any type in either list, it will fail the workflow instead of the task. This means a value of `[Exception]` will -cause every exception to fail the workflow instead of the task. Also, as a special case, if +of any type in either list, it will fail the workflow (or update) instead of the workflow task. This means a value of +`[Exception]` will cause every exception to fail the workflow instead of the workflow task. Also, as a special case, if `temporalio.workflow.NondeterminismError` (or any superclass of it) is set, non-deterministic exceptions will fail the workflow. WARNING: These settings are experimental. +#### Signal and update handlers + +Signal and update handlers are defined using decorated methods as shown in the example [above](#definition). Client code +sends signals and updates using `workflow_handle.signal`, `workflow_handle.execute_update`, or +`workflow_handle.start_update`. When the workflow receives one of these requests, it starts an `asyncio.Task` executing +the corresponding handler method with the argument(s) from the request. + +The handler methods may be `async def` and can do all the async operations described above (e.g. invoking activities and +child workflows, and waiting on timers and conditions). Notice that this means that handler tasks will be executing +concurrently with respect to each other and the main workflow task. Use +[asyncio.Lock](https://docs.python.org/3/library/asyncio-sync.html#lock) and +[asyncio.Semaphore](https://docs.python.org/3/library/asyncio-sync.html#semaphore) if necessary. + +Your main workflow task may finish as a result of successful completion, cancellation, continue-as-new, or failure. You +should ensure that all in-progress signal and update handler tasks have finished before this happens; if you do not, you +will see a warning (the warning can be disabled via the `workflow.signal`/`workflow.update` decorators). One way to +ensure that handler tasks have finished is to wait on the `workflow.all_handlers_finished` condition: +```python +await workflow.wait_condition(workflow.all_handlers_finished) +``` + +If your main workflow task finishes as a result of cancellation, then any in-progress update handler tasks will be +automatically requested to cancel. + #### External Workflows * `workflow.get_external_workflow_handle()` inside a workflow returns a handle to interact with another workflow diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index 1ca70a23..23964c60 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -434,7 +434,6 @@ def is_completion(command): command.HasField("complete_workflow_execution") or command.HasField("continue_as_new_workflow_execution") or command.HasField("fail_workflow_execution") - or command.HasField("cancel_workflow_execution") ) if any(map(is_completion, self._current_completion.successful.commands)): @@ -518,9 +517,9 @@ async def run_update() -> None: f"Update handler for '{job.name}' expected but not found, and there is no dynamic handler. " f"known updates: [{' '.join(known_updates)}]" ) - self._in_progress_updates[job.id] = HandlerExecution( - job.name, defn.unfinished_policy, job.id - ) + self._in_progress_updates[ + job.id + ].unfinished_policy = defn.unfinished_policy args = self._process_handler_args( job.name, job.input, @@ -571,7 +570,7 @@ async def run_update() -> None: # All asyncio cancelled errors become Temporal cancelled errors if isinstance(err, asyncio.CancelledError): err = temporalio.exceptions.CancelledError( - f"Cancellation raised within update {err}" + f"Cancellation raised within update: {err}" ) # Read-only issues during validation should fail the task if isinstance(err, temporalio.workflow.ReadOnlyContextError): @@ -606,10 +605,16 @@ async def run_update() -> None: finally: self._in_progress_updates.pop(job.id, None) - self.create_task( + task = self.create_task( run_update(), name=f"update: {job.name}", ) + self._in_progress_updates[job.id] = HandlerExecution( + job.name, + task, + temporalio.workflow.HandlerUnfinishedPolicy.WARN_AND_ABANDON, + job.id, + ) def _apply_fire_timer( self, job: temporalio.bridge.proto.workflow_activation.FireTimer @@ -1729,20 +1734,20 @@ def _process_signal_job( signal=job.signal_name, args=args, headers=job.headers ) + task = self.create_task( + self._run_top_level_workflow_function(self._inbound.handle_signal(input)), + name=f"signal: {job.signal_name}", + ) self._handled_signals_seq += 1 id = self._handled_signals_seq - self._in_progress_signals[id] = HandlerExecution( - job.signal_name, defn.unfinished_policy - ) - def done_callback(f): + def done_callback(_): self._in_progress_signals.pop(id, None) - task = self.create_task( - self._run_top_level_workflow_function(self._inbound.handle_signal(input)), - name=f"signal: {job.signal_name}", - ) task.add_done_callback(done_callback) + self._in_progress_signals[id] = HandlerExecution( + job.signal_name, task, defn.unfinished_policy + ) def _register_task( self, @@ -1845,6 +1850,13 @@ async def _run_top_level_workflow_function(self, coro: Awaitable[None]) -> None: err ): self._add_command().cancel_workflow_execution.SetInParent() + # Cancel update tasks, so that the update caller receives an + # update failed error. We do not currently cancel signal tasks + # since (a) doing so would require a workflow flag and (b) the + # presence of the update caller gives a strong reason to cancel + # update tasks. + for update_handler in self._in_progress_updates.values(): + update_handler.task.cancel() elif self._is_workflow_failure_exception(err): # All other failure errors fail the workflow self._set_workflow_failure(err) @@ -2811,6 +2823,7 @@ class HandlerExecution: """Information about an execution of a signal or update handler.""" name: str + task: asyncio.Task[None] unfinished_policy: temporalio.workflow.HandlerUnfinishedPolicy id: Optional[str] = None diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 15afe8c4..8ed4ed46 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -5584,8 +5584,10 @@ class _UnfinishedHandlersOnWorkflowTerminationTest: async def test_warning_is_issued_on_exit_with_unfinished_handler( self, ): - assert await self._run_workflow_and_get_warning() == ( + warning_emitted = await self._run_workflow_and_get_warning() + assert warning_emitted == ( self.handler_waiting == "-no-wait-all-handlers-finish-" + and self.workflow_termination_type != "-cancellation-" ) async def _run_workflow_and_get_warning(self) -> bool: @@ -5646,13 +5648,22 @@ async def _run_workflow_and_get_warning(self) -> bool: with pytest.WarningsRecorder() as warnings: if self.handler_type == "-update-": assert update_task - if self.handler_waiting == "-wait-all-handlers-finish-": + + if self.workflow_termination_type == "-cancellation-": + with pytest.raises(WorkflowUpdateFailedError) as update_err: + await update_task + assert isinstance(update_err.value.cause, CancelledError) + assert ( + "the workflow was cancelled" + in str(update_err.value.cause).lower() + ) + elif self.handler_waiting == "-wait-all-handlers-finish-": await update_task else: - with pytest.raises(RPCError) as update_err: + with pytest.raises(RPCError) as rpc_err: await update_task - assert update_err.value.status == RPCStatusCode.NOT_FOUND and ( - str(update_err.value).lower() + assert rpc_err.value.status == RPCStatusCode.NOT_FOUND and ( + str(rpc_err.value).lower() == "workflow execution already completed" ) @@ -6146,3 +6157,45 @@ async def test_workflow_run_sees_workflow_init(client: Client): task_queue=worker.task_queue, ) assert workflow_result == "hello, world" + + +@workflow.defn +class UpdateCancellationWorkflow: + def __init__(self) -> None: + self.non_terminating_operation_has_started = False + + @workflow.run + async def run(self) -> None: + await asyncio.Future() + + @workflow.update(unfinished_policy=workflow.HandlerUnfinishedPolicy.ABANDON) + async def non_terminating_update(self) -> None: + self.non_terminating_operation_has_started = True + await asyncio.Future() + + @workflow.update + async def wait_until_non_terminating_operation_has_started(self) -> None: + await workflow.wait_condition( + lambda: self.non_terminating_operation_has_started + ) + + +async def test_update_cancellation(client: Client): + async with new_worker(client, UpdateCancellationWorkflow) as worker: + wf_handle = await client.start_workflow( + UpdateCancellationWorkflow.run, + id=str(uuid.uuid4()), + task_queue=worker.task_queue, + ) + # Asynchronously run an update that will never complete + non_terminating_update = asyncio.create_task( + wf_handle.execute_update(UpdateCancellationWorkflow.non_terminating_update) + ) + # Wait until we know the update handler has started executing + await wf_handle.execute_update( + UpdateCancellationWorkflow.wait_until_non_terminating_operation_has_started + ) + # Cancel the workflow and confirm that update caller sees update failed + await wf_handle.cancel() + with pytest.raises(WorkflowUpdateFailedError): + await non_terminating_update # type: ignore