Skip to content

Commit

Permalink
New API to wait for handler executions to complete and warnings on un…
Browse files Browse the repository at this point in the history
…finished handler executions (#556)

* Implement warning on unfinished signals and updates
* Implement all_handlers_finished
  • Loading branch information
dandavison authored Jun 26, 2024
1 parent 2331aa4 commit 2c1ac54
Show file tree
Hide file tree
Showing 4 changed files with 570 additions and 27 deletions.
7 changes: 6 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,19 @@ env = { TEMPORAL_INTEGRATION_TEST = "1" }
cmd = "pip uninstall temporalio -y"

[tool.pytest.ini_options]
addopts = "-p no:warnings"
asyncio_mode = "auto"
log_cli = true
log_cli_level = "INFO"
log_cli_format = "%(asctime)s [%(levelname)8s] %(message)s (%(filename)s:%(lineno)s)"
testpaths = ["tests"]
timeout = 600
timeout_func_only = true
filterwarnings = [
"error::temporalio.workflow.UnfinishedUpdateHandlersWarning",
"error::temporalio.workflow.UnfinishedSignalHandlersWarning",
"ignore::pytest.PytestDeprecationWarning",
"ignore::DeprecationWarning",
]

[tool.cibuildwheel]
# We only want the 3.8 64-bit build of each type. However, due to
Expand Down
112 changes: 110 additions & 2 deletions temporalio/worker/_workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
import collections
import contextvars
import inspect
import json
import logging
import random
import sys
import traceback
import typing
import warnings
from abc import ABC, abstractmethod
from contextlib import contextmanager
Expand All @@ -25,6 +25,7 @@
Dict,
Generator,
Generic,
Iterable,
Iterator,
List,
Mapping,
Expand Down Expand Up @@ -240,6 +241,14 @@ def __init__(self, det: WorkflowInstanceDetails) -> None:
self._queries = dict(self._defn.queries)
self._updates = dict(self._defn.updates)

# We record in-progress signals and updates in order to support waiting for handlers to
# finish, and issuing warnings when the workflow exits with unfinished handlers. Since
# signals lack a unique per-invocation identifier, we introduce a sequence number for the
# purpose.
self._handled_signals_seq = 0
self._in_progress_signals: Dict[int, HandlerExecution] = {}
self._in_progress_updates: Dict[str, HandlerExecution] = {}

# Add stack trace handler
# TODO(cretz): Is it ok that this can be forcefully overridden by the
# workflow author? They could technically override in interceptor
Expand Down Expand Up @@ -406,12 +415,15 @@ def activate(
command.HasField("complete_workflow_execution")
or command.HasField("continue_as_new_workflow_execution")
or command.HasField("fail_workflow_execution")
or command.HasField("cancel_workflow_execution")
)
elif not command.HasField("respond_to_query"):
del self._current_completion.successful.commands[i]
continue
i += 1

if seen_completion:
self._warn_if_unfinished_handlers()
return self._current_completion

def _apply(
Expand Down Expand Up @@ -490,6 +502,9 @@ async def run_update() -> None:
f"Update handler for '{job.name}' expected but not found, and there is no dynamic handler. "
f"known updates: [{' '.join(known_updates)}]"
)
self._in_progress_updates[job.id] = HandlerExecution(
job.name, defn.unfinished_policy, job.id
)
args = self._process_handler_args(
job.name,
job.input,
Expand Down Expand Up @@ -572,6 +587,8 @@ async def run_update() -> None:
)
return
raise
finally:
self._in_progress_updates.pop(job.id, None)

self.create_task(
run_update(),
Expand Down Expand Up @@ -869,6 +886,9 @@ def _apply_update_random_seed(
#### _Runtime direct workflow call overrides ####
# These are in alphabetical order and all start with "workflow_".

def workflow_all_handlers_finished(self) -> bool:
    """Report whether every signal and update handler execution has completed.

    True only when neither the in-progress update map nor the in-progress
    signal map has any entries.
    """
    return not (self._in_progress_updates or self._in_progress_signals)

def workflow_continue_as_new(
self,
*args: Any,
Expand Down Expand Up @@ -1596,6 +1616,31 @@ def _is_workflow_failure_exception(self, err: BaseException) -> bool:
)
)

def _warn_if_unfinished_handlers(self) -> None:
def warnable(handler_executions: Iterable[HandlerExecution]):
return [
ex
for ex in handler_executions
if ex.unfinished_policy
== temporalio.workflow.HandlerUnfinishedPolicy.WARN_AND_ABANDON
]

warnable_updates = warnable(self._in_progress_updates.values())
if warnable_updates:
warnings.warn(
temporalio.workflow.UnfinishedUpdateHandlersWarning(
_make_unfinished_update_handler_message(warnable_updates)
)
)

warnable_signals = warnable(self._in_progress_signals.values())
if warnable_signals:
warnings.warn(
temporalio.workflow.UnfinishedSignalHandlersWarning(
_make_unfinished_signal_handler_message(warnable_signals)
)
)

def _next_seq(self, type: str) -> int:
seq = self._curr_seqs.get(type, 0) + 1
self._curr_seqs[type] = seq
Expand Down Expand Up @@ -1646,10 +1691,21 @@ def _process_signal_job(
input = HandleSignalInput(
signal=job.signal_name, args=args, headers=job.headers
)
self.create_task(

self._handled_signals_seq += 1
id = self._handled_signals_seq
self._in_progress_signals[id] = HandlerExecution(
job.signal_name, defn.unfinished_policy
)

def done_callback(f):
self._in_progress_signals.pop(id, None)

task = self.create_task(
self._run_top_level_workflow_function(self._inbound.handle_signal(input)),
name=f"signal: {job.signal_name}",
)
task.add_done_callback(done_callback)

def _register_task(
self,
Expand Down Expand Up @@ -2686,3 +2742,55 @@ def set(

class _WorkflowBeingEvictedError(BaseException):
    """Raised internally when the workflow instance is being evicted.

    NOTE(review): derives from BaseException rather than Exception,
    presumably so that workflow code catching ``Exception`` cannot swallow
    eviction — confirm against the eviction handling path.
    """

    pass


@dataclass
class HandlerExecution:
    """Information about an execution of a signal or update handler.

    Instances are recorded while a handler runs so the workflow can wait for
    handlers to finish and warn about handlers still running when it exits.
    """

    # Name of the signal or update whose handler is executing.
    name: str
    # Policy deciding whether a warning is issued if the workflow finishes
    # while this handler execution is still in progress.
    unfinished_policy: temporalio.workflow.HandlerUnfinishedPolicy
    # Update ID for update handlers; None for signal handlers, which have no
    # per-invocation identifier (a sequence number keys them instead).
    id: Optional[str] = None


def _make_unfinished_update_handler_message(
handler_executions: List[HandlerExecution],
) -> str:
message = """
Workflow finished while update handlers are still running. This may have interrupted work that the
update handler was doing, and the client that sent the update will receive a 'workflow execution
already completed' RPCError instead of the update result. You can wait for all update and signal
handlers to complete by using `await workflow.wait_condition(lambda:
workflow.all_handlers_finished())`. Alternatively, if both you and the clients sending the update
are okay with interrupting running handlers when the workflow finishes, and causing clients to
receive errors, then you can disable this warning via the update handler decorator:
`@workflow.update(unfinished_policy=workflow.HandlerUnfinishedPolicy.ABANDON)`.
""".replace(
"\n", " "
).strip()
return (
f"{message} The following updates were unfinished (and warnings were not disabled for their handler): "
+ json.dumps([{"name": ex.name, "id": ex.id} for ex in handler_executions])
)


def _make_unfinished_signal_handler_message(
handler_executions: List[HandlerExecution],
) -> str:
message = """
Workflow finished while signal handlers are still running. This may have interrupted work that the
signal handler was doing. You can wait for all update and signal handlers to complete by using
`await workflow.wait_condition(lambda: workflow.all_handlers_finished())`. Alternatively, if both
you and the clients sending the signal are okay with interrupting running handlers when the workflow
finishes, and causing clients to receive errors, then you can disable this warning via the signal
handler decorator: `@workflow.signal(unfinished_policy=workflow.HandlerUnfinishedPolicy.ABANDON)`.
""".replace(
"\n", " "
).strip()
names = collections.Counter(ex.name for ex in handler_executions)
return (
f"{message} The following signals were unfinished (and warnings were not disabled for their handler): "
+ json.dumps(
[{"name": name, "count": count} for name, count in names.most_common()]
)
)
Loading

0 comments on commit 2c1ac54

Please sign in to comment.