diff --git a/src/prefect/server/events/triggers.py b/src/prefect/server/events/triggers.py index 67d39a705c8a..51ceef29adc5 100644 --- a/src/prefect/server/events/triggers.py +++ b/src/prefect/server/events/triggers.py @@ -23,6 +23,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from typing_extensions import Literal, TypeAlias +from prefect._internal.retries import retry_async_fn from prefect.logging import get_logger from prefect.server.database.dependencies import db_injector from prefect.server.database.interface import PrefectDBInterface @@ -513,6 +514,13 @@ async def reactive_evaluation(event: ReceivedEvent, depth: int = 0): await session.commit() +# retry on operational errors to account for db flakiness with sqlite +@retry_async_fn(max_attempts=3, retry_on_exceptions=(sa.exc.OperationalError,)) +async def get_lost_followers(): + """Get followers that have been sitting around longer than our lookback""" + return await causal_ordering().get_lost_followers() + + async def periodic_evaluation(now: DateTime): """Periodic tasks that should be run regularly, but not as often as every event""" offset = await get_events_clock_offset() @@ -523,7 +531,7 @@ async def periodic_evaluation(now: DateTime): # Any followers that have been sitting around longer than our lookback are never # going to see their leader event (maybe it was lost or took too long to arrive). # These events can just be evaluated now in the order they occurred. - for event in await causal_ordering().get_lost_followers(): + for event in await get_lost_followers(): await reactive_evaluation(event) async with automations_session() as session: