Skip to content

Commit

Permalink
Mitigate Celery's intermittent ConnectionResetError
Browse files Browse the repository at this point in the history
  • Loading branch information
mrrooijen committed Oct 17, 2024
1 parent 8831ad2 commit 3db6bdd
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 0 deletions.
27 changes: 27 additions & 0 deletions hirefire_resource/macro/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,32 @@ class ChannelError(Exception):
from hirefire_resource.errors import MissingQueueError


def mitigate_connection_reset_error(retries=10, delay=1):
"""
Decorator to retry a function when ConnectionResetError occurs.
Args:
retries (int): Number of retry attempts for connection errors.
delay (int): Fixed delay between retry attempts in seconds.
"""

def decorator(func):
def wrapper(*args, **kwargs):
for attempt in range(retries):
try:
return func(*args, **kwargs)
except ConnectionResetError:
if attempt < retries - 1:
time.sleep(delay)
else:
raise

return wrapper

return decorator


@mitigate_connection_reset_error()
def job_queue_latency(*queues, broker_url=None):
"""
Calculates the maximum job queue latency across the specified queues using Celery with either
Expand Down Expand Up @@ -183,6 +209,7 @@ async def async_job_queue_latency(*queues, broker_url=None):
return await loop.run_in_executor(None, func)


@mitigate_connection_reset_error()
def job_queue_size(*queues, broker_url=None):
"""
Calculates the total job queue size across the specified queues using Celery with either Redis
Expand Down
23 changes: 23 additions & 0 deletions tests/hirefire_resource/macro/test_celery.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import math
from datetime import datetime, timedelta, timezone
from unittest.mock import patch

import pytest
from celery import Celery
Expand Down Expand Up @@ -77,6 +78,20 @@ def test_job_queue_latency_with_jobs(celery_app):
)


def test_job_queue_latency_connection_reset(celery_app):
with patch(
"hirefire_resource.macro.celery._job_queue_latency_redis"
) as mock_job_queue_latency_redis:
with patch(
"hirefire_resource.macro.celery._job_queue_latency_rabbitmq"
) as mock_job_queue_latency_rabbitmq:
mock_job_queue_latency_redis.side_effect = [ConnectionResetError, 0]
mock_job_queue_latency_rabbitmq.side_effect = [ConnectionResetError, 0]
assert (
job_queue_latency("celery", broker_url=celery_app.conf.broker_url) == 0
)


def test_job_queue_latency_with_jobs_multi(celery_app):
enqueue_for_job_queue_latency_with_job(celery_app)

Expand Down Expand Up @@ -168,6 +183,14 @@ def test_job_queue_size_with_jobs(celery_app):
)


def test_job_queue_size_connection_reset(celery_app):
with patch(
"hirefire_resource.macro.celery._job_queue_size_worker"
) as mock_job_queue_size_worker:
mock_job_queue_size_worker.side_effect = [ConnectionResetError, 0]
assert job_queue_size("celery", broker_url=celery_app.conf.broker_url) == 0


@pytest.mark.asyncio
async def test_async_job_queue_size_missing_queue():
with pytest.raises(MissingQueueError):
Expand Down

0 comments on commit 3db6bdd

Please sign in to comment.