Skip to content

Commit

Permalink
retries: increase retries counter before failing
Browse files Browse the repository at this point in the history
  • Loading branch information
giuppep authored Oct 9, 2021
1 parent 7751c1d commit d5737fb
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 3 deletions.
1 change: 1 addition & 0 deletions CONTRIBUTORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,4 @@ of those changes to CLEARTYPE SRL.
| [@rouge8](https://github.com/rouge8) | Andy Freeland |
| [@thomazthz](https://github.com/thomazthz) | Thomaz Soares |
| [@FinnLidbetter](https://github.com/FinnLidbetter) | Finn Lidbetter |
| [@giuppep](https://github.com/giuppep) | Giuseppe Papallo |
7 changes: 4 additions & 3 deletions dramatiq/middleware/retries.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ def after_process_message(self, broker, message, *, result=None, exception=None)
return

retries = message.options.setdefault("retries", 0)

message.options["retries"] += 1
message.options["traceback"] = traceback.format_exc(limit=30)

max_retries = message.options.get("max_retries") or actor.options.get("max_retries", self.max_retries)
retry_when = actor.options.get("retry_when", self.retry_when)
if retry_when is not None and not retry_when(retries, exception) or \
Expand All @@ -100,9 +104,6 @@ def after_process_message(self, broker, message, *, result=None, exception=None)
message.fail()
return

message.options["retries"] += 1
message.options["traceback"] = traceback.format_exc(limit=30)

if isinstance(exception, Retry) and exception.delay is not None:
delay = exception.delay
else:
Expand Down
38 changes: 38 additions & 0 deletions tests/test_callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,41 @@ def callback(message, res):
# Then I should get back a TypeError
with pytest.raises(TypeError):
do_work.send_with_options(on_success=callback)


def test_actor_callback_knows_correct_number_of_retries(stub_broker, stub_worker):
MAX_RETRIES = 3
attempts, retries = [], []

# Given a callback that handles failure only after the last retry
@dramatiq.actor
def my_callback(message_data, exception_data):
global handled
handled = False
retry = message_data["options"]["retries"]
retries.append(retry)
# Handle failure after last retry
if retry > MAX_RETRIES:
handled = True

# And an actor that fails every time
@dramatiq.actor(max_retries=MAX_RETRIES, max_backoff=100, on_failure=my_callback.actor_name)
def do_work():
attempts.append(1)
raise RuntimeError("failure")

# When I send it a message
do_work.send()

# And join on the queue
stub_broker.join(do_work.queue_name)
stub_worker.join()

# Then I expect 4 attempts to have occurred
assert len(attempts) == 4
assert len(retries) == len(attempts)

# And I expect the retry number to increase every time
assert retries == [1, 2, 3, 4]
# And I expect the callback to have handled the failure
assert handled

0 comments on commit d5737fb

Please sign in to comment.