From 2438508e45a1dfa96a90bb0be447910ee0f38b62 Mon Sep 17 00:00:00 2001
From: Michael van Rooijen
Date: Wed, 16 Oct 2024 14:57:25 +0200
Subject: [PATCH] Introduce retry mechanism for connection reset problem

---
 hirefire_resource/macro/celery.py | 34 ++++++++++++++++++++++++++++++++++
 1 file changed, 34 insertions(+)

diff --git a/hirefire_resource/macro/celery.py b/hirefire_resource/macro/celery.py
index 5505f3c..d9e0467 100644
--- a/hirefire_resource/macro/celery.py
+++ b/hirefire_resource/macro/celery.py
@@ -25,6 +25,39 @@ class ChannelError(Exception):
 from hirefire_resource.errors import MissingQueueError
 
 
+def retry_on_connection_reset(retries=5, delay=2):
+    """
+    Decorator to retry a function when ConnectionResetError occurs.
+
+    The wrapped function is always called at least once. Exceptions other
+    than ConnectionResetError propagate immediately, and the final
+    ConnectionResetError is re-raised once every attempt has failed.
+
+    Args:
+        retries (int): Maximum number of attempts, including the first call.
+            Values below 1 are treated as 1.
+        delay (int): Fixed delay between attempts in seconds.
+    """
+    from functools import wraps
+
+    def decorator(func):
+        @wraps(func)  # keep the wrapped function's name and docstring
+        def wrapper(*args, **kwargs):
+            attempts = max(retries, 1)
+            for attempt in range(1, attempts + 1):
+                try:
+                    return func(*args, **kwargs)
+                except ConnectionResetError:
+                    if attempt == attempts:
+                        raise
+                    time.sleep(delay)
+
+        return wrapper
+
+    return decorator
+
+
+@retry_on_connection_reset()
 def job_queue_latency(*queues, broker_url=None):
     """
     Calculates the maximum job queue latency across the specified queues using Celery with either
@@ -184,6 +217,7 @@ async def async_job_queue_latency(*queues, broker_url=None):
     return await loop.run_in_executor(None, func)
 
 
+@retry_on_connection_reset()
 def job_queue_size(*queues, broker_url=None):
     """
     Calculates the total job queue size across the specified queues using Celery with either Redis