From af5d50707a208f1011d4cde71370923d950311e5 Mon Sep 17 00:00:00 2001 From: Michael van Rooijen Date: Tue, 15 Oct 2024 12:12:02 +0200 Subject: [PATCH] Change Celery initialization configuration --- hirefire_resource/macro/celery.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/hirefire_resource/macro/celery.py b/hirefire_resource/macro/celery.py index 3e5230a..5505f3c 100644 --- a/hirefire_resource/macro/celery.py +++ b/hirefire_resource/macro/celery.py @@ -104,7 +104,7 @@ def job_queue_latency(*queues, broker_url=None): else: broker_url = "redis://localhost:6379/0" - app = Celery(broker=broker_url) + app = Celery(broker=broker_url, broker_connection_timeout=30, broker_heartbeat=30) try: with app.connection_or_acquire() as connection: @@ -115,9 +115,10 @@ def job_queue_latency(*queues, broker_url=None): fn = _job_queue_latency_rabbitmq return max(fn(channel, queue) for queue in queues) - except OperationalError: return 0 + finally: + app.close() async def async_job_queue_latency(*queues, broker_url=None): @@ -251,7 +252,7 @@ def job_queue_size(*queues, broker_url=None): else: broker_url = "redis://localhost:6379/0" - app = Celery(broker=broker_url) + app = Celery(broker=broker_url, broker_connection_timeout=30, broker_heartbeat=30) try: with app.connection_or_acquire() as connection: @@ -259,9 +260,10 @@ def job_queue_size(*queues, broker_url=None): worker_task_count = _job_queue_size_worker(app, queues) broker_task_count = _job_queue_size_broker(channel, queues) return worker_task_count + broker_task_count - except OperationalError: return 0 + finally: + app.close() async def async_job_queue_size(*queues, broker_url=None):