Fix runtime load from entrypoint (#14665)
zzstoatzz authored Jul 18, 2024
1 parent 45191af commit cc9ddd5
Showing 6 changed files with 169 additions and 134 deletions.
7 changes: 5 additions & 2 deletions src/prefect/flow_engine.py
@@ -91,9 +91,12 @@ def load_flow_and_flow_run(flow_run_id: UUID) -> Tuple[FlowRun, Flow]:
 
     flow_run = client.read_flow_run(flow_run_id)
     if entrypoint:
-        flow = load_flow_from_entrypoint(entrypoint)
+        # we should not accept a placeholder flow at runtime
+        flow = load_flow_from_entrypoint(entrypoint, use_placeholder_flow=False)
     else:
-        flow = run_coro_as_sync(load_flow_from_flow_run(flow_run))
+        flow = run_coro_as_sync(
+            load_flow_from_flow_run(flow_run, use_placeholder_flow=False)
+        )
 
     return flow_run, flow
 
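With this change, a flow run executed from an entrypoint fails at load time if its flow cannot be imported, instead of receiving a placeholder flow that re-raises the error only when called. A minimal sketch of the strict loading path (the entrypoint string below is hypothetical):

```python
from prefect.flows import load_flow_from_entrypoint

# Hypothetical entrypoint in "<path_to_script>:<flow_func_name>" form.
entrypoint = "flows/etl.py:my_flow"

try:
    # If the script cannot be executed (e.g. a dependency is missing), the
    # original exception propagates here rather than being deferred inside
    # a placeholder Flow that re-raises on call.
    flow = load_flow_from_entrypoint(entrypoint, use_placeholder_flow=False)
except Exception as exc:
    print(f"flow could not be loaded at runtime: {exc!r}")
```
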
20 changes: 16 additions & 4 deletions src/prefect/flows.py
@@ -1704,13 +1704,16 @@ def select_flow(
 
 def load_flow_from_entrypoint(
     entrypoint: str,
+    use_placeholder_flow: bool = True,
 ) -> Flow:
     """
     Extract a flow object from a script at an entrypoint by running all of the code in the file.
 
     Args:
         entrypoint: a string in the format `<path_to_script>:<flow_func_name>` or a module path
             to a flow function
+        use_placeholder_flow: if True, use a placeholder Flow object if the actual flow object
+            cannot be loaded from the entrypoint (e.g. dependencies are missing)
 
     Returns:
         The flow object from the script

@@ -1737,8 +1740,10 @@ def load_flow_from_entrypoint(
         # drawback of this approach is that we're unable to actually load the
         # function, so we create a placeholder flow that will re-raise this
         # exception when called.
-
-        flow = load_placeholder_flow(entrypoint=entrypoint, raises=exc)
+        if use_placeholder_flow:
+            flow = load_placeholder_flow(entrypoint=entrypoint, raises=exc)
+        else:
+            raise
 
     if not isinstance(flow, Flow):
         raise MissingFlowError(

@@ -1856,6 +1861,7 @@ async def load_flow_from_flow_run(
     flow_run: "FlowRun",
     ignore_storage: bool = False,
     storage_base_path: Optional[str] = None,
+    use_placeholder_flow: bool = True,
 ) -> Flow:
     """
     Load a flow from the location/script provided in a deployment's storage document.

@@ -1882,7 +1888,9 @@ async def load_flow_from_flow_run(
             f"Importing flow code from module path {deployment.entrypoint}"
         )
         flow = await run_sync_in_worker_thread(
-            load_flow_from_entrypoint, deployment.entrypoint
+            load_flow_from_entrypoint,
+            deployment.entrypoint,
+            use_placeholder_flow=use_placeholder_flow,
         )
         return flow

@@ -1924,7 +1932,11 @@ async def load_flow_from_flow_run(
     import_path = relative_path_to_current_platform(deployment.entrypoint)
     run_logger.debug(f"Importing flow code from '{import_path}'")
 
-    flow = await run_sync_in_worker_thread(load_flow_from_entrypoint, str(import_path))
+    flow = await run_sync_in_worker_thread(
+        load_flow_from_entrypoint,
+        str(import_path),
+        use_placeholder_flow=use_placeholder_flow,
+    )
 
     return flow
 
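The async loader gains the same keyword, and both loaders keep the lenient behavior by default (`use_placeholder_flow=True`). A sketch of strict loading for a flow run, assuming the async client from `prefect.client.orchestration` (the flow run id is supplied by the caller):

```python
from uuid import UUID

from prefect.client.orchestration import get_client
from prefect.flows import load_flow_from_flow_run


async def load_strictly(flow_run_id: UUID):
    async with get_client() as client:
        flow_run = await client.read_flow_run(flow_run_id)
    # With the default (use_placeholder_flow=True) a broken import would be
    # deferred into a placeholder flow; False surfaces it here instead.
    return await load_flow_from_flow_run(flow_run, use_placeholder_flow=False)
```
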
6 changes: 3 additions & 3 deletions src/prefect/runner/runner.py
@@ -167,9 +167,7 @@ def goodbye_flow(name):
         self.query_seconds = query_seconds or PREFECT_RUNNER_POLL_FREQUENCY.value()
         self._prefetch_seconds = prefetch_seconds
 
-        self._limiter: Optional[anyio.CapacityLimiter] = anyio.CapacityLimiter(
-            self.limit
-        )
+        self._limiter: Optional[anyio.CapacityLimiter] = None
         self._client = get_client()
         self._submitting_flow_run_ids = set()
         self._cancelling_flow_run_ids = set()

@@ -1227,6 +1225,8 @@ async def __aenter__(self):
         self._client = get_client()
         self._tmp_dir.mkdir(parents=True)
 
+        self._limiter = anyio.CapacityLimiter(self.limit)
+
         if not hasattr(self, "_loop") or not self._loop:
             self._loop = asyncio.get_event_loop()
 
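Creating the `anyio.CapacityLimiter` in `__aenter__` rather than `__init__` means the limiter only comes into existence inside a running event loop, presumably because constructing it eagerly can fail or bind to the wrong async context when `Runner(...)` is instantiated from synchronous code. A standalone sketch of the same deferred-limiter pattern (the `BoundedWorker` class is illustrative and not part of Prefect):

```python
from typing import Optional

import anyio


class BoundedWorker:
    """Illustrative only: mirrors the deferred-limiter pattern in Runner."""

    def __init__(self, limit: int = 10):
        self.limit = limit
        # Not created here: an anyio.CapacityLimiter generally needs a
        # running async context, and __init__ may be called from sync code.
        self._limiter: Optional[anyio.CapacityLimiter] = None

    async def __aenter__(self) -> "BoundedWorker":
        # Created lazily, inside the event loop that will actually use it.
        self._limiter = anyio.CapacityLimiter(self.limit)
        return self

    async def __aexit__(self, *exc_info) -> None:
        self._limiter = None

    async def run(self, seconds: float) -> None:
        assert self._limiter is not None, "use 'async with BoundedWorker(...)'"
        async with self._limiter:  # at most `limit` concurrent entries
            await anyio.sleep(seconds)


async def main() -> None:
    async with BoundedWorker(limit=2) as worker:
        async with anyio.create_task_group() as tg:
            for _ in range(5):
                tg.start_soon(worker.run, 0.1)


if __name__ == "__main__":
    anyio.run(main)
```
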
35 changes: 17 additions & 18 deletions tests/runner/test_runner.py
@@ -607,29 +607,28 @@ async def test_runner_can_execute_a_single_flow_run(
     async def test_runner_respects_set_limit(
         self, prefect_client: PrefectClient, caplog
     ):
-        runner = Runner(limit=1)
-
-        deployment_id = await (await dummy_flow_1.to_deployment(__file__)).apply()
+        async with Runner(limit=1) as runner:
+            deployment_id = await (await dummy_flow_1.to_deployment(__file__)).apply()
 
-        good_run = await prefect_client.create_flow_run_from_deployment(
-            deployment_id=deployment_id
-        )
-        bad_run = await prefect_client.create_flow_run_from_deployment(
-            deployment_id=deployment_id
-        )
+            good_run = await prefect_client.create_flow_run_from_deployment(
+                deployment_id=deployment_id
+            )
+            bad_run = await prefect_client.create_flow_run_from_deployment(
+                deployment_id=deployment_id
+            )
 
-        runner._acquire_limit_slot(good_run.id)
-        await runner.execute_flow_run(bad_run.id)
-        assert "run limit reached" in caplog.text
+            runner._acquire_limit_slot(good_run.id)
+            await runner.execute_flow_run(bad_run.id)
+            assert "run limit reached" in caplog.text
 
-        flow_run = await prefect_client.read_flow_run(flow_run_id=bad_run.id)
-        assert flow_run.state.is_scheduled()
+            flow_run = await prefect_client.read_flow_run(flow_run_id=bad_run.id)
+            assert flow_run.state.is_scheduled()
 
-        runner._release_limit_slot(good_run.id)
-        await runner.execute_flow_run(bad_run.id)
+            runner._release_limit_slot(good_run.id)
+            await runner.execute_flow_run(bad_run.id)
 
-        flow_run = await prefect_client.read_flow_run(flow_run_id=bad_run.id)
-        assert flow_run.state.is_completed()
+            flow_run = await prefect_client.read_flow_run(flow_run_id=bad_run.id)
+            assert flow_run.state.is_completed()
 
     async def test_handles_spaces_in_sys_executable(self, monkeypatch, prefect_client):
         """
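Because the limiter now exists only after the runner is entered, the test wraps its calls in `async with Runner(limit=1) as runner:`. A usage sketch mirroring the calls the test makes; in normal use the runner manages its own slots, and `_acquire_limit_slot`/`_release_limit_slot` are internal methods:

```python
from uuid import UUID

from prefect.runner import Runner


async def execute_one(flow_run_id: UUID) -> None:
    # Entering the runner is what creates its capacity limiter after this
    # change, so the limit-aware calls below happen inside the context.
    async with Runner(limit=1) as runner:
        runner._acquire_limit_slot(flow_run_id)
        try:
            await runner.execute_flow_run(flow_run_id)
        finally:
            runner._release_limit_slot(flow_run_id)
```
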
38 changes: 20 additions & 18 deletions tests/runner/test_webserver.py
@@ -54,7 +54,7 @@ def tmp_runner_settings():
         yield
 
 
-@pytest.fixture(scope="function")
+@pytest.fixture
 async def runner() -> Runner:
     return Runner()
 

@@ -146,22 +146,23 @@ async def test_runners_deployment_run_route_execs_flow_run(self, runner: Runner)
         mock_get_client.return_value.__aenter__.return_value = mock_client
         mock_get_client.return_value.__aexit__.return_value = None
 
-        deployment_id = await create_deployment(runner, simple_flow)
-        webserver = await build_server(runner)
-        client = TestClient(webserver)
+        async with runner:
+            deployment_id = await create_deployment(runner, simple_flow)
+            webserver = await build_server(runner)
+            client = TestClient(webserver)
 
-        with mock.patch(
-            "prefect.runner.server.get_client", new=mock_get_client
-        ), mock.patch.object(runner, "execute_in_background"):
-            with client:
-                response = client.post(f"/deployment/{deployment_id}/run")
-                assert response.status_code == 201, response.json()
-                flow_run_id = response.json()["flow_run_id"]
-                assert flow_run_id == mock_flow_run_id
-                assert isinstance(uuid.UUID(flow_run_id), uuid.UUID)
-                mock_client.create_flow_run_from_deployment.assert_called_once_with(
-                    deployment_id=uuid.UUID(deployment_id), parameters={}
-                )
+            with mock.patch(
+                "prefect.runner.server.get_client", new=mock_get_client
+            ), mock.patch.object(runner, "execute_in_background"):
+                with client:
+                    response = client.post(f"/deployment/{deployment_id}/run")
+                    assert response.status_code == 201, response.json()
+                    flow_run_id = response.json()["flow_run_id"]
+                    assert flow_run_id == mock_flow_run_id
+                    assert isinstance(uuid.UUID(flow_run_id), uuid.UUID)
+                    mock_client.create_flow_run_from_deployment.assert_called_once_with(
+                        deployment_id=uuid.UUID(deployment_id), parameters={}
+                    )
 
 
 class TestWebserverFlowRoutes:

@@ -192,8 +193,9 @@ async def test_non_flow_raises_a_404(
         flow_file: str,
         flow_name: str,
     ):
-        await create_deployment(runner, simple_flow)
-        webserver = await build_server(runner)
+        async with runner:
+            await create_deployment(runner, simple_flow)
+            webserver = await build_server(runner)
         client = TestClient(webserver)
 
         response = client.post(
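The same pattern applies to the runner's webserver tests: the runner is entered before its server is built and queried. A rough sketch of driving the deployment-run route outside pytest, assuming `build_server` is importable from `prefect.runner.server` (the module the test patches), `TestClient` from `starlette.testclient`, and an id of a deployment this runner serves:

```python
from starlette.testclient import TestClient

from prefect.runner import Runner
from prefect.runner.server import build_server  # assumed import path


async def trigger(deployment_id: str) -> str:
    async with Runner() as runner:
        # Build the runner's web app and exercise it in-process.
        app = await build_server(runner)
        with TestClient(app) as client:
            # The route returns 201 and a flow_run_id, as asserted in the test above.
            response = client.post(f"/deployment/{deployment_id}/run")
            assert response.status_code == 201, response.json()
            return response.json()["flow_run_id"]
```
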
(One of the six changed files did not load and is not shown.)
