Skip to content

Commit

Permalink
wip replace deque with asyncio.Semaphore
Browse files Browse the repository at this point in the history
  • Loading branch information
YvesDup committed Dec 20, 2023
1 parent 4accfdb commit 0082f76
Showing 1 changed file with 15 additions and 10 deletions.
25 changes: 15 additions & 10 deletions Lib/asyncio/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ class Queue(mixins._LoopBoundMixin):
def __init__(self, maxsize=0):
self._maxsize = maxsize

self._sem_putters = locks.Semaphore(maxsize)
if maxsize > 0:
self._sem_putters = locks.Semaphore(maxsize)
self._sem_getters = locks.Semaphore(0)
self._unfinished_tasks = 0
self._finished = locks.Event()
Expand All @@ -47,11 +48,13 @@ def __init__(self, maxsize=0):

@property
def _putters(self):
return self._sem_putters._waiters or []
if self.maxsize > 0:
return self._sem_putters._waiters if self._sem_putters._waiters is not None else []
return []

@property
def _getters(self):
return self._sem_getters._waiters or []
return self._sem_getters._waiters if self._sem_getters._waiters is not None else []

# These three are overridable in subclasses.
def _init(self, maxsize):
Expand All @@ -67,11 +70,12 @@ def _put(self, item):

def _wakeup_next(self, waiters):
# Wake up the next waiter (if any) that isn't cancelled.
if waiters is self._sem_putters:
if self._maxsize > 0:
self._sem_putters.release()
else:
if waiters is self._sem_getters:
self._sem_getters.release()
return

if self._maxsize > 0:
self._sem_putters.release()

def __repr__(self):
return f'<{type(self).__name__} at {id(self):#x} {self._format()}>'
Expand Down Expand Up @@ -137,7 +141,7 @@ def put_nowait(self, item):
"""
if self.full():
raise QueueFull
if self._sem_putters._waiters:
if self._maxsize > 0 and self._sem_putters.locked():
raise QueueFull # PendingPutTasks
self._put_and_wakeup_next(item)

Expand Down Expand Up @@ -166,15 +170,16 @@ def get_nowait(self):
"""
if self.empty():
raise QueueEmpty
if self._sem_getters._waiters:
if self._sem_getters.locked():
raise QueueEmpty # PendingGetTasks
return self._get_and_wakeup_next()

def _get_and_wakeup_next(self):
"""Remove and return an item from the queue.
"""
item = self._get()
self._wakeup_next(self._sem_putters)
if self.maxsize > 0:
self._wakeup_next(self._sem_putters)
return item

def task_done(self):
Expand Down

0 comments on commit 0082f76

Please sign in to comment.