From 590a82d56226e23792d0580f7d118fa960c459b8 Mon Sep 17 00:00:00 2001 From: Bogdan Popa Date: Sun, 23 Dec 2018 19:08:26 +0200 Subject: [PATCH] refactor(rate_limits): use builtin incr and decr w/ memcached backend --- dramatiq/rate_limits/backend.py | 4 +-- dramatiq/rate_limits/backends/memcached.py | 34 ++-------------------- tests/test_barrier.py | 22 +++++++++----- 3 files changed, 19 insertions(+), 41 deletions(-) diff --git a/dramatiq/rate_limits/backend.py b/dramatiq/rate_limits/backend.py index 7d1e16da..0157d3d6 100644 --- a/dramatiq/rate_limits/backend.py +++ b/dramatiq/rate_limits/backend.py @@ -96,7 +96,8 @@ def wait(self, key, timeout): # pragma: no cover raise NotImplementedError def wait_notify(self, key, ttl): # pragma: no cover - """Notify parties wait()ing on a key that an event has occurred. + """Notify parties wait()ing on a key that an event has + occurred. The default implementation is a no-op. Parameters: key(str): The key to notify on. @@ -106,4 +107,3 @@ def wait_notify(self, key, ttl): # pragma: no cover Returns: None """ - raise NotImplementedError diff --git a/dramatiq/rate_limits/backends/memcached.py b/dramatiq/rate_limits/backends/memcached.py index b47b2b7b..c45b3028 100644 --- a/dramatiq/rate_limits/backends/memcached.py +++ b/dramatiq/rate_limits/backends/memcached.py @@ -49,42 +49,12 @@ def add(self, key, value, ttl): return client.add(key, value, time=int(ttl / 1000)) def incr(self, key, amount, maximum, ttl): - ttl = int(ttl / 1000) with self.pool.reserve(block=True) as client: - while True: - value, cid = client.gets(key) - if cid is None: - return False - - value += amount - if value > maximum: - return False - - try: - swapped = client.cas(key, value, cid, ttl) - if swapped: - return True - except NotFound: # pragma: no cover - continue + return client.incr(key, amount) <= maximum def decr(self, key, amount, minimum, ttl): - ttl = int(ttl / 1000) with self.pool.reserve(block=True) as client: - while True: - value, cid = client.gets(key) - if cid is None: - return False - - value -= amount - if value < minimum: - return False - - try: - swapped = client.cas(key, value, cid, ttl) - if swapped: - return True - except NotFound: # pragma: no cover - continue + return client.decr(key, amount) >= minimum def incr_and_sum(self, key, keys, amount, maximum, ttl): ttl = int(ttl / 1000) diff --git a/tests/test_barrier.py b/tests/test_barrier.py index b9a235cf..cc1e77b0 100644 --- a/tests/test_barrier.py +++ b/tests/test_barrier.py @@ -1,6 +1,8 @@ import time from concurrent.futures import ThreadPoolExecutor +import pytest + from dramatiq.rate_limits import Barrier @@ -33,10 +35,13 @@ def worker(): assert barrier.wait(timeout=1000) times.append(time.monotonic()) - # When I run those workers - with ThreadPoolExecutor(max_workers=8) as e: - for future in [e.submit(worker), e.submit(worker)]: - future.result() + try: + # When I run those workers + with ThreadPoolExecutor(max_workers=8) as e: + for future in [e.submit(worker), e.submit(worker)]: + future.result() + except NotImplementedError: + pytest.skip("Waiting is not supported under this backend.") # Then their execution times should be really close to one another assert abs(times[0] - times[1]) <= 0.01 @@ -47,6 +52,9 @@ def test_barriers_can_timeout(rate_limiter_backend): barrier = Barrier(rate_limiter_backend, "sequential-barrier", ttl=30000) assert barrier.create(parties=2) - # When I wait on the barrier with a timeout - # Then I should get False back - assert not barrier.wait(timeout=1000) + try: + # When I wait on the barrier with a timeout + # Then I should get False back + assert not barrier.wait(timeout=1000) + except NotImplementedError: + pytest.skip("Waiting is not supported under this backend.")