From 0082f76f1fd6b3e066108d2c437861c9780686b3 Mon Sep 17 00:00:00 2001 From: Duprat Date: Wed, 20 Dec 2023 22:20:49 +0100 Subject: [PATCH] wip replace deque with asyncio.Semaphore --- Lib/asyncio/queues.py | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py index 90a4a356869a17..e7157983c7dfa2 100644 --- a/Lib/asyncio/queues.py +++ b/Lib/asyncio/queues.py @@ -38,7 +38,8 @@ class Queue(mixins._LoopBoundMixin): def __init__(self, maxsize=0): self._maxsize = maxsize - self._sem_putters = locks.Semaphore(maxsize) + if maxsize > 0: + self._sem_putters = locks.Semaphore(maxsize) self._sem_getters = locks.Semaphore(0) self._unfinished_tasks = 0 self._finished = locks.Event() @@ -47,11 +48,13 @@ def __init__(self, maxsize=0): @property def _putters(self): - return self._sem_putters._waiters or [] + if self.maxsize > 0: + return self._sem_putters._waiters if self._sem_putters._waiters is not None else [] + return [] @property def _getters(self): - return self._sem_getters._waiters or [] + return self._sem_getters._waiters if self._sem_getters._waiters is not None else [] # These three are overridable in subclasses. def _init(self, maxsize): @@ -67,11 +70,12 @@ def _put(self, item): def _wakeup_next(self, waiters): # Wake up the next waiter (if any) that isn't cancelled. - if waiters is self._sem_putters: - if self._maxsize > 0: - self._sem_putters.release() - else: + if waiters is self._sem_getters: self._sem_getters.release() + return + + if self._maxsize > 0: + self._sem_putters.release() def __repr__(self): return f'<{type(self).__name__} at {id(self):#x} {self._format()}>' @@ -137,7 +141,7 @@ def put_nowait(self, item): """ if self.full(): raise QueueFull - if self._sem_putters._waiters: + if self._maxsize > 0 and self._sem_putters.locked(): raise QueueFull # PendingPutTasks self._put_and_wakeup_next(item) @@ -166,7 +170,7 @@ def get_nowait(self): """ if self.empty(): raise QueueEmpty - if self._sem_getters._waiters: + if self._sem_getters.locked(): raise QueueEmpty # PendingGetTasks return self._get_and_wakeup_next() @@ -174,7 +178,8 @@ def _get_and_wakeup_next(self): """Remove and return an item from the queue. """ item = self._get() - self._wakeup_next(self._sem_putters) + if self.maxsize > 0: + self._wakeup_next(self._sem_putters) return item def task_done(self):