Skip to content

Commit

Permalink
limit command_queue to limit memory usage
Browse files Browse the repository at this point in the history
  • Loading branch information
leo-schick committed Oct 10, 2022
1 parent 3729653 commit f453a0e
Showing 1 changed file with 27 additions and 6 deletions.
33 changes: 27 additions & 6 deletions mara_pipelines/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import functools
import multiprocessing
import os
import queue
import sys
import signal
import atexit
Expand Down Expand Up @@ -250,7 +251,9 @@ def track_finished_pipelines():
if next_node.parent in running_pipelines:
running_pipelines[next_node.parent][1] += 1

command_queue = multiprocessing_context.Queue()
command_queue = multiprocessing_context.Queue(
# allow max 100 waiting objects per worker to limit memory usage
maxsize=(len(sub_pipeline.nodes) * 100))

for _, node in sub_pipeline.nodes.items():
if isinstance(node, pipelines.Worker):
Expand Down Expand Up @@ -532,22 +535,40 @@ def __init__(self, task: pipelines.ParallelTask, command_queue: multiprocessing.
self._status_queue = multiprocessing_context.Queue()
self.start_time = datetime.datetime.now(tz.utc)

def _command_queue_put(self, commands, max_retries: int = None):
"""Puts a command to the queue. When it is full, try again"""
retry_count: int = 0
while True:
try:
self.command_queue.put(commands)
break
except queue.Full:
retry_count += 1
if max_retries and retry_count > max_retries:
break

# if the queue limit is reached, wait for one second and try again
time.sleep(1)

def run(self):
# redirect stdout and stderr to queue
logger.redirect_output(self.event_queue, self.task.path())

succeeded = True
try:
for commands in self.task.feed_workers():
self.command_queue.put(commands)

for _ in range(self.task.max_number_of_parallel_tasks):
# per worker send a "DONE" message to inform that all commands are send
self.command_queue.put("DONE")
self._command_queue_put(commands)
except Exception as e:
logger.log(message=traceback.format_exc(), format=logger.Format.VERBATIM, is_error=True)
succeeded = False
finally:
# per worker send a "DONE" message to inform that all commands are send
for _ in range(self.task.max_number_of_parallel_tasks):
# In case all workers crashed this might end into an endless loop.
# Therefore we use a max_retries here which will try over 3 seconds
# to put the "DONE" into the queue.
self._command_queue_put("DONE", max_retries=3)

self.command_queue.close()

self._status_queue.put(succeeded)
Expand Down

0 comments on commit f453a0e

Please sign in to comment.