Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make update caller receive update failed error on workflow cancellation #653

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
Draft
72 changes: 56 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ informal introduction to the features and their implementation.
- [Asyncio Cancellation](#asyncio-cancellation)
- [Workflow Utilities](#workflow-utilities)
- [Exceptions](#exceptions)
- [Signal and update handlers](#signal-and-update-handlers)
- [External Workflows](#external-workflows)
- [Testing](#testing)
- [Automatic Time Skipping](#automatic-time-skipping)
Expand Down Expand Up @@ -581,28 +582,35 @@ Here are the decorators that can be applied:
* The purpose of this decorator is to allow operations involving workflow arguments to be performed in the `__init__`
method, before any signal or update handler has a chance to execute.
* `@workflow.signal` - Defines a method as a signal
* Can be defined on an `async` or non-`async` function at any hierarchy depth, but if decorated method is overridden,
the override must also be decorated
* The method's arguments are the signal's arguments
* Can have a `name` param to customize the signal name, otherwise it defaults to the unqualified method name
* Can be defined on an `async` or non-`async` method at any point in the class hierarchy, but if the decorated method
is overridden, then the override must also be decorated.
* The method's arguments are the signal's arguments.
* Return value is ignored.
* May mutate workflow state, and make calls to other workflow APIs like starting activities, etc.
* Can have a `name` param to customize the signal name, otherwise it defaults to the unqualified method name.
* Can have `dynamic=True` which means all otherwise unhandled signals fall through to this. If present, cannot have
`name` argument, and method parameters must be `self`, a string signal name, and a
`Sequence[temporalio.common.RawValue]`.
* Non-dynamic method can only have positional arguments. Best practice is to only take a single argument that is an
object/dataclass of fields that can be added to as needed.
* Return value is ignored
* `@workflow.query` - Defines a method as a query
* All the same constraints as `@workflow.signal` but should return a value
* Should not be `async`
* Temporal queries should never mutate anything in the workflow or call any calls that would mutate the workflow
* See [Signal and update handlers](#signal-and-update-handlers) below
* `@workflow.update` - Defines a method as an update
* May both accept as input and return a value
* Can be defined on an `async` or non-`async` method at any point in the class hierarchy, but if the decorated method
is overridden, then the override must also be decorated.
* May accept input and return a value
* The method's arguments are the update's arguments.
* May be `async` or non-`async`
* May mutate workflow state, and make calls to other workflow APIs like starting activities, etc.
* Also accepts the `name` and `dynamic` parameters like signals and queries, with the same semantics.
* Also accepts the `name` and `dynamic` parameters like signal, with the same semantics.
* Update handlers may optionally define a validator method by decorating it with `@update_handler_method.validator`.
To reject an update before any events are written to history, throw an exception in a validator. Validators cannot
be `async`, cannot mutate workflow state, and return nothing.
* See [Signal and update handlers](#signal-and-update-handlers) below
* `@workflow.query` - Defines a method as a query
* Should return a value
* Should not be `async`
* Temporal queries should never mutate anything in the workflow or call any calls that would mutate the workflow
* Also accepts the `name` and `dynamic` parameters like signal and update, with the same semantics.

#### Running

Expand Down Expand Up @@ -705,9 +713,15 @@ deterministic:

#### Asyncio Cancellation

Cancellation is done the same way as `asyncio`. Specifically, a task can be requested to be cancelled but does not
necessarily have to respect that cancellation immediately. This also means that `asyncio.shield()` can be used to
protect against cancellation. The following tasks, when cancelled, perform a Temporal cancellation:
Cancellation is done using `asyncio` [task cancellation](https://docs.python.org/3/library/asyncio-task.html#task-cancellation).
This means that tasks are requested to be cancelled but can catch the
[`asyncio.CancelledError`](https://docs.python.org/3/library/asyncio-exceptions.html#asyncio.CancelledError), thus
allowing them to perform some cleanup before allowing the cancellation to proceed (i.e. re-raising the error), or to
deny the cancellation entirely. It also means that
[`asyncio.shield()`](https://docs.python.org/3/library/asyncio-task.html#shielding-from-cancellation) can be used to
protect tasks against cancellation.

The following tasks, when cancelled, perform a Temporal cancellation:

* Activities - when the task executing an activity is cancelled, a cancellation request is sent to the activity
* Child workflows - when the task starting or executing a child workflow is cancelled, a cancellation request is sent to
Expand Down Expand Up @@ -746,17 +760,43 @@ While running in a workflow, in addition to features documented elsewhere, the f
be marked non-retryable or include details as needed.
* Other exceptions that come from activity execution, child execution, cancellation, etc are already instances of
`FailureError` and will fail the workflow when uncaught.
* Update handlers are special: an instance of `temporalio.exceptions.FailureError` raised in an update handler will fail
the update instead of failing the workflow.
* All other exceptions fail the "workflow task" which means the workflow will continually retry until the workflow is
fixed. This is helpful for bad code or other non-predictable exceptions. To actually fail the workflow, use an
`ApplicationError` as mentioned above.

This default can be changed by providing a list of exception types to `workflow_failure_exception_types` when creating a
`Worker` or `failure_exception_types` on the `@workflow.defn` decorator. If a workflow-thrown exception is an instance
of any type in either list, it will fail the workflow instead of the task. This means a value of `[Exception]` will
cause every exception to fail the workflow instead of the task. Also, as a special case, if
of any type in either list, it will fail the workflow (or update) instead of the workflow task. This means a value of
`[Exception]` will cause every exception to fail the workflow instead of the workflow task. Also, as a special case, if
`temporalio.workflow.NondeterminismError` (or any superclass of it) is set, non-deterministic exceptions will fail the
workflow. WARNING: These settings are experimental.

#### Signal and update handlers

Signal and update handlers are defined using decorated methods as shown in the example [above](#definition). Client code
sends signals and updates using `workflow_handle.signal`, `workflow_handle.execute_update`, or
`workflow_handle.start_update`. When the workflow receives one of these requests, it starts an `asyncio.Task` executing
the corresponding handler method with the argument(s) from the request.

The handler methods may be `async def` and can do all the async operations described above (e.g. invoking activities and
child workflows, and waiting on timers and conditions). Notice that this means that handler tasks will be executing
concurrently with respect to each other and the main workflow task. Use
[asyncio.Lock](https://docs.python.org/3/library/asyncio-sync.html#lock) and
[asyncio.Semaphore](https://docs.python.org/3/library/asyncio-sync.html#semaphore) if necessary.

Your main workflow task may finish as a result of successful completion, cancellation, continue-as-new, or failure. You
should ensure that all in-progress signal and update handler tasks have finished before this happens; if you do not, you
will see a warning (the warning can be disabled via the `workflow.signal`/`workflow.update` decorators). One way to
ensure that handler tasks have finished is to wait on the `workflow.all_handlers_finished` condition:
```python
await workflow.wait_condition(workflow.all_handlers_finished)
```

If your main workflow task finishes as a result of cancellation, then any in-progress update handler tasks will be
automatically requested to cancel.

#### External Workflows

* `workflow.get_external_workflow_handle()` inside a workflow returns a handle to interact with another workflow
Expand Down
41 changes: 27 additions & 14 deletions temporalio/worker/_workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,6 @@ def is_completion(command):
command.HasField("complete_workflow_execution")
or command.HasField("continue_as_new_workflow_execution")
or command.HasField("fail_workflow_execution")
or command.HasField("cancel_workflow_execution")
dandavison marked this conversation as resolved.
Show resolved Hide resolved
)

if any(map(is_completion, self._current_completion.successful.commands)):
Expand Down Expand Up @@ -518,9 +517,9 @@ async def run_update() -> None:
f"Update handler for '{job.name}' expected but not found, and there is no dynamic handler. "
f"known updates: [{' '.join(known_updates)}]"
)
self._in_progress_updates[job.id] = HandlerExecution(
job.name, defn.unfinished_policy, job.id
)
self._in_progress_updates[
job.id
].unfinished_policy = defn.unfinished_policy
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are now forced to create the _in_progress_updates entry before this point (when we create the task), but we update the policy from the defn here, since defn is not known until handler coroutine execution time.

args = self._process_handler_args(
job.name,
job.input,
Expand Down Expand Up @@ -571,7 +570,7 @@ async def run_update() -> None:
# All asyncio cancelled errors become Temporal cancelled errors
if isinstance(err, asyncio.CancelledError):
err = temporalio.exceptions.CancelledError(
f"Cancellation raised within update {err}"
f"Cancellation raised within update: {err}"
)
# Read-only issues during validation should fail the task
if isinstance(err, temporalio.workflow.ReadOnlyContextError):
Expand Down Expand Up @@ -606,10 +605,16 @@ async def run_update() -> None:
finally:
self._in_progress_updates.pop(job.id, None)

self.create_task(
task = self.create_task(
run_update(),
name=f"update: {job.name}",
)
self._in_progress_updates[job.id] = HandlerExecution(
job.name,
task,
temporalio.workflow.HandlerUnfinishedPolicy.WARN_AND_ABANDON,
job.id,
)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are now storing Task in the HandlerExecution dataclass, in order to get hold of unfinished update handler tasks for cancellation.


def _apply_fire_timer(
self, job: temporalio.bridge.proto.workflow_activation.FireTimer
Expand Down Expand Up @@ -1729,20 +1734,20 @@ def _process_signal_job(
signal=job.signal_name, args=args, headers=job.headers
)

task = self.create_task(
self._run_top_level_workflow_function(self._inbound.handle_signal(input)),
name=f"signal: {job.signal_name}",
)
self._handled_signals_seq += 1
id = self._handled_signals_seq
self._in_progress_signals[id] = HandlerExecution(
job.signal_name, defn.unfinished_policy
)

def done_callback(f):
def done_callback(_):
self._in_progress_signals.pop(id, None)

task = self.create_task(
self._run_top_level_workflow_function(self._inbound.handle_signal(input)),
name=f"signal: {job.signal_name}",
)
task.add_done_callback(done_callback)
self._in_progress_signals[id] = HandlerExecution(
job.signal_name, task, defn.unfinished_policy
)
Copy link
Contributor Author

@dandavison dandavison Sep 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't actually do anything with the signal tasks currently, but we add the Task to the HandlerExecution for consistency with update.


def _register_task(
self,
Expand Down Expand Up @@ -1845,6 +1850,13 @@ async def _run_top_level_workflow_function(self, coro: Awaitable[None]) -> None:
err
):
self._add_command().cancel_workflow_execution.SetInParent()
# Cancel update tasks, so that the update caller receives an
# update failed error. We do not currently cancel signal tasks
# since (a) doing so would require a workflow flag and (b) the
# presence of the update caller gives a strong reason to cancel
# update tasks.
for update_handler in self._in_progress_updates.values():
update_handler.task.cancel()
elif self._is_workflow_failure_exception(err):
# All other failure errors fail the workflow
self._set_workflow_failure(err)
Expand Down Expand Up @@ -2811,6 +2823,7 @@ class HandlerExecution:
"""Information about an execution of a signal or update handler."""

name: str
task: asyncio.Task[None]
unfinished_policy: temporalio.workflow.HandlerUnfinishedPolicy
id: Optional[str] = None

Expand Down
63 changes: 58 additions & 5 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -5584,8 +5584,10 @@ class _UnfinishedHandlersOnWorkflowTerminationTest:
async def test_warning_is_issued_on_exit_with_unfinished_handler(
self,
):
assert await self._run_workflow_and_get_warning() == (
warning_emitted = await self._run_workflow_and_get_warning()
assert warning_emitted == (
self.handler_waiting == "-no-wait-all-handlers-finish-"
and self.workflow_termination_type != "-cancellation-"
)

async def _run_workflow_and_get_warning(self) -> bool:
Expand Down Expand Up @@ -5646,13 +5648,22 @@ async def _run_workflow_and_get_warning(self) -> bool:
with pytest.WarningsRecorder() as warnings:
if self.handler_type == "-update-":
assert update_task
if self.handler_waiting == "-wait-all-handlers-finish-":

if self.workflow_termination_type == "-cancellation-":
with pytest.raises(WorkflowUpdateFailedError) as update_err:
await update_task
assert isinstance(update_err.value.cause, CancelledError)
assert (
"the workflow was cancelled"
in str(update_err.value.cause).lower()
)
elif self.handler_waiting == "-wait-all-handlers-finish-":
await update_task
else:
with pytest.raises(RPCError) as update_err:
with pytest.raises(RPCError) as rpc_err:
await update_task
assert update_err.value.status == RPCStatusCode.NOT_FOUND and (
str(update_err.value).lower()
assert rpc_err.value.status == RPCStatusCode.NOT_FOUND and (
str(rpc_err.value).lower()
== "workflow execution already completed"
)

Expand Down Expand Up @@ -6146,3 +6157,45 @@ async def test_workflow_run_sees_workflow_init(client: Client):
task_queue=worker.task_queue,
)
assert workflow_result == "hello, world"


@workflow.defn
class UpdateCancellationWorkflow:
def __init__(self) -> None:
self.non_terminating_operation_has_started = False

@workflow.run
async def run(self) -> None:
await asyncio.Future()

@workflow.update(unfinished_policy=workflow.HandlerUnfinishedPolicy.ABANDON)
async def non_terminating_update(self) -> None:
self.non_terminating_operation_has_started = True
await asyncio.Future()

@workflow.update
async def wait_until_non_terminating_operation_has_started(self) -> None:
await workflow.wait_condition(
lambda: self.non_terminating_operation_has_started
)


async def test_update_cancellation(client: Client):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be worth having a test for doing a cleanup step on cancellation (e.g. in a finally). Can do this in the update or the workflow. This is common in Temporal code and I fear cleanup code in an update with issue a warning.

Copy link
Contributor Author

@dandavison dandavison Sep 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, good point. So to restate what you're saying:

If a workflow cancellation request comes in, and a handler is unfinished, the workflow author should be able to allow the workflow to be cancelled, and to have it gracefully terminate the handler, and not to be criticized by any warning.

That is true today: we don't cancel signals or updates on workflow cancellation, but a user could write a workflow that does, and they would escape the warning, because the handler would not be alive at check time.

But my PR breaks it, by attempting to make the check at workflow cancellation time: it would classify a handler as unfinished when in fact it's about to be gracefully terminated.

The reason my PR does that is that it wanted to continue to warn the workflow author when workflow cancellation causes an update cancellation. And the reason it wants to do that is to maintain consistency with signals: when the workflow is canceled, if we're going to warn on unfinished signal handlers (which we do) then we should warn on unfinished update handlers also.

I think that these desires are mutually incompatible. We have to allow a user to clean up their update gracefully without the warning, as you point out. I see two solutions:

  1. to change things so that we no longer issue unfinished handler warnings for either signal or update, in the case of workflow cancellation.

  2. to automatically cancel signals also (with a workflow logic flag)

There is a strong incentive (caller UX) for automatically canceling update handlers on workflow cancellation. We have never automatically cancelled signal handlers in the Python SDK, and doing so would require a flag as it may be backwards incompatible with replay of existing histories. The inconsistency in cancelling update handlers but not signal handlers would only be a problem if there is a potential for observable side effects.

I'll carry on thinking about it but for now I've updated the PR to stop issuing the unfinished warning on workflow cancellation, for both signal and update.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to change things so that we no longer issue unfinished handler warnings for either signal or update, in the case of workflow cancellation.
[...]
I'll carry on thinking about it but for now I've updated the PR to stop issuing the unfinished warning on workflow cancellation, for both signal and update.

I think we should warn on unfinished handlers upon workflow completion regardless of reason for completion and regardless of reason handlers are unfinished. I think the problem with the PR before was it was warning before cancellation is processed and I think the previous behavior of warning after cancellation/completion is processed is best. So I think the line at #653 (comment) should not be removed.

It does mean it's a pain for users doing "cleanup" in update handlers, but IMO that's an education issue. We have to educate them that they should not exit the primary run function until they have done cleanup in handlers. This is similar to process exiting before an async thing has been given a chance to clean up.

Copy link
Contributor Author

@dandavison dandavison Sep 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're suggesting that we continue to do the check late, and warn on workflow cancellation.

We can't do that, because in the common case we will then warn on unfinished signals (since these are not automatically cancelled, and hence the handlers will still be alive) but not warn on unfinished updates (since these are automatically cancelled, and hence the handlers will be dead).

(I did try to explain that in the comment above #653 (comment), but it is turning out to be a slightly tricky subject! And my comment isn't very clear).

This is why I've switched the PR to no longer warn on cancellation.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To confirm from our off-PR discussion, we are still warning on unfinished handler regardless of reason (i.e. cancellation)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, still thinking about the design here. I've put the PR into draft mode for now.

async with new_worker(client, UpdateCancellationWorkflow) as worker:
wf_handle = await client.start_workflow(
UpdateCancellationWorkflow.run,
id=str(uuid.uuid4()),
task_queue=worker.task_queue,
)
# Asynchronously run an update that will never complete
non_terminating_update = asyncio.create_task(
wf_handle.execute_update(UpdateCancellationWorkflow.non_terminating_update)
)
# Wait until we know the update handler has started executing
await wf_handle.execute_update(
UpdateCancellationWorkflow.wait_until_non_terminating_operation_has_started
)
# Cancel the workflow and confirm that update caller sees update failed
await wf_handle.cancel()
with pytest.raises(WorkflowUpdateFailedError):
await non_terminating_update # type: ignore
Loading