-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy paththrottler.py
35 lines (29 loc) · 1 KB
/
throttler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from collections import deque
import time
import asyncio
# From https://github.com/hallazzang/asyncio-throttle
class Throttler:
'''Implements a rate-limit throttler for aiohttp'''
def __init__(self, rate_limit, period=1.0, retry_interval=0.01):
self.rate_limit = rate_limit
self.period = period
self.retry_interval = retry_interval
self._task_logs = deque()
def flush(self):
now = time.time()
while self._task_logs:
if now - self._task_logs[0] > self.period:
self._task_logs.popleft()
else:
break
async def acquire(self):
while True:
self.flush()
if len(self._task_logs) < self.rate_limit:
break
await asyncio.sleep(self.retry_interval)
self._task_logs.append(time.time())
async def __aenter__(self):
await self.acquire()
async def __aexit__(self, exc_type, exc, tb):
pass