diff --git a/setup.cfg b/setup.cfg index 5aafd280bcee..75c0c8c87f10 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,5 +1,4 @@ [tool:pytest] -testpaths = tests addopts = -rfEs norecursedirs = *.egg-info .git .mypy_cache node_modules .pytest_cache .vscode diff --git a/src/prefect/runner/runner.py b/src/prefect/runner/runner.py index dbdca6ad6fdb..97b250ca42ee 100644 --- a/src/prefect/runner/runner.py +++ b/src/prefect/runner/runner.py @@ -78,9 +78,7 @@ def fast_flow(): ) from prefect.deployments.schedules import FlexibleScheduleList from prefect.events import DeploymentTriggerTypes, TriggerTypes -from prefect.exceptions import ( - Abort, -) +from prefect.exceptions import Abort, ObjectNotFound from prefect.flows import Flow from prefect.logging.loggers import PrefectLogAdapter, flow_run_logger, get_logger from prefect.runner.server import start_webserver @@ -1130,12 +1128,18 @@ async def _run_on_cancellation_hooks( Run the hooks for a flow. """ if state.is_cancelling(): - flow = await load_flow_from_flow_run( - flow_run, client=self._client, storage_base_path=str(self._tmp_dir) - ) - hooks = flow.on_cancellation or [] + try: + flow = await load_flow_from_flow_run( + flow_run, client=self._client, storage_base_path=str(self._tmp_dir) + ) + hooks = flow.on_cancellation or [] - await _run_hooks(hooks, flow_run, flow, state) + await _run_hooks(hooks, flow_run, flow, state) + except ObjectNotFound: + run_logger = self._get_flow_run_logger(flow_run) + run_logger.warning( + f"Runner cannot retrieve flow to execute cancellation hooks for flow run {flow_run.id!r}." + ) async def _run_on_crashed_hooks( self, diff --git a/tests/runner/test_runner.py b/tests/runner/test_runner.py index dd00cb8c4caa..37a4fd844401 100644 --- a/tests/runner/test_runner.py +++ b/tests/runner/test_runner.py @@ -474,6 +474,83 @@ def cancel_flow(sleep_time: int = 100): # check to make sure on_cancellation hook was called assert "This flow was cancelled!" in caplog.text + @pytest.mark.usefixtures("use_hosted_api_server") + async def test_runner_warns_if_unable_to_load_cancellation_hooks( + self, + prefect_client: PrefectClient, + caplog: pytest.LogCaptureFixture, + in_temporary_runner_directory: None, + temp_storage: MockStorage, + ): + runner = Runner(query_seconds=2) + + temp_storage.code = dedent( + """\ + from time import sleep + from prefect import flow + from prefect.logging.loggers import flow_run_logger + def on_cancellation(flow, flow_run, state): + logger = flow_run_logger(flow_run, flow) + logger.info("This flow was cancelled!") + @flow(on_cancellation=[on_cancellation], log_prints=True) + def cancel_flow(sleep_time: int = 100): + sleep(sleep_time) + """ + ) + + deployment_id = await runner.add_flow( + await flow.from_source( + source=temp_storage, entrypoint="flows.py:cancel_flow" + ), + name=__file__, + ) + + async with anyio.create_task_group() as tg: + tg.start_soon(runner.start) + + flow_run = await prefect_client.create_flow_run_from_deployment( + deployment_id=deployment_id + ) + + # Need to wait for polling loop to pick up flow run and + # start execution + while True: + await anyio.sleep(0.5) + flow_run = await prefect_client.read_flow_run(flow_run_id=flow_run.id) + assert flow_run.state + if flow_run.state.is_running(): + break + + await prefect_client.delete_deployment(deployment_id=deployment_id) + + await prefect_client.set_flow_run_state( + flow_run_id=flow_run.id, + state=flow_run.state.copy( + update={"name": "Cancelling", "type": StateType.CANCELLING} + ), + ) + + # Need to wait for polling loop to pick up flow run and then + # finish cancellation + while True: + await anyio.sleep(0.5) + flow_run = await prefect_client.read_flow_run(flow_run_id=flow_run.id) + assert flow_run.state + if flow_run.state.is_cancelled(): + break + + await runner.stop() + tg.cancel_scope.cancel() + + # Cancellation hook should not have been called successfully + # but the flow run should still be cancelled correctly + assert flow_run.state.is_cancelled() + assert "This flow was cancelled!" not in caplog.text + assert ( + "Runner cannot retrieve flow to execute cancellation hooks for flow run" + in caplog.text + ) + @pytest.mark.usefixtures("use_hosted_api_server") async def test_runner_runs_on_crashed_hooks_for_remotely_stored_flows( self,