From 86ea3b0e757daf90f849f7bd52a98e8fde051edb Mon Sep 17 00:00:00 2001 From: Chris Guidry Date: Mon, 17 Jun 2024 14:59:20 -0400 Subject: [PATCH] Disable by default, and let the user drive the status port --- src/prefect/task_worker.py | 40 ++++++++++++++++++++++---------------- 1 file changed, 23 insertions(+), 17 deletions(-) diff --git a/src/prefect/task_worker.py b/src/prefect/task_worker.py index b5c40e9611e6..1c225afd1f9c 100644 --- a/src/prefect/task_worker.py +++ b/src/prefect/task_worker.py @@ -351,7 +351,9 @@ def status(): @sync_compatible -async def serve(*tasks: Task, limit: Optional[int] = 10): +async def serve( + *tasks: Task, limit: Optional[int] = 10, status_server_port: Optional[int] = None +): """Serve the provided tasks so that their runs may be submitted to and executed. in the engine. Tasks do not need to be within a flow run context to be submitted. You must `.submit` the same task object that you pass to `serve`. @@ -361,6 +363,9 @@ async def serve(*tasks: Task, limit: Optional[int] = 10): given task, the task run will be submitted to the engine for execution. - limit: The maximum number of tasks that can be run concurrently. Defaults to 10. Pass `None` to remove the limit. + - status_server_port: An optional port on which to start an HTTP server + exposing status information about the task worker. If not provided, no + status server will run. Example: ```python @@ -382,18 +387,18 @@ def yell(message: str): """ task_worker = TaskWorker(*tasks, limit=limit) - loop = asyncio.get_event_loop() - - server = uvicorn.Server( - uvicorn.Config( - app=create_status_server(task_worker), - host="127.0.0.1", - port=4422, - access_log=False, - log_level="warning", + if status_server_port is not None: + server = uvicorn.Server( + uvicorn.Config( + app=create_status_server(task_worker), + host="127.0.0.1", + port=status_server_port, + access_log=False, + log_level="warning", + ) ) - ) - status_server = loop.create_task(server.serve()) + loop = asyncio.get_event_loop() + status_server_task = loop.create_task(server.serve()) try: await task_worker.start() @@ -413,8 +418,9 @@ def yell(message: str): logger.info("Task worker interrupted, stopping...") finally: - status_server.cancel() - try: - await status_server - except asyncio.CancelledError: - pass + if status_server_task: + status_server_task.cancel() + try: + await status_server_task + except asyncio.CancelledError: + pass