Prefect Server Experiencing Timeouts Due to Slow Database Communication #16299

Open
tomukmatthews opened this issue Dec 9, 2024 · 29 comments
Labels: 3.x, bug (Something isn't working), performance (Related to an optimization or performance improvement)

Comments

@tomukmatthews

tomukmatthews commented Dec 9, 2024

Bug summary

We're experiencing significant performance issues with our Prefect server installation, primarily manifesting as timeouts in database communications. The services are consistently running longer than their designated loop intervals:

FlowRunNotifications: Taking ~6.8s vs 4s interval
RecentDeploymentsScheduler: Taking ~7.5s vs 5s interval

The primary error appears to be a timeout in the PostgreSQL asyncpg connection:

TimeoutError raised in sqlalchemy/dialects/postgresql/asyncpg.py

Version info (for the server)

Version:             3.1.4
API version:         0.8.4
Python version:      3.12.7
Git commit:          78ee41cb
Built:               Wed, Nov 20, 2024 7:37 PM
OS/Arch:             linux/x86_64
Profile:             ephemeral
Server type:         server
Pydantic version:    2.10.0

Additional context

  • We've deployed the same flows on Prefect 2.19 and there were no issues
  • We're using Postgres with engine version 14.2. The number of database connections is well below the maximum limit and the load on the DB is low
  • We attempted wiping the database completely and starting fresh (which didn't work)
  • The Prefect client we used to deploy the flows is on the same Prefect, Pydantic and Python versions as the server
  • We've deployed Prefect in Kubernetes (EKS) using the Helm chart at https://prefecthq.github.io/prefect-helm, version: 2024.12.3011129
  • I've noticed the Prefect server experiencing OOMs and restarting; doubling its CPU and memory limits alleviated this.
  • I've been experiencing issues with client-server communication (despite increasing the proxy-read-timeout in my nginx ingress):
23:23:53.081 | WARNING | prefect.events.clients - Unable to connect to 'ws://prefect-dev.shared.unitary.ai/api/events/in'. Please check your network settings to ensure websocket connections to the API are allowed. Otherwise event data (including task run data) may be lost. Reason: . Set PREFECT_DEBUG_MODE=1 to see the full error.
23:23:53.082 | ERROR   | GlobalEventLoopThread | prefect._internal.concurrency - Service 'EventsWorker' failed with 5 pending items.

Server logs:

  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 949, in connect
    await_only(creator_fn(*arg, **kw)),
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 132, in await_only
    return current.parent.switch(awaitable)  # type: ignore[no-any-return,attr-defined] # noqa: E501
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 196, in greenlet_spawn
    value = await result
            ^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/asyncpg/connection.py", line 2420, in connect
    async with compat.timeout(timeout):
               ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/asyncio/timeouts.py", line 115, in __aexit__
    raise TimeoutError from exc_val
TimeoutError
13:54:15.592 | WARNING | prefect.server.services.flowrunnotifications - FlowRunNotifications took 6.797844 seconds to run, which is longer than its loop interval of 4 seconds.
13:54:16.315 | WARNING | prefect.server.services.recentdeploymentsscheduler - RecentDeploymentsScheduler took 7.518834 seconds to run, which is longer than its loop interval of 5 seconds.
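
One way to separate plain network latency from slowness inside Postgres or the Prefect server is to time a bare TCP connection to the database host from inside the server pod. The sketch below is only an illustration and is not part of the original report: `time_tcp_connect` is a hypothetical helper, and the host and port are whatever your deployment uses. It relies on the standard library alone, so it does not exercise asyncpg's authentication or TLS handshake.

```python
import asyncio
import time


async def time_tcp_connect(host: str, port: int, timeout: float = 5.0) -> float:
    """Return the seconds needed to open a TCP connection to host:port.

    Raises asyncio.TimeoutError (an alias of TimeoutError on Python 3.11+)
    if the connection is not established within `timeout` seconds.
    """
    start = time.perf_counter()
    _reader, writer = await asyncio.wait_for(
        asyncio.open_connection(host, port), timeout
    )
    elapsed = time.perf_counter() - start

    # Close politely; ignore a reset if the peer hung up first.
    writer.close()
    try:
        await writer.wait_closed()
    except ConnectionError:
        pass
    return elapsed
```

For example, `asyncio.run(time_tcp_connect("my-postgres-host", 5432))` (hypothetical host name) run a few times in a row gives a rough baseline. If the bare connection already takes seconds, the bottleneck is probably below the database layer (DNS, network policy, proxy) rather than in the queries themselves.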
tomukmatthews added the bug label Dec 9, 2024
@CorMazz

CorMazz commented Dec 10, 2024

I'm having what I think may be a related issue? I tested out Prefect no problem on WSL, because we're considering using it as a workflow orchestrator. Now when I'm trying to test that same workflow on a corporate RH Linux environment, the server feels like it is starved for resources (even though I'm running a simple workflow and the VM has 4 cores & 64 GB RAM), since tons of services are taking longer to run than their designated loop intervals and it is slow to respond to UI interaction. Likely related: the SQLite database seems to always be locked.
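
For context, "database is locked" is what SQLite reports when a connection cannot obtain the write lock before its busy timeout expires. A minimal standard-library sketch of that situation follows. It is an illustration under assumed names (`demonstrate_lock` is hypothetical and the `task_run` table here is heavily simplified), not a reproduction of what the Prefect server does internally.

```python
import os
import sqlite3
import tempfile


def demonstrate_lock(timeout_seconds: float = 0.1) -> str:
    """Hold a write lock on one connection and attempt a write on another.

    Returns the error message raised on the second connection, or
    "no error" if the write unexpectedly succeeded.
    """
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.db")

        # isolation_level=None puts the connection in autocommit mode,
        # so transactions are controlled explicitly with BEGIN/ROLLBACK.
        writer = sqlite3.connect(path, isolation_level=None)
        writer.execute("CREATE TABLE task_run (id INTEGER PRIMARY KEY, name TEXT)")
        writer.execute("BEGIN IMMEDIATE")  # take the write lock and keep it
        writer.execute("INSERT INTO task_run (name) VALUES ('first')")

        # The second connection gives up after `timeout_seconds`.
        other = sqlite3.connect(path, timeout=timeout_seconds, isolation_level=None)
        try:
            other.execute("INSERT INTO task_run (name) VALUES ('second')")
            return "no error"
        except sqlite3.OperationalError as exc:
            return str(exc)
        finally:
            other.close()
            writer.execute("ROLLBACK")
            writer.close()
```

The point of the sketch is that the error depends on how long a writer holds the lock relative to the waiting connection's timeout, so anything that makes individual writes slow (for example a slow filesystem) can surface as this error even under light load.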

Version:             3.1.5
API version:         0.8.4
Python version:      3.10.8
Git commit:          3c06654e
Built:               Mon, Dec 2, 2024 6:57 PM
OS/Arch:             linux/x86_64
Profile:             local
Server type:         server
Pydantic version:    2.10.3

11:29:52.962 | WARNING | prefect.server.services.marklateruns - MarkLateRuns took 16.083751 seconds to run, which is longer than its loop interval of 5.0 seconds.
11:29:52.977 | ERROR   | uvicorn.error - Exception in ASGI application
Traceback (most recent call last):
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1967, in _exec_single_context
    self.dialect.do_execute(
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 941, in do_execute
    cursor.execute(statement, parameters)
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 147, in execute
    self._adapt_connection._handle_exception(error)
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 298, in _handle_exception
    raise error
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 129, in execute
    self.await_(_cursor.execute(operation, parameters))
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 132, in await_only
    return current.parent.switch(awaitable)  # type: ignore[no-any-return,attr-defined] # noqa: E501
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 196, in greenlet_spawn
    value = await result
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/aiosqlite/cursor.py", line 48, in execute
    await self._execute(self._cursor.execute, sql, parameters)
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/aiosqlite/cursor.py", line 40, in _execute
    return await self._conn._execute(fn, *args, **kwargs)
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/aiosqlite/core.py", line 132, in _execute
    return await future
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/aiosqlite/core.py", line 115, in run
    result = function()
sqlite3.OperationalError: database is locked

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/uvicorn/protocols/http/h11_impl.py", line 403, in run_asgi
    result = await app(  # type: ignore[func-returns-value]
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/uvicorn/middleware/proxy_headers.py", line 60, in __call__
    return await self.app(scope, receive, send)
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/fastapi/applications.py", line 1054, in __call__
    await super().__call__(scope, receive, send)
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/starlette/applications.py", line 113, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/starlette/middleware/errors.py", line 187, in __call__
    raise exc
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/starlette/middleware/errors.py", line 165, in __call__
    await self.app(scope, receive, _send)
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/prefect/server/api/server.py", line 149, in __call__
    await self.app(scope, receive, send)
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/starlette/middleware/cors.py", line 85, in __call__
    await self.app(scope, receive, send)
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 62, in __call__
    await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
    raise exc
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
    await app(scope, receive, sender)
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/starlette/routing.py", line 715, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/starlette/routing.py", line 735, in app
    await route.handle(scope, receive, send)
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/starlette/routing.py", line 460, in handle
    await self.app(scope, receive, send)
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/fastapi/applications.py", line 1054, in __call__
    await super().__call__(scope, receive, send)
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/starlette/applications.py", line 113, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/starlette/middleware/errors.py", line 187, in __call__
    raise exc
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/starlette/middleware/errors.py", line 165, in __call__
    await self.app(scope, receive, _send)
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/starlette/middleware/gzip.py", line 20, in __call__
    await responder(scope, receive, send)
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/starlette/middleware/gzip.py", line 39, in __call__
    await self.app(scope, receive, self.send_with_gzip)
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 62, in __call__
    await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
    raise exc
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
    await app(scope, receive, sender)
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/starlette/routing.py", line 715, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/starlette/routing.py", line 735, in app
    await route.handle(scope, receive, send)
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/starlette/routing.py", line 288, in handle
    await self.app(scope, receive, send)
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/starlette/routing.py", line 76, in app
    await wrap_app_handling_exceptions(app, request)(scope, receive, send)
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
    raise exc
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
    await app(scope, receive, sender)
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/starlette/routing.py", line 73, in app
    response = await f(request)
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/prefect/server/utilities/server.py", line 47, in handle_response_scoped_depends
    response = await default_handler(request)
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/fastapi/routing.py", line 301, in app
    raw_response = await run_endpoint_function(
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/fastapi/routing.py", line 212, in run_endpoint_function
    return await dependant.call(**values)
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/prefect/server/api/task_runs.py", line 72, in create_task_run
    model = await models.task_runs.create_task_run(
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/prefect/server/database/dependencies.py", line 168, in async_wrapper
    return await func(db, *args, **kwargs)  # type: ignore
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/prefect/server/models/task_runs.py", line 80, in create_task_run
    await session.execute(insert_stmt)
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/sqlalchemy/ext/asyncio/session.py", line 461, in execute
    result = await greenlet_spawn(
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 201, in greenlet_spawn
    result = context.throw(*sys.exc_info())
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 2362, in execute
    return self._execute_internal(
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 2247, in _execute_internal
    result: Result[Any] = compile_state_cls.orm_execute_statement(
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/sqlalchemy/orm/bulk_persistence.py", line 1294, in orm_execute_statement
    result = conn.execute(
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1418, in execute
    return meth(
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 515, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1640, in _execute_clauseelement
    ret = self._execute_context(
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1846, in _execute_context
    return self._exec_single_context(
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1986, in _exec_single_context
    self._handle_dbapi_exception(
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2355, in _handle_dbapi_exception
    raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1967, in _exec_single_context
    self.dialect.do_execute(
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 941, in do_execute
    cursor.execute(statement, parameters)
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 147, in execute
    self._adapt_connection._handle_exception(error)
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 298, in _handle_exception
    raise error
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 129, in execute
    self.await_(_cursor.execute(operation, parameters))
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 132, in await_only
    return current.parent.switch(awaitable)  # type: ignore[no-any-return,attr-defined] # noqa: E501
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 196, in greenlet_spawn
    value = await result
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/aiosqlite/cursor.py", line 48, in execute
    await self._execute(self._cursor.execute, sql, parameters)
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/aiosqlite/cursor.py", line 40, in _execute
    return await self._conn._execute(fn, *args, **kwargs)
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/aiosqlite/core.py", line 132, in _execute
    return await future
  File "/home/my_username/python_environment/.venv/lib/python3.10/site-packages/aiosqlite/core.py", line 115, in run
    result = function()
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) database is locked
[SQL: INSERT INTO task_run (flow_run_id, task_key, dynamic_key, cache_key, cache_expiration, task_version, flow_run_run_count, empirical_policy, task_inputs, tags, labels, name, run_count, total_run_time, id, created, updated) VALUES (:flow_run_id, :task_key, :dynamic_key, :cache_key, :cache_expiration, :task_version, :flow_run_run_count, :empirical_policy, :task_inputs, :tags, :labels, :name, :run_count, :total_run_time, :id, :created, :updated) ON CONFLICT (flow_run_id, task_key, dynamic_key) DO NOTHING]
[parameters: {'flow_run_id': '541ccbb0-372e-4580-862c-3a5b56177058', 'task_key': 'process_case_flow-a8ab69b3', 'dynamic_key': '0', 'cache_key': None, 'cache_expiration': None, 'task_version': '5fd26d2d027b6eb680eda186427acafc', 'flow_run_run_count': 0, 'empirical_policy': '{"max_retries": 0, "retry_delay_seconds": 0.0, "retries": 0, "retry_delay": 0, "retry_jitter_factor": null}', 'task_inputs': '{"case_data": [{"input_type": "task_run", "id": "08e56be0-6a98-4122-898a-1111edbc8ebd"}]}', 'tags': '[]', 'labels': '{"prefect.flow.id": "b7b42394-69e0-4734-9921-b99a2292fa38", "prefect.flow-run.id": "541ccbb0-372e-4580-862c-3a5b56177058"}', 'name': 'Process Case 1a-0', 'run_count': 0, 'total_run_time': '1970-01-01 00:00:00.000000', 'id': '924ccd4a-285f-46c2-b529-05ef671e8f1f', 'created': '2024-12-10 16:29:36.273078', 'updated': '2024-12-10 16:29:47.872167'}]
(Background on this error at: https://sqlalche.me/e/20/e3q8)
11:29:52.994 | WARNING | prefect.server.services.recentdeploymentsscheduler - RecentDeploymentsScheduler took 16.116992 seconds to run, which is longer than its loop interval of 5 seconds.
11:29:52.998 | WARNING | prefect.server.services.failexpiredpauses - FailExpiredPauses took 16.119 seconds to run, which is longer than its loop interval of 5.0 seconds.
11:29:53.019 | WARNING | prefect.server.services.flowrunnotifications - FlowRunNotifications took 19.135031 seconds to run, which is longer than its loop interval of 4 seconds.
11:29:53.162 | WARNING | prefect.server.services.foreman - Foreman took 16.29065 seconds to run, which is longer than its loop interval of 15.0 seconds.
11:30:44.210 | WARNING | prefect.server.services.failexpiredpauses - FailExpiredPauses took 6.207736 seconds to run, which is longer than its loop interval of 5.0 seconds.
11:30:44.268 | WARNING | prefect.server.services.recentdeploymentsscheduler - RecentDeploymentsScheduler took 6.266031 seconds to run, which is longer than its loop interval of 5 seconds.
11:30:44.294 | WARNING | prefect.server.services.marklateruns - MarkLateRuns took 6.318531 seconds to run, which is longer than its loop interval of 5.0 seconds.
11:30:44.333 | WARNING | prefect.server.services.flowrunnotifications - FlowRunNotifications took 7.302994 seconds to run, which is longer than its loop interval of 4 seconds.
11:31:04.703 | WARNING | prefect.server.services.recentdeploymentsscheduler - RecentDeploymentsScheduler took 10.43387 seconds to run, which is longer than its loop interval of 5 seconds.
11:31:04.749 | WARNING | prefect.server.services.marklateruns - MarkLateRuns took 10.451921 seconds to run, which is longer than its loop interval of 5.0 seconds.
11:31:04.754 | WARNING | prefect.server.services.flowrunnotifications - FlowRunNotifications took 12.412788 seconds to run, which is longer than its loop interval of 4 seconds.
11:31:04.760 | WARNING | prefect.server.services.failexpiredpauses - FailExpiredPauses took 10.545929 seconds to run, which is longer than its loop interval of 5.0 seconds.
11:31:35.370 | WARNING | prefect.server.services.foreman - Foreman took 42.202888 seconds to run, which is longer than its loop interval of 15.0 seconds.
11:31:35.429 | WARNING | prefect.server.services.failexpiredpauses - FailExpiredPauses took 25.666847 seconds to run, which is longer than its loop interval of 5.0 seconds.
11:31:35.441 | WARNING | prefect.server.services.marklateruns - MarkLateRuns took 25.68895 seconds to run, which is longer than its loop interval of 5.0 seconds.
11:31:35.445 | WARNING | prefect.server.services.recentdeploymentsscheduler - RecentDeploymentsScheduler took 25.739011 seconds to run, which is longer than its loop interval of 5 seconds.
11:31:35.495 | WARNING | prefect.server.services.flowrunnotifications - FlowRunNotifications took 26.73926 seconds to run, which is longer than its loop interval of 4 seconds.
11:32:06.644 | WARNING | prefect.server.services.flowrunnotifications - FlowRunNotifications took 31.147717 seconds to run, which is longer than its loop interval of 4 seconds.
11:32:06.906 | WARNING | prefect.server.services.foreman - Foreman took 31.535556 seconds to run, which is longer than its loop interval of 15.0 seconds.
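
With this many warnings it can help to summarize them per service before comparing runs. The sketch below is an assumption-laden helper, not a Prefect utility: `summarize_overruns` and the regular expression are made up for this thread and only match the warning wording shown in the logs above.

```python
import re
from collections import defaultdict

# Matches e.g. "... - Foreman took 16.29065 seconds to run, which is longer
# than its loop interval of 15.0 seconds."
OVERRUN = re.compile(
    r"- (?P<service>\w+) took (?P<took>\d+(?:\.\d+)?) seconds to run, "
    r"which is longer than its loop interval of (?P<interval>\d+(?:\.\d+)?) seconds"
)


def summarize_overruns(lines):
    """Return {service: (count, worst_seconds, worst_ratio)} for overrun warnings.

    worst_ratio is the largest observed (run time / loop interval).
    """
    stats = defaultdict(lambda: [0, 0.0, 0.0])
    for line in lines:
        match = OVERRUN.search(line)
        if match is None:
            continue  # not an overrun warning
        took = float(match["took"])
        interval = float(match["interval"])
        entry = stats[match["service"]]
        entry[0] += 1
        entry[1] = max(entry[1], took)
        entry[2] = max(entry[2], took / interval)
    return {service: tuple(values) for service, values in stats.items()}


# Three lines copied from the log above.
SAMPLE = [
    "11:29:53.162 | WARNING | prefect.server.services.foreman - Foreman took 16.29065 seconds to run, which is longer than its loop interval of 15.0 seconds.",
    "11:31:35.370 | WARNING | prefect.server.services.foreman - Foreman took 42.202888 seconds to run, which is longer than its loop interval of 15.0 seconds.",
    "11:32:06.644 | WARNING | prefect.server.services.flowrunnotifications - FlowRunNotifications took 31.147717 seconds to run, which is longer than its loop interval of 4 seconds.",
]
```

Calling `summarize_overruns(SAMPLE)` groups the warnings by service name. When several services overrun by a similar absolute amount at the same timestamp, as they do above, that hints at a shared stall (such as the database) rather than one slow service.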

@will-uai

These seem to be related (#16304)
We are also experiencing these issues.

@CorMazz

CorMazz commented Dec 11, 2024

It definitely has something to do with the database. I figured I'd test out running with an in-memory database and I had no issues whatsoever.

(python-environment) bash-4.4$ prefect config set PREFECT_API_DATABASE_CONNECTION_URL="sqlite+aiosqlite:///file::memory:?cache=shared&uri=true&check_same_thread=false"

Set 'PREFECT_API_DATABASE_CONNECTION_URL' to 'sqlite+aiosqlite:///file::memory:?cache=shared&uri=true&check_same_thread=false'.
Updated profile 'local'.
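
For readers unfamiliar with that URL: `file::memory:?cache=shared` together with `uri=true` asks SQLite for an in-memory database that several connections in the same process can share, so nothing touches the disk. Here is a small standard-library sketch of that behaviour. The `flow_run` table and the function name are made up for illustration, and data in such a database is gone once the last connection closes.

```python
import sqlite3


def shared_memory_roundtrip() -> list:
    """Write through one connection and read through another.

    Both connections point at the same shared-cache in-memory database.
    """
    uri = "file::memory:?cache=shared"
    first = sqlite3.connect(uri, uri=True)
    second = sqlite3.connect(uri, uri=True)
    try:
        first.execute("CREATE TABLE flow_run (name TEXT)")
        first.execute("INSERT INTO flow_run VALUES ('toy-run')")
        first.commit()  # make the row visible to the other connection
        return second.execute("SELECT name FROM flow_run").fetchall()
    finally:
        second.close()
        first.close()
```

This only shows what the connection string means. It does not by itself explain why the file-backed database was slow on the RH Linux VM.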

cicdw added the performance label Dec 11, 2024
@will-uai

I'll add that shortly after a few of these errors occur, our Prefect server pod crashes and needs to be restarted. As a result, all running flows end up failing or in a zombie state (i.e., Prefect says the flow is running when it is not).

@mthanded
Contributor

Can confirm I'm still seeing this on 3.1.7.dev4. Seeing intermittent issues with both Postgres and SQLite.

@tomukmatthews
Author

Hey @zzstoatzz, do you know if any Prefect engineers have found any possible causes or are investigating this? Sorry to pester, but we're having a lot of issues running flows and using the server UI.

@zzstoatzz
Collaborator

hi @tomukmatthews - thanks for the bump. this is something we're going to investigate over the coming days - any information you can share about your workloads would be appreciated! thanks for the infra detail so far

@CorMazz

CorMazz commented Dec 20, 2024

@zzstoatzz

I was running this toy workflow. Wasn't even doing anything real yet, I just wanted to see if Prefect would work for my use-case.

Essentially, I had a main workflow that would load a .csv file, and then for every row in that .csv file it would start a subworkflow that would do some stuff to the data. Once all the subworkflows were done, it would come together and summarize the results. The actual processing here is complete nonsense; this was just to test Prefect.

import asyncio
from textwrap import dedent
from typing import List, Dict
from prefect import task, flow, get_run_logger
import pandas as pd
from time import sleep
from prefect.tasks import task_input_hash
from prefect.cache_policies import TASK_SOURCE, INPUTS
from prefect.artifacts import create_progress_artifact, update_progress_artifact, create_markdown_artifact

"""An example workflow with confidential information removed for https://github.com/PrefectHQ/prefect/issues/16299"""

#-----------------------------------------------------------------------------------------------------------------------
# Process Case Workflow
#-----------------------------------------------------------------------------------------------------------------------

# Don't cache this one
@task(name="Load CSV")
def load_csv(path: str) -> List[Dict]:
    df = pd.read_csv(path)
    return df.to_dict('records')

# cache_policy=TASK_SOURCE + INPUTS

@task(name="Solve Stuff")
def solve_stuff(row: Dict) -> Dict:
    sleep(2)
    row['left_side'] = row["p"]*row["v"]
    row["right_side"] = row["n"]*row["r"]*row["t"]
    return row

@task(name="Create File")
def create_file(bcs: Dict) -> str:
    sleep(2)
    return "\n File \n".join([f"{k}: {v}" for k, v in bcs.items()])

@task(name="Create QFile")
def create_qfile(bcs: Dict) -> str:
    sleep(2)
    return "\n Q \n".join([f"{k}: {v}" for k, v in bcs.items()])

@task(name="Launch Job")
async def launch_job(journal_file: str, q_file: str) -> str:
    # progress_artifact_id = create_progress_artifact(
    #     progress=0.0,
    #     description="Indicates the estimated progress of the run.",
    # )
    sleep_time = 20
    for i in range(1, sleep_time + 1):
        await asyncio.sleep(1)
        # update_progress_artifact(artifact_id=progress_artifact_id, progress=(i / sleep_time) * 100)
    return "job\n" + journal_file + q_file

# cache_policy=TASK_SOURCE + INPUTS
@task()
def validate_case_bcs(job_results: str, case_bcs: Dict) -> bool:
    sleep(2)
    markdown_report = dedent(f"""\
        # Boundary Conditions Summary

        Case ID: {case_bcs["id"]}

        ```python
        def example_func():
            return "It works"
        ```

        ```mermaid
        pie title NETFLIX
         "Time spent looking for movie" : 90
         "Time spent watching it" : 10
                     
        ```"""
    )

    create_markdown_artifact(
        markdown=markdown_report,
        description="Validate Job Conditions",
    )

    return True

@task(name="Analyze Case Results")
def analyze_job_results(job_results: str, case_bcs: Dict) -> Dict:
    sleep(2)
    case_bcs["results"] = job_results
    return case_bcs

async def build_case_subflow(row: Dict):
    """https://github.com/PrefectHQ/prefect/issues/7319#issuecomment-1311968282"""
    @flow(name=f"Process Case {row.get('id', 'unknown')}")
    async def process_case_flow(case_data: Dict) -> Dict:
        case_bcs = solve_stuff(case_data)
        file_future, q_file_future = create_file.submit(case_bcs), create_qfile.submit(case_bcs)
        job_results = await launch_job(file_future, q_file_future)
        validate_case_bcs(job_results, case_bcs)
        return analyze_job_results(job_results, case_bcs)
    
    return await process_case_flow(row)

@task(name="Summarize All Results")
def summarize_results(all_results: List[Dict]) -> List[Dict]:
    sleep(2)
    return all_results

#-----------------------------------------------------------------------------------------------------------------------
# CFD Optimization Workflow
#-----------------------------------------------------------------------------------------------------------------------

@flow(name="Toy Job Workflow")
async def job_workflow(csv_path: str) -> List[Dict]:
    rows = load_csv(csv_path)
    all_cfd_results = await asyncio.gather(*[build_case_subflow(row) for row in rows])
    return summarize_results(all_cfd_results)

if __name__ == "__main__":
    asyncio.run(job_workflow("example_data.csv"))

@zzstoatzz
Collaborator

thanks @CorMazz - much appreciated!

@tomukmatthews
Author

tomukmatthews commented Jan 1, 2025

Hi @zzstoatzz, I was curious if you found anything interesting regarding the performance issues in the telemetry data?

@tomukmatthews
Author

Our data pipelines are frequently failing with:

prefect.exceptions.PrefectHTTPStatusError: Server error '500 Internal Server Error' for url 'http://prefect-server.prefect.svc.cluster.local:4200/api/flow_runs/'

Do you know of any server-side or client-side configuration we can change to alleviate these issues in the interim and help our runs complete successfully? For example, I've already tripled the server's memory allocation; would bumping its memory and CPU further help?
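
As a stopgap on the calling side, a transient 500 can be wrapped in a retry with exponential backoff. The sketch below is generic and assumption-heavy: `TransientServerError` and `call_with_backoff` are hypothetical names, not part of Prefect, and this is not a description of how the Prefect client handles retries internally. It only papers over brief stalls and does nothing about the underlying database slowness.

```python
import random
import time


class TransientServerError(Exception):
    """Hypothetical stand-in for a 5xx response from the API."""


def call_with_backoff(func, max_attempts=5, base_delay=0.5, max_delay=30.0,
                      sleep=time.sleep):
    """Call func(), retrying on TransientServerError with exponential backoff.

    The delay doubles on each attempt (capped at max_delay) and gets random
    jitter added so that many clients do not retry in lockstep. The last
    failure is re-raised once max_attempts is reached.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except TransientServerError:
            if attempt == max_attempts:
                raise
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            sleep(delay + random.uniform(0, delay / 2))
```

The `sleep` parameter exists only so the waiting can be stubbed out in tests. In real use you would translate whatever exception your client raises for a 5xx response into the retryable error type.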

@OliverKleinBST

I have the same problem currently, but with 2.20.15.
I first noticed it when moving from 2.20.15-python3.10 to 2.20.15-python3.11.
However, it remains a problem even after rolling back.

The problem actually hits me when I am opening a flow run page where it seems to time out while collecting data to display the flow graph and the logs.

The setup is deployed via Helm on Azure, and I use Azure PostgreSQL Flexible Server version 14.12.

The stack trace reads:
  File "/usr/local/lib/python3.10/site-packages/prefect/_vendor/starlette/routing.py", line 780, in app
    await route.handle(scope, receive, send)
  File "/usr/local/lib/python3.10/site-packages/prefect/_vendor/starlette/routing.py", line 483, in handle
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.10/site-packages/prefect/_vendor/fastapi/applications.py", line 293, in __call__
    await super().__call__(scope, receive, send)
  File "/usr/local/lib/python3.10/site-packages/prefect/_vendor/starlette/applications.py", line 123, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/usr/local/lib/python3.10/site-packages/prefect/_vendor/starlette/middleware/errors.py", line 190, in __call__
    raise exc
  File "/usr/local/lib/python3.10/site-packages/prefect/_vendor/starlette/middleware/errors.py", line 168, in __call__
    await self.app(scope, receive, _send)
  File "/usr/local/lib/python3.10/site-packages/prefect/_vendor/starlette/middleware/gzip.py", line 24, in __call__
    await responder(scope, receive, send)
  File "/usr/local/lib/python3.10/site-packages/prefect/_vendor/starlette/middleware/gzip.py", line 44, in __call__
    await self.app(scope, receive, self.send_with_gzip)
  File "/usr/local/lib/python3.10/site-packages/prefect/_vendor/starlette/middleware/exceptions.py", line 62, in __call__
    await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
  File "/usr/local/lib/python3.10/site-packages/prefect/_vendor/starlette/_exception_handler.py", line 62, in wrapped_app
    raise exc
  File "/usr/local/lib/python3.10/site-packages/prefect/_vendor/starlette/_exception_handler.py", line 51, in wrapped_app
    await app(scope, receive, sender)
  File "/usr/local/lib/python3.10/site-packages/prefect/_vendor/fastapi/middleware/asyncexitstack.py", line 20, in __call__
    raise e
  File "/usr/local/lib/python3.10/site-packages/prefect/_vendor/fastapi/middleware/asyncexitstack.py", line 17, in __call__
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.10/site-packages/prefect/_vendor/starlette/routing.py", line 760, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/usr/local/lib/python3.10/site-packages/prefect/_vendor/starlette/routing.py", line 780, in app
    await route.handle(scope, receive, send)
  File "/usr/local/lib/python3.10/site-packages/prefect/_vendor/starlette/routing.py", line 302, in handle
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.10/site-packages/prefect/_vendor/starlette/routing.py", line 81, in app
    await wrap_app_handling_exceptions(app, request)(scope, receive, send)
  File "/usr/local/lib/python3.10/site-packages/prefect/_vendor/starlette/_exception_handler.py", line 62, in wrapped_app
    raise exc
  File "/usr/local/lib/python3.10/site-packages/prefect/_vendor/starlette/_exception_handler.py", line 51, in wrapped_app
    await app(scope, receive, sender)
  File "/usr/local/lib/python3.10/site-packages/prefect/_vendor/starlette/routing.py", line 76, in app
    response = await func(request)
  File "/usr/local/lib/python3.10/site-packages/prefect/server/utilities/server.py", line 104, in handle_response_scoped_depends
    response = await default_handler(request)
  File "/usr/local/lib/python3.10/site-packages/prefect/_vendor/fastapi/routing.py", line 251, in app
    raw_response = await run_endpoint_function(
  File "/usr/local/lib/python3.10/site-packages/prefect/_vendor/fastapi/routing.py", line 177, in run_endpoint_function
    return await dependant.call(**values)
  File "/usr/local/lib/python3.10/site-packages/prefect/server/api/flow_runs.py", line 318, in read_flow_run_graph_v2
    return await read_flow_run_graph(
  File "/usr/local/lib/python3.10/site-packages/prefect/server/database/dependencies.py", line 168, in async_wrapper
    return await func(db, *args, **kwargs) # type: ignore
  File "/usr/local/lib/python3.10/site-packages/prefect/server/models/flow_runs.py", line 540, in read_flow_run_graph
    return await db.queries.flow_run_graph_v2(
  File "/usr/local/lib/python3.10/site-packages/prefect/server/database/query_components.py", line 898, in flow_run_graph_v2
    results = await session.execute(query)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/ext/asyncio/session.py", line 461, in execute
    result = await greenlet_spawn(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 203, in greenlet_spawn
    result = context.switch(value)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 2362, in execute
    return self._execute_internal(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 2256, in _execute_internal
    result = conn.execute(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1418, in execute
    return meth(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 515, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1640, in _execute_clauseelement
    ret = self._execute_context(
  File
"/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1846, in _execute_context │ return self._exec_single_context( │ File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1986, in _exec_single_context │ self._handle_dbapi_exception( │ File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2358, in _handle_dbapi_exception
raise exc_info[1].with_traceback(exc_info[2]) │ File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1967, in _exec_single_context │ self.dialect.do_execute( │ File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 941, in do_execute │ cursor.execute(statement, parameters) │ File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 572, in execute │ self.adapt_connection.await( │ File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 132, in await_only │ return current.parent.switch(awaitable) # type: ignore[no-any-return,attr-defined] # noqa: E501 │ File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 196, in greenlet_spawn │ value = await result │ File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 550, in _prepare_and_execute │ self._handle_exception(error) │ File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 501, in _handle_exception │ self._adapt_connection._handle_exception(error) │ File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 786, in _handle_exception │ raise error │ File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 538, in _prepare_and_execute │ self._rows = deque(await prepared_stmt.fetch(*parameters)) │ File "/usr/local/lib/python3.10/site-packages/asyncpg/prepared_stmt.py", line 176, in fetch │ data = await self.__bind_execute(args, 0, timeout) │ File "/usr/local/lib/python3.10/site-packages/asyncpg/prepared_stmt.py", line 241, in __bind_execute │ data, status, _ = await self.__do_execute( │ File "/usr/local/lib/python3.10/site-packages/asyncpg/prepared_stmt.py", line 230, in __do_execute │ return await executor(protocol) │ File "asyncpg/protocol/protocol.pyx", line 207, in bind_execute │ 
asyncio.exceptions.TimeoutError

@zzstoatzz
Collaborator

hi @tomukmatthews - there's still more investigation to do (apologies, last couple weeks have been slower due to the time of the year)

from the preliminary investigation I did, after running work constantly for a while (was testing mostly on postgres), I was able to observe the lagging loop services mentioned above, which seemed to be caused by long waits to obtain new db connections. Some ideas have been floated about this being related to the in-memory events implementation, but anecdotally that specific change hasn't made a significant improvement.

any findings you'd like to share would be appreciated! we will continue to look into this as time allows

@RobertFischer

I am seeing exactly this behavior -- timeout errors with messages about things taking longer than their loop allowances, and then Prefect's workflow management becoming entirely inactive -- but with 2.20.13 under moderate load. This occurred when running against a minimally-powered GCP PSQL server and it went away when I made the PSQL server beefier.

@OliverKleinBST

OliverKleinBST commented Jan 8, 2025

I also had the impression that a higher SKU for the PostgreSQL server helps: after the upgrade the problem vanished, but in my case only for a while, and then the problem came back. More strangely, when Prefect stalled on the database, I could still manually run my own queries against the Prefect tables, mirroring similar data pulls, with both psycopg and asyncpg, and results came back in <1s. The server metrics also showed more than enough capacity.

@will-uai

will-uai commented Jan 8, 2025

@RobertFischer could you post the specs of your PSQL instance (before and after)?

@OliverKleinBST

I try to go cheap:
Standard_B1ms (1 vCore, 2GiB, 640 max iops, P4 120 iops) vs Standard_Ds2_v3 (2 vCore, 8GiB, 3200 max iops, P4 120 iops)

@RobertFischer

RobertFischer commented Jan 8, 2025

The lightweight version was GCP's db-f1-micro, which was shared core, 0.6 GiB RAM, HDD. There was nothing in the GCP logs implying that the DB was under resource strain. The heavier version was GCP's db-perf-optimized-N-2, which is 2 vCPUs with 16GiB RAM, SSD. As with @OliverKleinBST, this upgrade only delayed the problem's recurrence.

@RobertFischer

Here's a total shot in the dark. Are we positive all DB connections are being closed? If there's a leak in the DB connections and some transaction isn't getting closed, it might cause problems like this due to locks in the DB. If/when I see this again, I can query the DB system tables to see what the situation is, if that's useful.

@OliverKleinBST

OliverKleinBST commented Jan 9, 2025

One observation when running
SELECT datname, state, count(*) FROM pg_stat_activity group by datname, state order by count desc;
the clear winner is the prefect database, which in my case currently has 27 connections, whereas my other tools have fewer than 10, if not just 1.
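
For anyone who wants to repeat this check from Python, here is a minimal sketch. It counts sessions per state and separately flags sessions that are idle inside an open transaction, since in PostgreSQL a plain `idle` session is only waiting for its client, while `idle in transaction` is the state that can keep locks held. The helper names, the DSN and the use of `asyncpg` for the fetch are assumptions for illustration, not something Prefect provides.

```python
from collections import Counter

# Session states in which a transaction is held open while nothing runs.
OPEN_TXN_STATES = {"idle in transaction", "idle in transaction (aborted)"}

STATE_QUERY = "SELECT state FROM pg_stat_activity WHERE datname = $1"


def summarize_states(rows):
    """Count sessions per state; rows are mappings with a 'state' key."""
    return Counter(row["state"] or "unknown" for row in rows)


def open_transaction_count(counts):
    """Number of sessions sitting idle inside an open transaction."""
    return sum(n for state, n in counts.items() if state in OPEN_TXN_STATES)


async def fetch_states(dsn, database):
    """Fetch one row per session for `database` (needs the asyncpg package)."""
    import asyncpg  # imported lazily so the helpers above run without it

    conn = await asyncpg.connect(dsn)
    try:
        return await conn.fetch(STATE_QUERY, database)
    finally:
        await conn.close()
```

Usage would look like `counts = summarize_states(asyncio.run(fetch_states(dsn, "server")))`, where `dsn` is your own connection string and `"server"` is whatever your Prefect database is called. A non-zero `open_transaction_count(counts)` would point towards a leaked transaction; a large count of plain `idle` sessions points towards pooled connections instead.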

@OliverKleinBST

 datid | datname |  pid  | leader_pid | usesysid | usename | application_name | client_addr | client_hostname | client_port |         backend_start         | xact_start |          query_start          |         state_change          | wait_event_type | wait_event | state | backend_xid | backend_xmin |      query_id       |   query   |  backend_type  
-------+---------+-------+------------+----------+---------+------------------+-------------+-----------------+-------------+-------------------------------+------------+-------------------------------+-------------------------------+-----------------+------------+-------+-------------+--------------+---------------------+-----------+----------------
 18121 | server  |  1848 |            |    47343 | prefect |                  | 10.1.19.38  |                 |       39646 | 2025-01-08 16:40:28.57522+00  |            | 2025-01-08 17:47:30.119931+00 | 2025-01-08 17:47:30.12002+00  | Client          | ClientRead | idle  |             |              | 7088996947831101750 | ROLLBACK; | client backend
 18121 | server  |  1847 |            |    47343 | prefect |                  | 10.1.19.38  |                 |       39652 | 2025-01-08 16:40:28.570175+00 |            | 2025-01-08 17:47:30.119755+00 | 2025-01-08 17:47:30.119868+00 | Client          | ClientRead | idle  |             |              | 7088996947831101750 | ROLLBACK; | client backend
 18121 | server  |  1844 |            |    47343 | prefect |                  | 10.1.19.38  |                 |       39630 | 2025-01-08 16:40:28.563204+00 |            | 2025-01-09 08:26:47.497561+00 | 2025-01-09 08:26:47.497616+00 | Client          | ClientRead | idle  |             |              | 7088996947831101750 | ROLLBACK; | client backend
 18121 | server  |  1837 |            |    47343 | prefect |                  | 10.1.19.38  |                 |       39590 | 2025-01-08 16:40:28.54943+00  |            | 2025-01-09 08:26:47.498061+00 | 2025-01-09 08:26:47.498119+00 | Client          | ClientRead | idle  |             |              | 3694949039461716331 | COMMIT;   | client backend
 18121 | server  |  1845 |            |    47343 | prefect |                  | 10.1.19.38  |                 |       39636 | 2025-01-08 16:40:28.564328+00 |            | 2025-01-09 08:23:57.463443+00 | 2025-01-09 08:23:57.463501+00 | Client          | ClientRead | idle  |             |              | 7088996947831101750 | ROLLBACK; | client backend
 18121 | server  |  1843 |            |    47343 | prefect |                  | 10.1.19.38  |                 |       39626 | 2025-01-08 16:40:28.558585+00 |            | 2025-01-08 17:47:33.463163+00 | 2025-01-08 17:47:33.46325+00  | Client          | ClientRead | idle  |             |              | 7088996947831101750 | ROLLBACK; | client backend
 18121 | server  |  1840 |            |    47343 | prefect |                  | 10.1.19.38  |                 |       39598 | 2025-01-08 16:40:28.555034+00 |            | 2025-01-08 17:47:33.463585+00 | 2025-01-08 17:47:33.463661+00 | Client          | ClientRead | idle  |             |              | 7088996947831101750 | ROLLBACK; | client backend
 18121 | server  |  1838 |            |    47343 | prefect |                  | 10.1.19.38  |                 |       39582 | 2025-01-08 16:40:28.551169+00 |            | 2025-01-08 17:47:30.120089+00 | 2025-01-08 17:47:30.120182+00 | Client          | ClientRead | idle  |             |              | 7088996947831101750 | ROLLBACK; | client backend
 18121 | server  |  1839 |            |    47343 | prefect |                  | 10.1.19.38  |                 |       39596 | 2025-01-08 16:40:28.553584+00 |            | 2025-01-08 17:47:33.462427+00 | 2025-01-08 17:47:33.462517+00 | Client          | ClientRead | idle  |             |              | 7088996947831101750 | ROLLBACK; | client backend
 18121 | server  |  1842 |            |    47343 | prefect |                  | 10.1.19.38  |                 |       39618 | 2025-01-08 16:40:28.557746+00 |            | 2025-01-09 01:17:17.153839+00 | 2025-01-09 01:17:17.155689+00 | Client          | ClientRead | idle  |             |              | 3694949039461716331 | COMMIT;   | client backend
 18121 | server  |  1841 |            |    47343 | prefect |                  | 10.1.19.38  |                 |       39608 | 2025-01-08 16:40:28.556379+00 |            | 2025-01-08 17:44:48.81624+00  | 2025-01-08 17:44:48.816362+00 | Client          | ClientRead | idle  |             |              | 7088996947831101750 | ROLLBACK; | client backend
 18121 | server  |  1846 |            |    47343 | prefect |                  | 10.1.19.38  |                 |       39644 | 2025-01-08 16:40:28.565457+00 |            | 2025-01-09 08:26:49.54046+00  | 2025-01-09 08:26:49.540496+00 | Client          | ClientRead | idle  |             |              | 3694949039461716331 | COMMIT;   | client backend
 18121 | server  | 26986 |            |    47343 | prefect |                  | 10.1.19.38  |                 |       40438 | 2025-01-08 11:18:19.466876+00 |            | 2025-01-08 16:40:28.598412+00 | 2025-01-08 16:40:28.598504+00 | Client          | ClientRead | idle  |             |              | 7088996947831101750 | ROLLBACK; | client backend
 18121 | server  | 26988 |            |    47343 | prefect |                  | 10.1.19.38  |                 |       40456 | 2025-01-08 11:18:19.478918+00 |            | 2025-01-08 16:40:28.558518+00 | 2025-01-08 16:40:28.558628+00 | Client          | ClientRead | idle  |             |              | 7088996947831101750 | ROLLBACK; | client backend
 18121 | server  | 26987 |            |    47343 | prefect |                  | 10.1.19.38  |                 |       40452 | 2025-01-08 11:18:19.472465+00 |            | 2025-01-08 16:40:28.585386+00 | 2025-01-08 16:40:28.585487+00 | Client          | ClientRead | idle  |             |              | 7088996947831101750 | ROLLBACK; | client backend
 18121 | server  | 32603 |            |    47343 | prefect |                  | 10.1.19.38  |                 |       52134 | 2025-01-08 04:29:33.408821+00 |            | 2025-01-08 16:40:28.559101+00 | 2025-01-08 16:40:28.5592+00   | Client          | ClientRead | idle  |             |              | 7088996947831101750 | ROLLBACK; | client backend
 18121 | server  | 32596 |            |    47343 | prefect |                  | 10.1.19.38  |                 |       55828 | 2025-01-08 04:29:27.335201+00 |            | 2025-01-08 16:40:28.55877+00  | 2025-01-08 16:40:28.559017+00 | Client          | ClientRead | idle  |             |              | 7088996947831101750 | ROLLBACK; | client backend
 18121 | server  | 32595 |            |    47343 | prefect |                  | 10.1.19.38  |                 |       55820 | 2025-01-08 04:29:27.310999+00 |            | 2025-01-08 16:40:28.55244+00  | 2025-01-08 16:40:28.552587+00 | Client          | ClientRead | idle  |             |              | 7088996947831101750 | ROLLBACK; | client backend
 18121 | server  | 22803 |            |    47343 | prefect |                  | 10.1.19.38  |                 |       38064 | 2025-01-07 20:31:40.634515+00 |            | 2025-01-08 16:40:28.552943+00 | 2025-01-08 16:40:28.553039+00 | Client          | ClientRead | idle  |             |              | 7088996947831101750 | ROLLBACK; | client backend
 18121 | server  | 22805 |            |    47343 | prefect |                  | 10.1.19.38  |                 |       38082 | 2025-01-07 20:31:40.638371+00 |            | 2025-01-08 16:40:28.606069+00 | 2025-01-08 16:40:28.606182+00 | Client          | ClientRead | idle  |             |              | 7088996947831101750 | ROLLBACK; | client backend
 18121 | server  | 22804 |            |    47343 | prefect |                  | 10.1.19.38  |                 |       38076 | 2025-01-07 20:31:40.641748+00 |            | 2025-01-08 17:47:33.244517+00 | 2025-01-08 17:47:33.244618+00 | Client          | ClientRead | idle  |             |              | 7088996947831101750 | ROLLBACK; | client backend
 18121 | server  | 22802 |            |    47343 | prefect |                  | 10.1.19.38  |                 |       38052 | 2025-01-07 20:31:40.634917+00 |            | 2025-01-08 16:40:28.602074+00 | 2025-01-08 16:40:28.60217+00  | Client          | ClientRead | idle  |             |              | 7088996947831101750 | ROLLBACK; | client backend
 18121 | server  | 22801 |            |    47343 | prefect |                  | 10.1.19.38  |                 |       38036 | 2025-01-07 20:31:40.6251+00   |            | 2025-01-08 16:40:28.552763+00 | 2025-01-08 16:40:28.552852+00 | Client          | ClientRead | idle  |             |              | 7088996947831101750 | ROLLBACK; | client backend
 18121 | server  | 22800 |            |    47343 | prefect |                  | 10.1.19.38  |                 |       38030 | 2025-01-07 20:31:40.617929+00 |            | 2025-01-08 16:40:28.606104+00 | 2025-01-08 16:40:28.606191+00 | Client          | ClientRead | idle  |             |              | 7088996947831101750 | ROLLBACK; | client backend
 18121 | server  | 32594 |            |    47343 | prefect |                  | 10.1.19.38  |                 |       55812 | 2025-01-08 04:29:27.309575+00 |            | 2025-01-08 16:40:28.558969+00 | 2025-01-08 16:40:28.559095+00 | Client          | ClientRead | idle  |             |              | 7088996947831101750 | ROLLBACK; | client backend
 18121 | server  | 26985 |            |    47343 | prefect |                  | 10.1.19.38  |                 |       40424 | 2025-01-08 11:18:19.463167+00 |            | 2025-01-08 16:40:28.59809+00  | 2025-01-08 16:40:28.598199+00 | Client          | ClientRead | idle  |             |              | 7088996947831101750 | ROLLBACK; | client backend
 18121 | server  | 22798 |            |    47343 | prefect |                  | 10.1.19.38  |                 |       38022 | 2025-01-07 20:31:39.296126+00 |            | 2025-01-08 16:40:28.602081+00 | 2025-01-08 16:40:28.602148+00 | Client          | ClientRead | idle  |             |              | 7088996947831101750 | ROLLBACK; | client backend
(27 rows)


@OliverKleinBST

Another finding I could add: we have 2 cloud instances, DEV and PROD. On the seldom-used DEV environment I see 5 such idle connections, far fewer than on the frequently used PROD.

@will-uai

will-uai commented Jan 9, 2025

Can confirm we also see these idle connections (5 at the minimum).

@will-uai

Any update from the Prefect team on this, @zzstoatzz?

@RobertFischer

Deleting all the retry configurations seemed to resolve this issue. I have no idea why or if it's a red herring, but just FYI.

@cicdw
Member

cicdw commented Jan 13, 2025

Both @zzstoatzz and I are looking into this for the week; a few small updates:

  • the number 5 as the minimum connection count makes sense, as this is the default pool_size for sqlalchemy connection pools; you can configure this with the Prefect setting PREFECT_SERVER_DATABASE_SQLALCHEMY_POOL_SIZE
  • we think one piece of the problem is that using prefect server start creates a single connection pool that is shared by both the webserver and the background services; this is not what I would want in a production environment and could be one source of contention. We are working on exposing a new interface that will allow you to run prefect server start --no-services and prefect server services start to decouple these two components of the backend, which should hopefully improve all of our monitoring capabilities
  • we are introducing a new setting called PREFECT_SERVER_DATABASE_CONNECTION_APP_NAME that, when used alongside the two distinct CLI commands above, should allow us to determine whether the webserver or background services are at fault (or both) for leaving their pool connections open
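
To make the contention argument concrete, here is a toy model (not Prefect or SQLAlchemy code) of how long callers wait for a connection when a pool of 5 is shared. It assumes every request arrives at the same instant and holds its connection for a fixed time; the hold times and the function name `connection_waits` are made up for illustration.

```python
import heapq


def connection_waits(hold_times, pool_size):
    """Seconds each request waits for a pooled connection.

    Toy model: every request arrives at t=0, requests are served in list
    order, and request i keeps its connection for hold_times[i] seconds.
    """
    free_at = [0.0] * pool_size  # when each pooled connection is next free
    heapq.heapify(free_at)
    waits = []
    for hold in hold_times:
        start = heapq.heappop(free_at)  # earliest connection to free up
        waits.append(start)             # arrival is t=0, so wait == start
        heapq.heappush(free_at, start + hold)
    return waits


POOL_SIZE = 5  # the SQLAlchemy default pool_size mentioned above

# Shared pool: 10 web requests (1 s each) queue ahead of 2 loop services.
shared = connection_waits([1.0] * 10 + [0.5, 0.5], POOL_SIZE)
service_waits_shared = shared[-2:]

# Separate pools: the services no longer queue behind web traffic.
service_waits_split = connection_waits([0.5, 0.5], POOL_SIZE)

print(service_waits_shared)  # [2.0, 2.0]
print(service_waits_split)   # [0.0, 0.0]
```

In this model the two services wait 2 seconds each behind web traffic in the shared pool and not at all with a pool of their own, which is the kind of delay that would push a loop service past its interval. Real traffic is burstier and hold times vary, so treat the numbers as illustrative only.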

We'll keep providing updates as we have them and please continue to share any / all relevant information!

@zzstoatzz
Collaborator

zzstoatzz commented Jan 22, 2025

👋 hi all - wanted to share some findings from recent investigation. We believe we've identified a bottleneck in the service responsible for recording task runs which is likely responsible for a significant part of the bad performance reported here.

This conclusion is consistent with:

  • the observation some have made that this behavior is specific to 3.x, since the TaskRunRecorder was created to facilitate the client-side task run orchestration that lets tasks run much more quickly
  • the fact that this issue exists for both pg and sqlite

The issue appears to be that in the task run recording service, task runs are recorded sequentially through an in-memory queue and therefore when running continual task-heavy flows, you get:

  • arbitrary queue backup
  • leading to memory growth
  • leading to lagging loop services
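
The chain above can be sketched with simple arithmetic: if events arrive faster than a sequential consumer can record them, the queue depth grows without bound. This is a tick-based toy model with made-up rates, not a measurement of the TaskRunRecorder.

```python
def queue_depths(arrivals_per_tick, recorded_per_tick, ticks):
    """Queue depth at the end of each tick for fixed arrival/record rates."""
    depth = 0
    depths = []
    for _ in range(ticks):
        depth += arrivals_per_tick             # new task-run events enqueued
        depth -= min(depth, recorded_per_tick)  # consumer drains what it can
        depths.append(depth)
    return depths


print(queue_depths(10, 6, 5))  # [4, 8, 12, 16, 20] -> backlog grows every tick
print(queue_depths(5, 6, 3))   # [0, 0, 0] -> consumer keeps up
```

With 10 arrivals and 6 recorded per tick the backlog grows by 4 every tick, so memory use keeps climbing for as long as the task-heavy flows keep running.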

we're exploring / testing strategies for dealing with this (which have shown initial promise):

  • Refactoring the TaskRunRecorder's underlying consumer to allow faster message processing
  • Implementing configurable maximum queue depths (i.e. introducing backpressure)
  • Removing extra database transactions (the statements related to task run recording should be idempotent)
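
As an illustration of the maximum-queue-depth idea, here is a minimal sketch using a bounded `asyncio.Queue`. It is not the TaskRunRecorder implementation; the function name `record_with_backpressure` and the `asyncio.sleep(0)` standing in for a database write are assumptions.

```python
import asyncio


async def record_with_backpressure(n_events, max_depth):
    """Push n_events through a bounded queue; return (peak depth, recorded)."""
    queue = asyncio.Queue(maxsize=max_depth)
    peak = 0
    recorded = []

    async def producer():
        nonlocal peak
        for i in range(n_events):
            await queue.put(i)  # suspends while the queue is full
            peak = max(peak, queue.qsize())

    async def consumer():
        while len(recorded) < n_events:
            item = await queue.get()
            await asyncio.sleep(0)  # stand-in for a database write
            recorded.append(item)

    await asyncio.gather(producer(), consumer())
    return peak, recorded


peak, recorded = asyncio.run(record_with_backpressure(50, 5))
print(peak <= 5, len(recorded))  # True 50
```

Because `put` suspends the producer whenever the queue is full, the backlog can never exceed `max_depth`; the cost is that producers slow down to the consumer's pace instead of memory growing.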

While we're still interested in adding external messaging support (e.g. redis) for larger scale, we'd like to reasonably exhaust opportunities for improvement with the in-memory queue.

Please feel free to chime in if you feel this doesn't explain the behavior you're seeing or if you have other ideas/thoughts

@OliverKleinBST

Great. That would not explain why we also have the problem in Prefect 2.x, though, would it?

@zzstoatzz
Collaborator

correct @OliverKleinBST - that would not directly explain seeing the same behavior in 2.x
