Skip to content

Commit

Permalink
Skip on cancellation hooks if runner cant load flow (#13669)
Browse files Browse the repository at this point in the history
  • Loading branch information
zangell44 authored May 30, 2024
1 parent 9d7373e commit 4813835
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 9 deletions.
1 change: 0 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
[tool:pytest]
testpaths = tests
addopts = -rfEs
norecursedirs = *.egg-info .git .mypy_cache node_modules .pytest_cache .vscode

Expand Down
20 changes: 12 additions & 8 deletions src/prefect/runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,7 @@ def fast_flow():
)
from prefect.deployments.schedules import FlexibleScheduleList
from prefect.events import DeploymentTriggerTypes, TriggerTypes
from prefect.exceptions import (
Abort,
)
from prefect.exceptions import Abort, ObjectNotFound
from prefect.flows import Flow
from prefect.logging.loggers import PrefectLogAdapter, flow_run_logger, get_logger
from prefect.runner.server import start_webserver
Expand Down Expand Up @@ -1130,12 +1128,18 @@ async def _run_on_cancellation_hooks(
Run the hooks for a flow.
"""
if state.is_cancelling():
flow = await load_flow_from_flow_run(
flow_run, client=self._client, storage_base_path=str(self._tmp_dir)
)
hooks = flow.on_cancellation or []
try:
flow = await load_flow_from_flow_run(
flow_run, client=self._client, storage_base_path=str(self._tmp_dir)
)
hooks = flow.on_cancellation or []

await _run_hooks(hooks, flow_run, flow, state)
await _run_hooks(hooks, flow_run, flow, state)
except ObjectNotFound:
run_logger = self._get_flow_run_logger(flow_run)
run_logger.warning(
f"Runner cannot retrieve flow to execute cancellation hooks for flow run {flow_run.id!r}."
)

async def _run_on_crashed_hooks(
self,
Expand Down
77 changes: 77 additions & 0 deletions tests/runner/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,83 @@ def cancel_flow(sleep_time: int = 100):
# check to make sure on_cancellation hook was called
assert "This flow was cancelled!" in caplog.text

@pytest.mark.usefixtures("use_hosted_api_server")
async def test_runner_warns_if_unable_to_load_cancellation_hooks(
self,
prefect_client: PrefectClient,
caplog: pytest.LogCaptureFixture,
in_temporary_runner_directory: None,
temp_storage: MockStorage,
):
runner = Runner(query_seconds=2)

temp_storage.code = dedent(
"""\
from time import sleep
from prefect import flow
from prefect.logging.loggers import flow_run_logger
def on_cancellation(flow, flow_run, state):
logger = flow_run_logger(flow_run, flow)
logger.info("This flow was cancelled!")
@flow(on_cancellation=[on_cancellation], log_prints=True)
def cancel_flow(sleep_time: int = 100):
sleep(sleep_time)
"""
)

deployment_id = await runner.add_flow(
await flow.from_source(
source=temp_storage, entrypoint="flows.py:cancel_flow"
),
name=__file__,
)

async with anyio.create_task_group() as tg:
tg.start_soon(runner.start)

flow_run = await prefect_client.create_flow_run_from_deployment(
deployment_id=deployment_id
)

# Need to wait for polling loop to pick up flow run and
# start execution
while True:
await anyio.sleep(0.5)
flow_run = await prefect_client.read_flow_run(flow_run_id=flow_run.id)
assert flow_run.state
if flow_run.state.is_running():
break

await prefect_client.delete_deployment(deployment_id=deployment_id)

await prefect_client.set_flow_run_state(
flow_run_id=flow_run.id,
state=flow_run.state.copy(
update={"name": "Cancelling", "type": StateType.CANCELLING}
),
)

# Need to wait for polling loop to pick up flow run and then
# finish cancellation
while True:
await anyio.sleep(0.5)
flow_run = await prefect_client.read_flow_run(flow_run_id=flow_run.id)
assert flow_run.state
if flow_run.state.is_cancelled():
break

await runner.stop()
tg.cancel_scope.cancel()

# Cancellation hook should not have been called successfully
# but the flow run should still be cancelled correctly
assert flow_run.state.is_cancelled()
assert "This flow was cancelled!" not in caplog.text
assert (
"Runner cannot retrieve flow to execute cancellation hooks for flow run"
in caplog.text
)

@pytest.mark.usefixtures("use_hosted_api_server")
async def test_runner_runs_on_crashed_hooks_for_remotely_stored_flows(
self,
Expand Down

0 comments on commit 4813835

Please sign in to comment.