Skip to content

Commit

Permalink
Disable by default, and let the user drive the status port
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisguidry committed Jun 17, 2024
1 parent 8aa43df commit 86ea3b0
Showing 1 changed file with 23 additions and 17 deletions.
40 changes: 23 additions & 17 deletions src/prefect/task_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,9 @@ def status():


@sync_compatible
async def serve(*tasks: Task, limit: Optional[int] = 10):
async def serve(
*tasks: Task, limit: Optional[int] = 10, status_server_port: Optional[int] = None
):
"""Serve the provided tasks so that their runs may be submitted to and executed.
in the engine. Tasks do not need to be within a flow run context to be submitted.
You must `.submit` the same task object that you pass to `serve`.
Expand All @@ -361,6 +363,9 @@ async def serve(*tasks: Task, limit: Optional[int] = 10):
given task, the task run will be submitted to the engine for execution.
- limit: The maximum number of tasks that can be run concurrently. Defaults to 10.
Pass `None` to remove the limit.
- status_server_port: An optional port on which to start an HTTP server
exposing status information about the task worker. If not provided, no
status server will run.
Example:
```python
Expand All @@ -382,18 +387,18 @@ def yell(message: str):
"""
task_worker = TaskWorker(*tasks, limit=limit)

loop = asyncio.get_event_loop()

server = uvicorn.Server(
uvicorn.Config(
app=create_status_server(task_worker),
host="127.0.0.1",
port=4422,
access_log=False,
log_level="warning",
if status_server_port is not None:
server = uvicorn.Server(
uvicorn.Config(
app=create_status_server(task_worker),
host="127.0.0.1",
port=status_server_port,
access_log=False,
log_level="warning",
)
)
)
status_server = loop.create_task(server.serve())
loop = asyncio.get_event_loop()
status_server_task = loop.create_task(server.serve())

try:
await task_worker.start()
Expand All @@ -413,8 +418,9 @@ def yell(message: str):
logger.info("Task worker interrupted, stopping...")

finally:
status_server.cancel()
try:
await status_server
except asyncio.CancelledError:
pass
if status_server_task:
status_server_task.cancel()
try:
await status_server_task
except asyncio.CancelledError:
pass

0 comments on commit 86ea3b0

Please sign in to comment.