Skip to content

Commit

Permalink
fix(tests): use a context manager for workers to avoid leaking them
Browse files Browse the repository at this point in the history
  • Loading branch information
Bogdanp committed Apr 17, 2018
1 parent 8528e4c commit 73f0309
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 62 deletions.
12 changes: 12 additions & 0 deletions tests/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from contextlib import contextmanager
from dramatiq import Worker


@contextmanager
def worker(*args, **kwargs):
try:
worker = Worker(*args, **kwargs)
worker.start()
yield worker
finally:
worker.stop()
65 changes: 33 additions & 32 deletions tests/test_actors.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
import pytest
import time

from dramatiq import Message, Middleware, Worker
from dramatiq import Message, Middleware
from dramatiq.errors import RateLimitExceeded
from dramatiq.middleware import SkipMessage
from unittest.mock import patch

from .common import worker

_current_platform = platform.python_implementation()


Expand Down Expand Up @@ -259,19 +261,19 @@ def test_actors_can_be_assigned_message_age_limits(stub_broker):
def do_work():
runs.append(1)

# If I send it a message
# When I send it a message
do_work.send()

# And join on its queue after the age limit has passed
# And wait for its age limit to pass
time.sleep(0.1)
worker = Worker(stub_broker, worker_timeout=100)
worker.start()
stub_broker.join(do_work.queue_name)
worker.join()
worker.stop()

# I expect the message to have been skipped
assert sum(runs) == 0
# Then join on its queue
with worker(stub_broker, worker_timeout=100) as stub_worker:
stub_broker.join(do_work.queue_name)
stub_worker.join()

# I expect the message to have been skipped
assert sum(runs) == 0


def test_actors_can_delay_messages_independent_of_each_other(stub_broker, stub_worker):
Expand Down Expand Up @@ -413,34 +415,33 @@ def track_call():


def test_actors_can_prioritize_work(stub_broker):
# Given that I a paused worker
worker = Worker(stub_broker, worker_timeout=100, worker_threads=1)
worker.start()
worker.pause()
with worker(stub_broker, worker_timeout=100, worker_threads=1) as stub_worker:
# Given that I a paused worker
stub_worker.pause()

# And actors with different priorities
calls = []
# And actors with different priorities
calls = []

@dramatiq.actor(priority=0)
def hi():
calls.append("hi")
@dramatiq.actor(priority=0)
def hi():
calls.append("hi")

@dramatiq.actor(priority=10)
def lo():
calls.append("lo")
@dramatiq.actor(priority=10)
def lo():
calls.append("lo")

# When I send both actors a nubmer of messages
for _ in range(10):
lo.send()
hi.send()
# When I send both actors a nubmer of messages
for _ in range(10):
lo.send()
hi.send()

# Then resume the worker and join on the queue
worker.resume()
stub_broker.join(lo.queue_name)
worker.join()
# Then resume the worker and join on the queue
stub_worker.resume()
stub_broker.join(lo.queue_name)
stub_worker.join()

# Then the high priority actor should run first
assert calls == ["hi"] * 10 + ["lo"] * 10
# Then the high priority actor should run first
assert calls == ["hi"] * 10 + ["lo"] * 10


def test_actors_can_conditionally_retry(stub_broker, stub_worker):
Expand Down
40 changes: 19 additions & 21 deletions tests/test_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
import pytest
import time

from dramatiq import Message, QueueJoinTimeout, Worker
from dramatiq import Message, QueueJoinTimeout
from dramatiq.common import current_millis, dq_name, xq_name

from .common import worker


def test_redis_actors_can_be_sent_messages(redis_broker, redis_worker):
# Given that I have a database
Expand Down Expand Up @@ -106,23 +108,22 @@ def append(x):
results.append(x)

# When I pause the worker
worker = Worker(redis_broker, worker_timeout=100, worker_threads=1)
worker.start()
worker.pause()
with worker(redis_broker, worker_timeout=100, worker_threads=1) as redis_worker:
redis_worker.pause()

# And I send it a delayed message
append.send_with_options(args=(1,), delay=2000)
# And I send it a delayed message
append.send_with_options(args=(1,), delay=2000)

# And then another delayed message with a smaller delay
append.send_with_options(args=(2,), delay=1000)
# And then another delayed message with a smaller delay
append.send_with_options(args=(2,), delay=1000)

# Then resume the worker and join on the queue
worker.resume()
redis_broker.join(append.queue_name)
worker.join()
# Then resume the worker and join on the queue
redis_worker.resume()
redis_broker.join(append.queue_name)
redis_worker.join()

# I expect the latter message to have been run first
assert results == [2, 1]
# I expect the latter message to have been run first
assert results == [2, 1]


def test_redis_unacked_messages_can_be_requeued(redis_broker):
Expand Down Expand Up @@ -240,10 +241,8 @@ def do_work():
message_2 = do_work.send()

# Then start a worker and subsequently shut it down
worker = Worker(redis_broker, worker_threads=1)
worker.start()
time.sleep(0.25)
worker.stop()
with worker(redis_broker, worker_threads=1):
time.sleep(0.25)

# I expect it to have processed one of the messages and re-enqueued the other
messages = redis_broker.client.lrange("dramatiq:%s" % do_work.queue_name, 0, 10)
Expand All @@ -264,9 +263,8 @@ def do_work():
message = do_work.send_with_options(delay=10000)

# Then start a worker and subsequently shut it down
worker = Worker(redis_broker, worker_threads=1)
worker.start()
worker.stop()
with worker(redis_broker, worker_threads=1):
pass

# I expect it to have re-enqueued the message
messages = redis_broker.client.lrange("dramatiq:%s" % dq_name(do_work.queue_name), 0, 10)
Expand Down
13 changes: 4 additions & 9 deletions tests/test_worker.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,13 @@
from dramatiq import Worker
from .common import worker


def test_workers_dont_register_queues_that_arent_whitelisted(stub_broker):
# Given that I have a worker object with a restricted set of queues
worker = Worker(stub_broker, queues={"a", "b"})
worker.start()

try:
with worker(stub_broker, queues={"a", "b"}) as stub_worker:
# When I try to register a consumer for a queue that hasn't been whitelisted
stub_broker.declare_queue("c")
stub_broker.declare_queue("c.DQ")

# Then a consumer should not get spun up for that queue
assert "c" not in worker.consumers
assert "c.DQ" not in worker.consumers
finally:
worker.stop()
assert "c" not in stub_worker.consumers
assert "c.DQ" not in stub_worker.consumers

0 comments on commit 73f0309

Please sign in to comment.