Skip to content

Commit

Permalink
Emit unfinished handler warnings separately for cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
dandavison committed Sep 27, 2024
1 parent 6d6fd7f commit 6b4d5ca
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 6 deletions.
12 changes: 9 additions & 3 deletions temporalio/worker/_workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,15 +429,20 @@ def activate(
f"Failed converting activation exception: {inner_err}"
)

def is_completion(command):
def is_non_cancellation_completion(command):
return (
command.HasField("complete_workflow_execution")
or command.HasField("continue_as_new_workflow_execution")
or command.HasField("fail_workflow_execution")
or command.HasField("cancel_workflow_execution")
)

if any(map(is_completion, self._current_completion.successful.commands)):
# We do also warn in the case of workflow cancellation, but this is done
# when handling the workflow cancellation, since we also cancel update
# handlers at that time.
if any(
is_non_cancellation_completion(c)
for c in self._current_completion.successful.commands
):
self._warn_if_unfinished_handlers()

return self._current_completion
Expand Down Expand Up @@ -1851,6 +1856,7 @@ async def _run_top_level_workflow_function(self, coro: Awaitable[None]) -> None:
err
):
self._add_command().cancel_workflow_execution.SetInParent()
self._warn_if_unfinished_handlers()
# Cancel update tasks, so that the update caller receives an
# update failed error. We do not currently cancel signal tasks
# since (a) doing so would require a workflow flag and (b) the
Expand Down
13 changes: 10 additions & 3 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -5584,9 +5584,16 @@ class _UnfinishedHandlersOnWorkflowTerminationTest:
async def test_warning_is_issued_on_exit_with_unfinished_handler(
self,
):
assert await self._run_workflow_and_get_warning() == (
self.handler_waiting == "-no-wait-all-handlers-finish-"
)
warning_emitted = await self._run_workflow_and_get_warning()
if self.workflow_termination_type == "-cancellation-":
# All paths through this test for which the workflow is cencalled result
# in the warning being emitted.
assert warning_emitted
else:
# Otherwise, the warning is emitted iff the workflow does not wait for handlers to finish.
assert warning_emitted == (
self.handler_waiting == "-no-wait-all-handlers-finish-"
)

async def _run_workflow_and_get_warning(self) -> bool:
workflow_id = f"wf-{uuid.uuid4()}"
Expand Down

0 comments on commit 6b4d5ca

Please sign in to comment.