Skip to content

Commit

Permalink
Rename private attrs, update comments
Browse files Browse the repository at this point in the history
  • Loading branch information
YvesDup committed Oct 15, 2024
1 parent aabdaa3 commit b7ae374
Showing 1 changed file with 45 additions and 38 deletions.
83 changes: 45 additions & 38 deletions Lib/multiprocessing/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def __init__(self, maxsize=0, *, ctx):
self._wlock = ctx.Lock()
self._sem = ctx.BoundedSemaphore(maxsize)
self._is_shutdown = ctx.Value('B', False)
self._n_pendings = ctx.Array('Q',[0, 0])
self._n_pending_processes = ctx.Array('Q',[0, 0])

# For use by concurrent.futures
self._ignore_epipe = False
Expand All @@ -59,12 +59,12 @@ def __getstate__(self):
context.assert_spawning(self)
return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid,
self._is_shutdown, self._n_pendings)
self._is_shutdown, self._n_pending_processes)

def __setstate__(self, state):
(self._ignore_epipe, self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid,
self._is_shutdown, self._n_pendings) = state
self._is_shutdown, self._n_pending_processes) = state
self._reset()

def _after_fork(self):
Expand All @@ -88,15 +88,19 @@ def _reset(self, after_fork=False):

@contextmanager
def _handle_pending_processes(self, get_or_put):
# Count pending processes. Used when queue shutdowns
# Count pending get or put processes in a shared array.
# These two values are only used when queue shutdowns
# to release all pending processes.
with self._n_pendings.get_lock():
self._n_pendings[get_or_put] += 1
with self._n_pending_processes:
self._n_pending_processes[get_or_put] += 1
try:
# Wraps calls to _sem.acquire() in put method and,
# calls to _rlock.acquire() or _recv_bytes()
# in get method.
yield
finally:
with self._n_pendings.get_lock():
self._n_pendings[get_or_put] -= 1
with self._n_pending_processes:
self._n_pending_processes[get_or_put] -= 1

def put(self, obj, block=True, timeout=None):
if self._closed:
Expand All @@ -112,7 +116,8 @@ def put(self, obj, block=True, timeout=None):

if self._is_shutdown.value:
# Released from acquire below.
if self._n_pendings[Queue._PUTTERS] > 0:
if self._n_pending_processes[Queue._PUTTERS] > 0:
debug('`put` release next pending putter process -> shutdown')
self._sem.release()
raise ShutDown

Expand All @@ -127,6 +132,7 @@ def get(self, block=True, timeout=None):
raise ValueError(f"Queue {self!r} is closed")
if self._is_shutdown.value and self.empty():
raise ShutDown

if block and timeout is None:
with self._handle_pending_processes(Queue._GETTERS):
with self._rlock:
Expand Down Expand Up @@ -156,14 +162,15 @@ def get(self, block=True, timeout=None):
finally:
self._rlock.release()

# unserialize the data before having released the lock
# to check if it's not a dummy item.
final_res = _ForkingPickler.loads(res)
if isinstance(final_res, _DummyItem) and self._is_shutdown.value:
# Unserialize the data before releasing the lock.
# When shutdowns, checks if this is a sentinel item.
item = _ForkingPickler.loads(res)
if self._is_shutdown.value and isinstance(item, _SentinelShutdown):
debug('`get` got _sentinel_shutdown -> shutdown')
raise ShutDown

self._sem.release()
return final_res
return item

def qsize(self):
# Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
Expand All @@ -189,35 +196,35 @@ def _clear(self):
def shutdown(self, immediate=False):
if self._closed:
raise ValueError(f"Queue {self!r} is closed")
with self._is_shutdown.get_lock():
with self._is_shutdown:
self._is_shutdown.value = True

# Unblock all pending getter processes.
# Put specific dummy data in the pipe.
for _ in range(self._n_pendings[Queue._GETTERS]):
with self._notempty:
if self._thread is None:
self._start_thread()
self._buffer.append(_dummy)
# Put specific sentinel item in the pipe.
with self._notempty:
if self._thread is None:
self._start_thread()
for _ in range(self._n_pending_processes[Queue._GETTERS]):
self._buffer.append(_sentinel_shutdown)
self._notempty.notify()
else:
debug(f'on shutdown, {self._n_pendings[Queue._GETTERS]}' +
'pending getter processes to release')
else:
debug(f'when shutdown, {self._n_pending_processes[Queue._GETTERS]} ' +
'pending getter processes to release')


# Unblock all pending putter processes.
if self._n_pendings[Queue._PUTTERS] > 0:
debug(f'on shutdown, {self._n_pendings[Queue._PUTTERS]}' +
if self._n_pending_processes[Queue._PUTTERS] > 0:
debug(f'when shutdown, {self._n_pending_processes[Queue._PUTTERS]} ' +
'pending putters processes to release')
# Here we start to release for a first putter process.
# When this process is unblock, checks again and
# continue to release in cascade
# until there is no more putters.
# We start releasing a first putter process.
# In the `put` method, as soon as the target process is
# unblocked, we continue releasing in cascade until
# there are no more putter processes.
self._sem.release()

# if there are pending getters processes, queue is empty.
if immediate and not self._n_pendings[Queue._GETTERS]:
debug(f'on shutdown, clear all items in pipe')
if immediate and not self._n_pending_processes[Queue._GETTERS]:
debug(f'when shutdown, clear all items in pipe')
self._clear()

def close(self):
Expand Down Expand Up @@ -386,10 +393,10 @@ def _on_queue_feeder_error(e, obj):
__class_getitem__ = classmethod(types.GenericAlias)


# dummy data used to release
# sentinel used to release
# pending getter processes.
class _DummyItem: pass
_dummy = _DummyItem()
class _SentinelShutdown: pass
_sentinel_shutdown = _SentinelShutdown()


_sentinel = object()
Expand Down Expand Up @@ -429,14 +436,14 @@ def put(self, obj, block=True, timeout=None):
raise Full

if self._is_shutdown.value:
if self._n_pendings[Queue._PUTTERS] > 0:
if self._n_pending_processes[Queue._PUTTERS] > 0:
self._sem.release()
raise ShutDown

with self._notempty: #, self._cond:
with self._notempty:
# Here it seems to me that `self._cond` is unnecessary in
# the "with" instruction.
# So now, this method and its inherited method are identical except
# This method and its inherited method are identical except
# a call to class instructions.
# Here this is a call to `self._unfinished_tasks.release()`.
#
Expand Down

0 comments on commit b7ae374

Please sign in to comment.