Skip to content

Commit

Permalink
Remove PREFECT_EXPERIMENTAL_ENABLE_EXTRA_RUNNER_ENDPOINTS flag (#14440
Browse files Browse the repository at this point in the history
)
  • Loading branch information
bunchesofdonald authored Jul 11, 2024
1 parent a1ca74d commit 6d01d2a
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 83 deletions.
5 changes: 0 additions & 5 deletions docs/3.0rc/api-ref/rest-api/server/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -22279,11 +22279,6 @@
"title": "Prefect Task Scheduling Pending Task Timeout",
"default": "PT0S"
},
"PREFECT_EXPERIMENTAL_ENABLE_EXTRA_RUNNER_ENDPOINTS": {
"type": "boolean",
"title": "Prefect Experimental Enable Extra Runner Endpoints",
"default": false
},
"PREFECT_EXPERIMENTAL_DISABLE_SYNC_COMPAT": {
"type": "boolean",
"title": "Prefect Experimental Disable Sync Compat",
Expand Down
40 changes: 19 additions & 21 deletions src/prefect/runner/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
inject_schemas_into_openapi,
)
from prefect.settings import (
PREFECT_EXPERIMENTAL_ENABLE_EXTRA_RUNNER_ENDPOINTS,
PREFECT_RUNNER_POLL_FREQUENCY,
PREFECT_RUNNER_SERVER_HOST,
PREFECT_RUNNER_SERVER_LOG_LEVEL,
Expand Down Expand Up @@ -261,29 +260,28 @@ async def build_server(runner: "Runner") -> FastAPI:
router.add_api_route("/shutdown", shutdown(runner=runner), methods=["POST"])
webserver.include_router(router)

if PREFECT_EXPERIMENTAL_ENABLE_EXTRA_RUNNER_ENDPOINTS.value():
deployments_router, deployment_schemas = await get_deployment_router(runner)
webserver.include_router(deployments_router)

subflow_schemas = await get_subflow_schemas(runner)
webserver.add_api_route(
"/flow/run",
_build_generic_endpoint_for_flows(runner=runner, schemas=subflow_schemas),
methods=["POST"],
name="Run flow in background",
description="Trigger any flow run as a background task on the runner.",
summary="Run flow",
)

def customize_openapi():
if webserver.openapi_schema:
return webserver.openapi_schema
deployments_router, deployment_schemas = await get_deployment_router(runner)
webserver.include_router(deployments_router)

subflow_schemas = await get_subflow_schemas(runner)
webserver.add_api_route(
"/flow/run",
_build_generic_endpoint_for_flows(runner=runner, schemas=subflow_schemas),
methods=["POST"],
name="Run flow in background",
description="Trigger any flow run as a background task on the runner.",
summary="Run flow",
)

openapi_schema = inject_schemas_into_openapi(webserver, deployment_schemas)
webserver.openapi_schema = openapi_schema
def customize_openapi():
if webserver.openapi_schema:
return webserver.openapi_schema

webserver.openapi = customize_openapi
openapi_schema = inject_schemas_into_openapi(webserver, deployment_schemas)
webserver.openapi_schema = openapi_schema
return webserver.openapi_schema

webserver.openapi = customize_openapi

return webserver

Expand Down
8 changes: 0 additions & 8 deletions src/prefect/runner/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from prefect.flows import Flow
from prefect.logging import get_logger
from prefect.settings import (
PREFECT_EXPERIMENTAL_ENABLE_EXTRA_RUNNER_ENDPOINTS,
PREFECT_RUNNER_PROCESS_LIMIT,
PREFECT_RUNNER_SERVER_HOST,
PREFECT_RUNNER_SERVER_PORT,
Expand Down Expand Up @@ -131,13 +130,6 @@ async def submit_to_runner(
"The `submit_to_runner` utility only supports submitting flows and tasks."
)

if not PREFECT_EXPERIMENTAL_ENABLE_EXTRA_RUNNER_ENDPOINTS.value():
raise ValueError(
"The `submit_to_runner` utility requires the `Runner` webserver to be"
" running and built with extra endpoints enabled. To enable this, set the"
" `PREFECT_EXPERIMENTAL_ENABLE_EXTRA_RUNNER_ENDPOINTS` setting to `True`."
)

parameters = parameters or {}
if isinstance(parameters, List):
return_single = False
Expand Down
5 changes: 0 additions & 5 deletions src/prefect/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -1437,11 +1437,6 @@ def default_cloud_ui_url(settings, value):
PENDING for a while is a sign that the task worker may have crashed.
"""

PREFECT_EXPERIMENTAL_ENABLE_EXTRA_RUNNER_ENDPOINTS = Setting(bool, default=False)
"""
Whether or not to enable experimental worker webserver endpoints.
"""

PREFECT_EXPERIMENTAL_DISABLE_SYNC_COMPAT = Setting(bool, default=False)
"""
Whether or not to disable the sync_compatible decorator utility.
Expand Down
16 changes: 0 additions & 16 deletions tests/runner/test_submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from prefect.client.schemas.objects import FlowRun
from prefect.runner import submit_to_runner
from prefect.settings import (
PREFECT_EXPERIMENTAL_ENABLE_EXTRA_RUNNER_ENDPOINTS,
PREFECT_RUNNER_SERVER_ENABLE,
temporary_settings,
)
Expand Down Expand Up @@ -61,26 +60,11 @@ def runner_settings():
with temporary_settings(
{
PREFECT_RUNNER_SERVER_ENABLE: True,
PREFECT_EXPERIMENTAL_ENABLE_EXTRA_RUNNER_ENDPOINTS: True,
}
):
yield


async def test_submission_raises_if_extra_endpoints_not_enabled(mock_webserver):
with temporary_settings(
{PREFECT_EXPERIMENTAL_ENABLE_EXTRA_RUNNER_ENDPOINTS: False}
):
with pytest.raises(
ValueError,
match=(
"The `submit_to_runner` utility requires the `Runner` webserver to be"
" running and built with extra endpoints enabled."
),
):
await submit_to_runner(identity, {"whatever": 42})


@pytest.mark.parametrize("prefect_callable", [identity, async_identity])
def test_submit_to_runner_happy_path_sync_context(mock_webserver, prefect_callable):
@flow
Expand Down
28 changes: 0 additions & 28 deletions tests/runner/test_webserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from prefect.runner import Runner
from prefect.runner.server import build_server
from prefect.settings import (
PREFECT_EXPERIMENTAL_ENABLE_EXTRA_RUNNER_ENDPOINTS,
PREFECT_RUNNER_SERVER_HOST,
PREFECT_RUNNER_SERVER_PORT,
temporary_settings,
Expand Down Expand Up @@ -48,7 +47,6 @@ def a_non_flow_function():
def tmp_runner_settings():
with temporary_settings(
updates={
PREFECT_EXPERIMENTAL_ENABLE_EXTRA_RUNNER_ENDPOINTS: True,
PREFECT_RUNNER_SERVER_HOST: "0.0.0.0",
PREFECT_RUNNER_SERVER_PORT: 0,
}
Expand Down Expand Up @@ -82,21 +80,6 @@ async def test_webserver_settings_are_respected(self, runner: Runner):


class TestWebserverDeploymentRoutes:
async def test_deployment_router_not_added_if_experimental_flag_is_false(
self,
runner: Runner,
):
with temporary_settings(
updates={PREFECT_EXPERIMENTAL_ENABLE_EXTRA_RUNNER_ENDPOINTS: False}
):
webserver = await build_server(runner)
deployment_routes = [
r
for r in webserver.routes
if r.path.startswith("/deployment") and r.path.endswith("/run")
]
assert len(deployment_routes) == 0

async def test_runners_deployment_run_routes_exist(self, runner: Runner):
deployment_ids = [
await create_deployment(runner, simple_flow) for _ in range(3)
Expand Down Expand Up @@ -176,17 +159,6 @@ async def test_runners_deployment_run_route_execs_flow_run(self, runner: Runner)


class TestWebserverFlowRoutes:
async def test_flow_router_not_added_if_experimental_flag_is_false(
self,
runner: Runner,
):
with temporary_settings(
updates={PREFECT_EXPERIMENTAL_ENABLE_EXTRA_RUNNER_ENDPOINTS: False}
):
webserver = await build_server(runner)
flow_routes = [r for r in webserver.routes if r.path == "/flow/run"]
assert len(flow_routes) == 0

async def test_flow_router_runs_managed_flow(self, runner: Runner):
await create_deployment(runner, simple_flow)
webserver = await build_server(runner)
Expand Down

0 comments on commit 6d01d2a

Please sign in to comment.