Skip to content

Commit

Permalink
refactor(rate_limits): use builtin incr and decr w/ memcached backend
Browse files Browse the repository at this point in the history
  • Loading branch information
Bogdanp committed Dec 23, 2018
1 parent f476aaf commit 590a82d
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 41 deletions.
4 changes: 2 additions & 2 deletions dramatiq/rate_limits/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ def wait(self, key, timeout): # pragma: no cover
raise NotImplementedError

def wait_notify(self, key, ttl): # pragma: no cover
"""Notify parties wait()ing on a key that an event has occurred.
"""Notify parties wait()ing on a key that an event has
occurred. The default implementation is a no-op.
Parameters:
key(str): The key to notify on.
Expand All @@ -106,4 +107,3 @@ def wait_notify(self, key, ttl): # pragma: no cover
Returns:
None
"""
raise NotImplementedError
34 changes: 2 additions & 32 deletions dramatiq/rate_limits/backends/memcached.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,42 +49,12 @@ def add(self, key, value, ttl):
return client.add(key, value, time=int(ttl / 1000))

def incr(self, key, amount, maximum, ttl):
ttl = int(ttl / 1000)
with self.pool.reserve(block=True) as client:
while True:
value, cid = client.gets(key)
if cid is None:
return False

value += amount
if value > maximum:
return False

try:
swapped = client.cas(key, value, cid, ttl)
if swapped:
return True
except NotFound: # pragma: no cover
continue
return client.incr(key, amount) <= maximum

def decr(self, key, amount, minimum, ttl):
ttl = int(ttl / 1000)
with self.pool.reserve(block=True) as client:
while True:
value, cid = client.gets(key)
if cid is None:
return False

value -= amount
if value < minimum:
return False

try:
swapped = client.cas(key, value, cid, ttl)
if swapped:
return True
except NotFound: # pragma: no cover
continue
return client.decr(key, amount) >= minimum

def incr_and_sum(self, key, keys, amount, maximum, ttl):
ttl = int(ttl / 1000)
Expand Down
22 changes: 15 additions & 7 deletions tests/test_barrier.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import time
from concurrent.futures import ThreadPoolExecutor

import pytest

from dramatiq.rate_limits import Barrier


Expand Down Expand Up @@ -33,10 +35,13 @@ def worker():
assert barrier.wait(timeout=1000)
times.append(time.monotonic())

# When I run those workers
with ThreadPoolExecutor(max_workers=8) as e:
for future in [e.submit(worker), e.submit(worker)]:
future.result()
try:
# When I run those workers
with ThreadPoolExecutor(max_workers=8) as e:
for future in [e.submit(worker), e.submit(worker)]:
future.result()
except NotImplementedError:
pytest.skip("Waiting is not supported under this backend.")

# Then their execution times should be really close to one another
assert abs(times[0] - times[1]) <= 0.01
Expand All @@ -47,6 +52,9 @@ def test_barriers_can_timeout(rate_limiter_backend):
barrier = Barrier(rate_limiter_backend, "sequential-barrier", ttl=30000)
assert barrier.create(parties=2)

# When I wait on the barrier with a timeout
# Then I should get False back
assert not barrier.wait(timeout=1000)
try:
# When I wait on the barrier with a timeout
# Then I should get False back
assert not barrier.wait(timeout=1000)
except NotImplementedError:
pytest.skip("Waiting is not supported under this backend.")

0 comments on commit 590a82d

Please sign in to comment.