Skip to content

Commit

Permalink
brokers: flush StubBroker's DLQ when flush_all is called
Browse files Browse the repository at this point in the history
Closes Bogdanp#243.
  • Loading branch information
CapedHero authored and Bogdanp committed Dec 16, 2019
1 parent e304600 commit a233948
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 3 deletions.
1 change: 1 addition & 0 deletions CONTRIBUTORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,4 @@ of those changes to CLEARTYPE SRL.
| [@wsantos](https://github.com/wsantos) | Waldecir Santos |
| [@jonathanlintott](http://github.com/jonathanlintott) | Jonathan Lintott |
| [@evstratbg](https://github.com/evstratbg) | Bogdan Evstratenko |
| [@CapedHero](https://github.com/CapedHero) | Maciej Wrześniewski |
4 changes: 4 additions & 0 deletions docs/source/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@ Fixed

* An issue where the `max_age` parameter to |AgeLimit| was being
ignored. (`#240`_, `@evstratbg`_)
* The |StubBroker| now flushes its dead letter queue when its
``flush_all`` method is called. (`#247`_, `@CapedHero`_)

.. _#240: https://github.com/Bogdanp/dramatiq/pull/240
.. _#247: https://github.com/Bogdanp/dramatiq/pull/247
.. _@CapedHero: https://github.com/CapedHero
.. _@evstratbg: https://github.com/evstratbg

Changed
Expand Down
2 changes: 2 additions & 0 deletions dramatiq/brokers/stub.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ def flush_all(self):
for queue_name in chain(self.queues, self.delay_queues):
self.flush(queue_name)

self.dead_letters_by_queue.clear()

# TODO: Make fail_fast default to True.
def join(self, queue_name, *, fail_fast=False, timeout=None):
"""Wait for all the messages on the given queue to be
Expand Down
13 changes: 10 additions & 3 deletions tests/test_stub_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,22 @@ def do_work():
# When I send that actor a message
do_work.send()

# Then its queue should contain a message
# And when there is already a message on that actor's dead-letter queue
stub_broker.dead_letters_by_queue[do_work.queue_name].append("dead letter")

# Then its queue should contain the right number of messages
assert stub_broker.queues[do_work.queue_name].qsize() == 1
assert len(stub_broker.dead_letters) == 1

# When I flush all the queues in the broker
# When I flush all of the queues
stub_broker.flush_all()

# Then the queue should be empty and it should contain no in-progress tasks
# Then the queue should be empty
assert stub_broker.queues[do_work.queue_name].qsize() == 0
# and it should contain no in-progress tasks
assert stub_broker.queues[do_work.queue_name].unfinished_tasks == 0
# and the dead-letter queue should be empty
assert len(stub_broker.dead_letters) == 0


def test_stub_broker_can_join_with_timeout(stub_broker, stub_worker):
Expand Down

0 comments on commit a233948

Please sign in to comment.