diff --git a/a_sync/primitives/locks/prio_semaphore.py b/a_sync/primitives/locks/prio_semaphore.py index 9681caa5..5f50949d 100644 --- a/a_sync/primitives/locks/prio_semaphore.py +++ b/a_sync/primitives/locks/prio_semaphore.py @@ -40,6 +40,8 @@ def __init__(self, value: int = 1, *, name: Optional[str] = None) -> None: self._capacity = value super().__init__(value, name=name) self._waiters = [] + # NOTE: This should (hopefully) be temporary + self._potential_lost_waiters = [] def __repr__(self) -> str: return f"<{self.__class__.__name__} name={self.name} capacity={self._capacity} value={self._value} waiters={[manager._repr_no_parent_() for manager in self._waiters]}>" @@ -85,9 +87,10 @@ def _wake_up_next(self) -> None: while manager._waiters: waiter = manager._waiters.popleft() + waiter = self._potential_lost_waiters.remove(waiter) if not waiter.done(): waiter.set_result(None) - logger.debug(f"woke up %s", waiter) + logger.debug("woke up %s", waiter) woke_up = True break @@ -106,6 +109,15 @@ def _wake_up_next(self) -> None: # There are no more waiters, get rid of the empty manager self._context_managers.pop(manager._priority) return + + # emergency procedure (hopefully temporary): + while self._potential_lost_waiters: + waiter = self._potential_lost_waiters.pop(0) + logger.debug('we found a lost waiter %s', waiter) + if not waiter.done(): + waiter.set_result(None) + logger.warning("woke up lost waiter %s", waiter) + return logger.debug("%s has no waiters to wake", self) class _AbstractPrioritySemaphoreContextManager(Semaphore, Generic[PT]): @@ -156,6 +168,7 @@ async def acquire(self) -> Literal[True]: while self._parent._value <= 0: fut = self.loop.create_future() self.waiters.append(fut) + self._parent._potential_lost_waiters.append(fut) try: await fut except: