From cdc597d1fe6054f4698d569e809f538dcf0322f4 Mon Sep 17 00:00:00 2001 From: Noah Ablaseau Date: Thu, 14 Apr 2022 15:23:10 -0400 Subject: [PATCH] fix pipe fd leak from parallel task processes need to close() or unref the multiprocessing.Process --- fabric/job_queue.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/fabric/job_queue.py b/fabric/job_queue.py index c90c5139..f531560d 100644 --- a/fabric/job_queue.py +++ b/fabric/job_queue.py @@ -148,7 +148,12 @@ def _advance_the_queue(): if self._debug: print("Job queue found finished proc: %s." % job.name) done = self._running.pop(id) - self._completed.append(done) + self._completed.append((done.name, done.exitcode)) + if hasattr(done, 'close') and callable(done.close): + done.close() + # multiprocessing.Process.close() added in Python-3.7 + # for older versions of python, GC will have to do + del done if self._debug: print("Job queue has %d running." % len(self._running)) @@ -157,9 +162,6 @@ def _advance_the_queue(): if self._debug: print("Job queue finished.") - for job in self._completed: - job.join() - self._finished = True # Each loop pass, try pulling results off the queue to keep its @@ -175,9 +177,9 @@ def _advance_the_queue(): self._fill_results(results) # Attach exit codes now that we're all done & have joined all jobs - for job in self._completed: + for job_name, exit_code in self._completed: if isinstance(job, Process): - results[job.name]['exit_code'] = job.exitcode + results[job_name]['exit_code'] = exit_code return results