Skip to content

Commit

Permalink
gh-125451: Fix deadlock in ProcessPoolExecutor shutdown (#125492)
Browse files Browse the repository at this point in the history
There was a deadlock when `ProcessPoolExecutor` shuts down at the same
time that a queueing thread handles an error processing a task.

Don't use `_shutdown_lock` to protect the `_ThreadWakeup` pipes -- use
an internal lock instead. This fixes the ordering deadlock where the
`ExecutorManagerThread` holds the `_shutdown_lock` and joins the
queueing thread, while the queueing thread is attempting to acquire the
`_shutdown_lock` while closing the `_ThreadWakeup`.
  • Loading branch information
colesbury authored Oct 16, 2024
1 parent d83fcf8 commit 760872e
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 32 deletions.
50 changes: 21 additions & 29 deletions Lib/concurrent/futures/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,27 +68,31 @@
class _ThreadWakeup:
def __init__(self):
self._closed = False
self._lock = threading.Lock()
self._reader, self._writer = mp.Pipe(duplex=False)

def close(self):
# Please note that we do not take the shutdown lock when
# Please note that we do not take the self._lock when
# calling clear() (to avoid deadlocking) so this method can
# only be called safely from the same thread as all calls to
# clear() even if you hold the shutdown lock. Otherwise we
# clear() even if you hold the lock. Otherwise we
# might try to read from the closed pipe.
if not self._closed:
self._closed = True
self._writer.close()
self._reader.close()
with self._lock:
if not self._closed:
self._closed = True
self._writer.close()
self._reader.close()

def wakeup(self):
if not self._closed:
self._writer.send_bytes(b"")
with self._lock:
if not self._closed:
self._writer.send_bytes(b"")

def clear(self):
if not self._closed:
while self._reader.poll():
self._reader.recv_bytes()
if self._closed:
raise RuntimeError('operation on closed _ThreadWakeup')
while self._reader.poll():
self._reader.recv_bytes()


def _python_exit():
Expand Down Expand Up @@ -167,10 +171,8 @@ def __init__(self, work_id, fn, args, kwargs):

class _SafeQueue(Queue):
"""Safe Queue set exception to the future object linked to a job"""
def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock,
thread_wakeup):
def __init__(self, max_size=0, *, ctx, pending_work_items, thread_wakeup):
self.pending_work_items = pending_work_items
self.shutdown_lock = shutdown_lock
self.thread_wakeup = thread_wakeup
super().__init__(max_size, ctx=ctx)

Expand All @@ -179,8 +181,7 @@ def _on_queue_feeder_error(self, e, obj):
tb = format_exception(type(e), e, e.__traceback__)
e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
work_item = self.pending_work_items.pop(obj.work_id, None)
with self.shutdown_lock:
self.thread_wakeup.wakeup()
self.thread_wakeup.wakeup()
# work_item can be None if another process terminated. In this
# case, the executor_manager_thread fails all work_items
# with BrokenProcessPool
Expand Down Expand Up @@ -296,12 +297,10 @@ def __init__(self, executor):
# if there is no pending work item.
def weakref_cb(_,
thread_wakeup=self.thread_wakeup,
shutdown_lock=self.shutdown_lock,
mp_util_debug=mp.util.debug):
mp_util_debug('Executor collected: triggering callback for'
' QueueManager wakeup')
with shutdown_lock:
thread_wakeup.wakeup()
thread_wakeup.wakeup()

self.executor_reference = weakref.ref(executor, weakref_cb)

Expand Down Expand Up @@ -429,11 +428,6 @@ def wait_result_broken_or_wakeup(self):
elif wakeup_reader in ready:
is_broken = False

# No need to hold the _shutdown_lock here because:
# 1. we're the only thread to use the wakeup reader
# 2. we're also the only thread to call thread_wakeup.close()
# 3. we want to avoid a possible deadlock when both reader and writer
# would block (gh-105829)
self.thread_wakeup.clear()

return result_item, is_broken, cause
Expand Down Expand Up @@ -721,10 +715,9 @@ def __init__(self, max_workers=None, mp_context=None,
# as it could result in a deadlock if a worker process dies with the
# _result_queue write lock still acquired.
#
# _shutdown_lock must be locked to access _ThreadWakeup.close() and
# .wakeup(). Care must also be taken to not call clear or close from
# more than one thread since _ThreadWakeup.clear() is not protected by
# the _shutdown_lock
# Care must be taken to only call clear and close from the
# executor_manager_thread, since _ThreadWakeup.clear() is not protected
# by a lock.
self._executor_manager_thread_wakeup = _ThreadWakeup()

# Create communication channels for the executor
Expand All @@ -735,7 +728,6 @@ def __init__(self, max_workers=None, mp_context=None,
self._call_queue = _SafeQueue(
max_size=queue_size, ctx=self._mp_context,
pending_work_items=self._pending_work_items,
shutdown_lock=self._shutdown_lock,
thread_wakeup=self._executor_manager_thread_wakeup)
# Killed worker processes can produce spurious "broken pipe"
# tracebacks in the queue's own worker thread. But we detect killed
Expand Down
3 changes: 0 additions & 3 deletions Lib/test/test_concurrent_futures/test_shutdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,6 @@ def test_cancel_futures_wait_false(self):


class ProcessPoolShutdownTest(ExecutorShutdownTest):
# gh-125451: 'lock' cannot be serialized, the test is broken
# and hangs randomly
@unittest.skipIf(True, "broken test")
def test_processes_terminate(self):
def acquire_lock(lock):
lock.acquire()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Fix deadlock when :class:`concurrent.futures.ProcessPoolExecutor` shuts down
concurrently with an error when feeding a job to a worker process.

0 comments on commit 760872e

Please sign in to comment.