diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index 99948bbc5c97a9..4e63db151ec406 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -47,7 +47,7 @@ def __init__(self, maxsize=0, *, ctx):
             self._wlock = ctx.Lock()
         self._sem = ctx.BoundedSemaphore(maxsize)
         self._is_shutdown = ctx.Value('B', False)
-        self._n_pendings = ctx.Array('Q',[0, 0])
+        self._n_pending_processes = ctx.Array('Q', [0, 0])
         # For use by concurrent.futures
         self._ignore_epipe = False

@@ -59,12 +59,12 @@ def __getstate__(self):
         context.assert_spawning(self)
         return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
                 self._rlock, self._wlock, self._sem, self._opid,
-                self._is_shutdown, self._n_pendings)
+                self._is_shutdown, self._n_pending_processes)

     def __setstate__(self, state):
         (self._ignore_epipe, self._maxsize, self._reader, self._writer,
          self._rlock, self._wlock, self._sem, self._opid,
-         self._is_shutdown, self._n_pendings) = state
+         self._is_shutdown, self._n_pending_processes) = state
         self._reset()

     def _after_fork(self):
@@ -88,15 +88,19 @@ def _reset(self, after_fork=False):

     @contextmanager
     def _handle_pending_processes(self, get_or_put):
-        # Count pending processes. Used when queue shutdowns
+        # Count pending get or put processes in a shared array.
+        # These two values are only used when the queue is shut down
         # to release all pending processes.
-        with self._n_pendings.get_lock():
-            self._n_pendings[get_or_put] += 1
+        with self._n_pending_processes:
+            self._n_pending_processes[get_or_put] += 1
         try:
+            # Wraps the call to _sem.acquire() in the put method and
+            # the calls to _rlock.acquire() or _recv_bytes()
+            # in the get method.
             yield
         finally:
-            with self._n_pendings.get_lock():
-                self._n_pendings[get_or_put] -= 1
+            with self._n_pending_processes:
+                self._n_pending_processes[get_or_put] -= 1

     def put(self, obj, block=True, timeout=None):
         if self._closed:
@@ -112,7 +116,8 @@ def put(self, obj, block=True, timeout=None):

         if self._is_shutdown.value:
             # Released from acquire below.
-            if self._n_pendings[Queue._PUTTERS] > 0:
+            if self._n_pending_processes[Queue._PUTTERS] > 0:
+                debug('on shutdown, `put` releases the next pending putter process')
                 self._sem.release()
             raise ShutDown

@@ -127,6 +132,7 @@ def get(self, block=True, timeout=None):
             raise ValueError(f"Queue {self!r} is closed")
         if self._is_shutdown.value and self.empty():
             raise ShutDown
+
         if block and timeout is None:
             with self._handle_pending_processes(Queue._GETTERS):
                 with self._rlock:
@@ -156,14 +162,15 @@ def get(self, block=True, timeout=None):
             finally:
                 self._rlock.release()

-        # unserialize the data before having released the lock
-        # to check if it's not a dummy item.
-        final_res = _ForkingPickler.loads(res)
-        if isinstance(final_res, _DummyItem) and self._is_shutdown.value:
+        # Unserialize the data before releasing the lock.
+        # If the queue is shut down, check whether this is a sentinel item.
+        item = _ForkingPickler.loads(res)
+        if self._is_shutdown.value and isinstance(item, _SentinelShutdown):
+            debug('on shutdown, `get` received _sentinel_shutdown')
             raise ShutDown
         self._sem.release()
-        return final_res
+        return item

     def qsize(self):
         # Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
@@ -189,35 +196,35 @@ def _clear(self):

     def shutdown(self, immediate=False):
         if self._closed:
             raise ValueError(f"Queue {self!r} is closed")
-        with self._is_shutdown.get_lock():
+        with self._is_shutdown:
             self._is_shutdown.value = True

             # Unblock all pending getter processes.
-            # Put specific dummy data in the pipe.
-            for _ in range(self._n_pendings[Queue._GETTERS]):
-                with self._notempty:
-                    if self._thread is None:
-                        self._start_thread()
-                    self._buffer.append(_dummy)
+            # Put one sentinel item per pending getter in the pipe.
+            with self._notempty:
+                if self._thread is None:
+                    self._start_thread()
+                for _ in range(self._n_pending_processes[Queue._GETTERS]):
+                    self._buffer.append(_sentinel_shutdown)
                     self._notempty.notify()
-            else:
-                debug(f'on shutdown, {self._n_pendings[Queue._GETTERS]}' +
-                      'pending getter processes to release')
+                else:
+                    debug(f'on shutdown, {self._n_pending_processes[Queue._GETTERS]} ' +
+                          'pending getter processes to release')

             # Unblock all pending putter processes.
-            if self._n_pendings[Queue._PUTTERS] > 0:
-                debug(f'on shutdown, {self._n_pendings[Queue._PUTTERS]}' +
+            if self._n_pending_processes[Queue._PUTTERS] > 0:
+                debug(f'on shutdown, {self._n_pending_processes[Queue._PUTTERS]} ' +
                       'pending putters processes to release')
-                # Here we start to release for a first putter process.
-                # When this process is unblock, checks again and
-                # continue to release in cascade
-                # until there is no more putters.
+                # Start by releasing one putter process.
+                # In the `put` method, as soon as the released process is
+                # unblocked, it releases the next one in cascade until
+                # there are no more pending putter processes.
                 self._sem.release()

             # if there are pending getters processes, queue is empty.
-            if immediate and not self._n_pendings[Queue._GETTERS]:
-                debug(f'on shutdown, clear all items in pipe')
+            if immediate and not self._n_pending_processes[Queue._GETTERS]:
+                debug('on shutdown, clearing all items in the pipe')
                 self._clear()

     def close(self):
@@ -386,10 +393,10 @@ def _on_queue_feeder_error(e, obj):

     __class_getitem__ = classmethod(types.GenericAlias)

-# dummy data used to release
+# sentinel used to release
 # pending getter processes.
-class _DummyItem: pass
-_dummy = _DummyItem()
+class _SentinelShutdown: pass
+_sentinel_shutdown = _SentinelShutdown()

 _sentinel = object()

@@ -429,14 +436,14 @@ def put(self, obj, block=True, timeout=None):
                 raise Full

         if self._is_shutdown.value:
-            if self._n_pendings[Queue._PUTTERS] > 0:
+            if self._n_pending_processes[Queue._PUTTERS] > 0:
                 self._sem.release()
             raise ShutDown

-        with self._notempty: #, self._cond:
+        with self._notempty:
             # Here it seems to me that `self._cond` is unnecessary in
             # the "with" instruction.
-            # So now, this method and its inherited method are identical except
+            # This method and the inherited method are now identical except for
             # a call to class instructions.
             # Here this is a call to `self._unfinished_tasks.release()`.
             #
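Note (usage sketch, not part of the patch): the example below shows how worker processes could consume from a queue and exit once it is shut down. It assumes that the shutdown()/ShutDown API proposed here keeps the semantics of the threading-based queue.Queue.shutdown() added in Python 3.13, and that the multiprocessing queue raises the same queue.ShutDown exception; the diff alone does not confirm either point.

import multiprocessing as mp
from queue import ShutDown   # assumption: the patch reuses queue.ShutDown


def worker(q):
    # Block in get() until an item arrives or the queue is shut down.
    while True:
        try:
            item = q.get()
        except ShutDown:
            # Raised once the queue is shut down and drained; exit cleanly.
            break
        print('processed', item)


if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    workers = [ctx.Process(target=worker, args=(q,)) for _ in range(2)]
    for p in workers:
        p.start()
    for i in range(5):
        q.put(i)
    # Proposed API: releases getters blocked in get() via the shutdown
    # sentinel, and putters blocked on the semaphore via the cascading
    # release implemented in shutdown() above.
    q.shutdown()
    for p in workers:
        p.join()

The sketch exercises both release paths introduced by the diff: shutdown() appends one _sentinel_shutdown per pending getter through the feeder thread, and releases the semaphore once so that each unblocked putter releases the next until none remain pending.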