Skip to content

Commit

Permalink
DEV: logging to investigate flaky test
Browse files Browse the repository at this point in the history
  • Loading branch information
dandavison committed Jun 26, 2024
1 parent 97fc45f commit 54d9ac9
Showing 1 changed file with 15 additions and 0 deletions.
15 changes: 15 additions & 0 deletions temporalio/worker/_workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,9 @@ def activate(
i = 0
while i < len(self._current_completion.successful.commands):
command = self._current_completion.successful.commands[i]
print(
f"command: {command}, {command.HasField("cancel_workflow_execution")}"
)
if not seen_completion:
seen_completion = (
command.HasField("complete_workflow_execution")
Expand All @@ -422,13 +425,18 @@ def activate(
continue
i += 1

print(
f"seen_completion: {seen_completion} {self._in_progress_updates} {self._in_progress_signals}"
)

if seen_completion:
self._warn_if_unfinished_handlers()
return self._current_completion

def _apply(
self, job: temporalio.bridge.proto.workflow_activation.WorkflowActivationJob
) -> None:
print(f"applying job: {job}")
if job.HasField("cancel_workflow"):
self._apply_cancel_workflow(job.cancel_workflow)
elif job.HasField("do_update"):
Expand Down Expand Up @@ -467,11 +475,15 @@ def _apply(
self._apply_update_random_seed(job.update_random_seed)
else:
raise RuntimeError(f"Unrecognized job: {job.WhichOneof('variant')}")
print(
f"after applying job: {self._in_progress_updates} {self._in_progress_signals}"
)

def _apply_cancel_workflow(
self, job: temporalio.bridge.proto.workflow_activation.CancelWorkflow
) -> None:
self._cancel_requested = True
print("cancel_requested")
# TODO(cretz): Details or cancel message or whatever?
if self._primary_task:
# The primary task may not have started yet and we want to give the
Expand Down Expand Up @@ -1627,6 +1639,7 @@ def warnable(handler_executions: Iterable[HandlerExecution]):

warnable_updates = warnable(self._in_progress_updates.values())
if warnable_updates:
print(">>>>>> Issuing update warning")
warnings.warn(
temporalio.workflow.UnfinishedUpdateHandlersWarning(
_make_unfinished_update_handler_message(warnable_updates)
Expand All @@ -1635,6 +1648,7 @@ def warnable(handler_executions: Iterable[HandlerExecution]):

warnable_signals = warnable(self._in_progress_signals.values())
if warnable_signals:
print(">>>>>> Issuing signal warning")
warnings.warn(
temporalio.workflow.UnfinishedSignalHandlersWarning(
_make_unfinished_signal_handler_message(warnable_signals)
Expand Down Expand Up @@ -1808,6 +1822,7 @@ async def _run_top_level_workflow_function(self, coro: Awaitable[None]) -> None:
err
):
self._add_command().cancel_workflow_execution.SetInParent()
print(">>>>>>>> added command")
elif self._is_workflow_failure_exception(err):
# All other failure errors fail the workflow
self._set_workflow_failure(err)
Expand Down

0 comments on commit 54d9ac9

Please sign in to comment.