Skip to content

Commit

Permalink
Bump anyio on Prefect 2.x (#14599)
Browse files Browse the repository at this point in the history
  • Loading branch information
abrookins authored Aug 1, 2024
1 parent e3cf416 commit 067cbc6
Show file tree
Hide file tree
Showing 17 changed files with 223 additions and 135 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/integration-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -113,18 +113,24 @@ jobs:
- prefect-version: "2.13"
server-incompatible: true
server-disable-csrf: true
extra_docker_run_options: '--env EXTRA_PIP_PACKAGES="prefect-kubernetes<0.4"'
- prefect-version: "2.14"
server-incompatible: true
server-disable-csrf: true
extra_docker_run_options: '--env EXTRA_PIP_PACKAGES="prefect-kubernetes<0.4"'
- prefect-version: "2.15"
server-incompatible: true
server-disable-csrf: true
extra_docker_run_options: '--env EXTRA_PIP_PACKAGES="prefect-kubernetes<0.4"'
- prefect-version: "2.16"
server-incompatible: false
extra_docker_run_options: '--env EXTRA_PIP_PACKAGES="prefect-kubernetes<0.4"'
- prefect-version: "2.17"
server-incompatible: false
extra_docker_run_options: '--env EXTRA_PIP_PACKAGES="prefect-kubernetes<0.4"'
- prefect-version: "2.18"
server-incompatible: false
extra_docker_run_options: '--env EXTRA_PIP_PACKAGES="prefect-kubernetes<0.4"'

steps:
- uses: actions/checkout@v4
Expand Down
2 changes: 1 addition & 1 deletion requirements-client.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
anyio >= 3.7.1, < 4.0.0
anyio >= 4.4.0, < 5.0.0
asgi-lifespan >= 1.0, < 3.0
cachetools >= 5.3, < 6.0
cloudpickle >= 2.0, < 4.0
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ pytz >= 2021.1, < 2025
readchar >= 4.0.0, < 5.0.0
sqlalchemy[asyncio] >= 1.4.22, != 1.4.33, < 3.0.0
typer >= 0.12.0, != 0.12.2, < 0.13.0
exceptiongroup >= 1.2.1
2 changes: 1 addition & 1 deletion scripts/wait-for-server.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@


async def main(timeout):
async with anyio.move_on_after(timeout):
with anyio.move_on_after(timeout):
print("Retrieving client...")
async with get_client() as client:
print("Connecting", end="")
Expand Down
49 changes: 28 additions & 21 deletions src/prefect/cli/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""

import os
from asyncio import CancelledError
from functools import partial
from typing import List, Optional
from uuid import UUID
Expand All @@ -18,6 +19,7 @@
from prefect.client import get_client
from prefect.client.schemas.filters import WorkQueueFilter, WorkQueueFilterName
from prefect.exceptions import ObjectNotFound
from prefect.logging import get_logger
from prefect.settings import (
PREFECT_AGENT_PREFETCH_SECONDS,
PREFECT_AGENT_QUERY_INTERVAL,
Expand All @@ -26,6 +28,8 @@
from prefect.utilities.processutils import setup_signal_handlers_agent
from prefect.utilities.services import critical_service_loop

logger = get_logger(__name__)

agent_app = PrefectTyper(
name="agent",
help="Commands for starting and interacting with agent processes.",
Expand Down Expand Up @@ -219,30 +223,33 @@ async def start(
f"queue(s): {', '.join(work_queues)}..."
)

async with anyio.create_task_group() as tg:
tg.start_soon(
partial(
critical_service_loop,
agent.get_and_submit_flow_runs,
PREFECT_AGENT_QUERY_INTERVAL.value(),
printer=app.console.print,
run_once=run_once,
jitter_range=0.3,
backoff=4, # Up to ~1 minute interval during backoff
try:
async with anyio.create_task_group() as tg:
tg.start_soon(
partial(
critical_service_loop,
agent.get_and_submit_flow_runs,
PREFECT_AGENT_QUERY_INTERVAL.value(),
printer=app.console.print,
run_once=run_once,
jitter_range=0.3,
backoff=4, # Up to ~1 minute interval during backoff
)
)
)

tg.start_soon(
partial(
critical_service_loop,
agent.check_for_cancelled_flow_runs,
PREFECT_AGENT_QUERY_INTERVAL.value() * 2,
printer=app.console.print,
run_once=run_once,
jitter_range=0.3,
backoff=4,
tg.start_soon(
partial(
critical_service_loop,
agent.check_for_cancelled_flow_runs,
PREFECT_AGENT_QUERY_INTERVAL.value() * 2,
printer=app.console.print,
run_once=run_once,
jitter_range=0.3,
backoff=4,
)
)
)
except CancelledError:
logger.debug("Agent task group cancelled")

app.console.print("Agent stopped!")

Expand Down
74 changes: 39 additions & 35 deletions src/prefect/cli/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import os
import textwrap
from asyncio import CancelledError
from functools import partial

import anyio
Expand Down Expand Up @@ -115,44 +116,47 @@ async def start(

base_url = f"http://{host}:{port}"

async with anyio.create_task_group() as tg:
app.console.print(generate_welcome_blurb(base_url, ui_enabled=ui))
app.console.print("\n")

server_process_id = await tg.start(
partial(
run_process,
command=[
get_sys_executable(),
"-m",
"uvicorn",
"--app-dir",
# quote wrapping needed for windows paths with spaces
f'"{prefect.__module_path__.parent}"',
"--factory",
"prefect.server.api.server:create_app",
"--host",
str(host),
"--port",
str(port),
"--timeout-keep-alive",
str(keep_alive_timeout),
],
env=server_env,
stream_output=True,
try:
async with anyio.create_task_group() as tg:
app.console.print(generate_welcome_blurb(base_url, ui_enabled=ui))
app.console.print("\n")

server_process_id = await tg.start(
partial(
run_process,
command=[
get_sys_executable(),
"-m",
"uvicorn",
"--app-dir",
# quote wrapping needed for windows paths with spaces
f'"{prefect.__module_path__.parent}"',
"--factory",
"prefect.server.api.server:create_app",
"--host",
str(host),
"--port",
str(port),
"--timeout-keep-alive",
str(keep_alive_timeout),
],
env=server_env,
stream_output=True,
)
)
)

# Explicitly handle the interrupt signal here, as it will allow us to
# cleanly stop the uvicorn server. Failing to do that may cause a
# large amount of anyio error traces on the terminal, because the
# SIGINT is handled by Typer/Click in this process (the parent process)
# and will start shutting down subprocesses:
# https://github.com/PrefectHQ/server/issues/2475
# Explicitly handle the interrupt signal here, as it will allow us to
# cleanly stop the uvicorn server. Failing to do that may cause a
# large amount of anyio error traces on the terminal, because the
# SIGINT is handled by Typer/Click in this process (the parent process)
# and will start shutting down subprocesses:
# https://github.com/PrefectHQ/server/issues/2475

setup_signal_handlers_server(
server_process_id, "the Prefect server", app.console.print
)
setup_signal_handlers_server(
server_process_id, "the Prefect server", app.console.print
)
except CancelledError:
logger.debug("Server task group cancelled")

app.console.print("Server stopped!")

Expand Down
109 changes: 58 additions & 51 deletions src/prefect/cli/worker.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import os
import threading
from asyncio import CancelledError
from enum import Enum
from functools import partial
from typing import List, Optional, Type
Expand All @@ -16,6 +17,7 @@
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import WorkQueueFilter, WorkQueueFilterName
from prefect.exceptions import ObjectNotFound
from prefect.logging import get_logger
from prefect.plugins import load_prefect_collections
from prefect.settings import (
PREFECT_WORKER_HEARTBEAT_SECONDS,
Expand All @@ -32,6 +34,8 @@
from prefect.workers.base import BaseWorker
from prefect.workers.server import start_healthcheck_server

logger = get_logger(__name__)

worker_app = PrefectTyper(
name="worker", help="Commands for starting and interacting with workers."
)
Expand Down Expand Up @@ -171,61 +175,64 @@ async def start(
base_job_template=template_contents,
) as worker:
app.console.print(f"Worker {worker.name!r} started!", style="green")
async with anyio.create_task_group() as tg:
# wait for an initial heartbeat to configure the worker
await worker.sync_with_backend()
# schedule the scheduled flow run polling loop
tg.start_soon(
partial(
critical_service_loop,
workload=worker.get_and_submit_flow_runs,
interval=PREFECT_WORKER_QUERY_SECONDS.value(),
run_once=run_once,
printer=app.console.print,
jitter_range=0.3,
backoff=4, # Up to ~1 minute interval during backoff
try:
async with anyio.create_task_group() as tg:
# wait for an initial heartbeat to configure the worker
await worker.sync_with_backend()
# schedule the scheduled flow run polling loop
tg.start_soon(
partial(
critical_service_loop,
workload=worker.get_and_submit_flow_runs,
interval=PREFECT_WORKER_QUERY_SECONDS.value(),
run_once=run_once,
printer=app.console.print,
jitter_range=0.3,
backoff=4, # Up to ~1 minute interval during backoff
)
)
)
# schedule the sync loop
tg.start_soon(
partial(
critical_service_loop,
workload=worker.sync_with_backend,
interval=worker.heartbeat_interval_seconds,
run_once=run_once,
printer=app.console.print,
jitter_range=0.3,
backoff=4,
# schedule the sync loop
tg.start_soon(
partial(
critical_service_loop,
workload=worker.sync_with_backend,
interval=worker.heartbeat_interval_seconds,
run_once=run_once,
printer=app.console.print,
jitter_range=0.3,
backoff=4,
)
)
)
tg.start_soon(
partial(
critical_service_loop,
workload=worker.check_for_cancelled_flow_runs,
interval=PREFECT_WORKER_QUERY_SECONDS.value() * 2,
run_once=run_once,
printer=app.console.print,
jitter_range=0.3,
backoff=4,
tg.start_soon(
partial(
critical_service_loop,
workload=worker.check_for_cancelled_flow_runs,
interval=PREFECT_WORKER_QUERY_SECONDS.value() * 2,
run_once=run_once,
printer=app.console.print,
jitter_range=0.3,
backoff=4,
)
)
)

started_event = await worker._emit_worker_started_event()

# if --with-healthcheck was passed, start the healthcheck server
if with_healthcheck:
# we'll start the ASGI server in a separate thread so that
# uvicorn does not block the main thread
server_thread = threading.Thread(
name="healthcheck-server-thread",
target=partial(
start_healthcheck_server,
worker=worker,
query_interval_seconds=PREFECT_WORKER_QUERY_SECONDS.value(),
),
daemon=True,
)
server_thread.start()
started_event = await worker._emit_worker_started_event()

# if --with-healthcheck was passed, start the healthcheck server
if with_healthcheck:
# we'll start the ASGI server in a separate thread so that
# uvicorn does not block the main thread
server_thread = threading.Thread(
name="healthcheck-server-thread",
target=partial(
start_healthcheck_server,
worker=worker,
query_interval_seconds=PREFECT_WORKER_QUERY_SECONDS.value(),
),
daemon=True,
)
server_thread.start()
except CancelledError:
logger.debug("Worker task group cancelled")

await worker._emit_worker_stopped_event(started_event)
app.console.print(f"Worker {worker.name!r} stopped!")
Expand Down
Loading

0 comments on commit 067cbc6

Please sign in to comment.