
Please explain if/how one can avoid putting too much data into a pipeline #13

johann-petrak opened this issue Oct 30, 2018 · 4 comments

@johann-petrak

My understanding is that whenever pipe.put(something) is executed, the item is placed on a queue or something similar.
If we only want the workers to process the items, without caring at all about any return value, we can use disable_result, but there seems to be no limit on how much data can be put into the pipeline. If a large number of large items is put into the pipeline, will this cause problems? Is it possible to have only a certain maximum number of items waiting for processing before put(something) blocks?
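For reference, a minimal sketch of the pattern being described, assuming mpipe's documented UnorderedStage/Pipeline API (the handler function and worker count are made up for illustration):

import time
from mpipe import UnorderedStage, Pipeline

def handle(task):
    # Hypothetical handler: we only care about the side effect,
    # not about passing a result downstream.
    print('processing', task)

# disable_result tells the stage not to propagate return values.
pipe = Pipeline(UnorderedStage(handle, 4, disable_result=True))

for i in range(1000):
    pipe.put(i)   # each put lands on an internal queue/pipe, apparently without limit
pipe.put(None)    # signal end of input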

@vmlaker
Owner

vmlaker commented Nov 1, 2018

You're right: when you put data on the pipeline, it's either placed on a multiprocessing.Queue (in the case of an UnorderedStage) or sent over a multiprocessing.Pipe (if using an OrderedStage). Both of those inter-process communication (IPC) mechanisms use sockets, and socket buffers are ultimately limited by system memory.

Each task occupies socket buffer space until it is taken off by a worker process. Once you use up all the buffer space, the next put should block, and you will likely start running into other out-of-memory problems, like the program crashing. There is an example in the documentation that hopefully illustrates this.
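A rough standalone sketch of that blocking behavior, using a bare multiprocessing.Pipe (independent of MPipe itself):

import multiprocessing

# Nothing reads from the receiving end, so the sender eventually
# blocks once the underlying OS pipe/socket buffer fills up.
reader, writer = multiprocessing.Pipe(duplex=False)
chunk = b'x' * (1024 * 1024)  # 1 MiB payload

sent = 0
while True:
    writer.send(chunk)  # hangs here once the buffer is full
    sent += 1
    print('sent %d MiB so far' % sent)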

Unfortunately there's no safety built into MPipe, so it's up to the user to monitor and manage how much they're feeding the pipeline.

@johann-petrak
Author

Thank you for this answer! As far as I understand, multiprocessing.Queue has a maxsize parameter, so would it not be possible to use this to easily limit how much data can be put into the queue before put blocks? Unfortunately, the API does not seem to allow passing this on when the Queue instance is created...
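A small sketch of the behavior such a maxsize parameter would give, using a bounded multiprocessing.Queue directly:

import multiprocessing
import queue

q = multiprocessing.Queue(maxsize=2)
q.put('task 1')
q.put('task 2')
try:
    # With block=False a full queue raises immediately; the default
    # blocking put() would simply wait here until a worker drains it.
    q.put('task 3', block=False)
except queue.Full:
    print('queue is full')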

If the above is not possible for some reason, how exactly would one actually monitor and manage how much gets fed into the pipeline? For this it would be necessary to keep track of how much has already been put in versus how much has been processed. Are these numbers easily available somewhere?

@vmlaker
Owner

vmlaker commented Nov 1, 2018

The maxsize parameter is not implemented. It's a good idea; I'm noting it as a requested enhancement.

Also, there is no facility to retrieve the total count/size of tasks submitted and processed. I'm noting this as a requested feature for a future release. For now, the user would have to do some kind of bookkeeping. Counting submitted tasks is straightforward, since submission is under user control. As for counting how many tasks have been processed, something like this might work:

from threading import Thread

class Counter:
    """Counts results coming off the pipeline in a background thread."""

    def __init__(self, pipe):
        self._pipe = pipe
        self._count = 0
        # Daemon thread, so the counter doesn't keep the program alive on exit.
        self._thread = Thread(target=self._get_results, daemon=True)
        self._thread.start()

    def get_count(self):
        return self._count

    def _get_results(self):
        # Continuously drain results from the pipeline, bumping the count.
        while True:
            self._pipe.get()
            self._count += 1

I haven't fully tested this, but the basic concept is to have a background thread continuously retrieving results and bumping the count. Just make sure your stages have disable_result=False, so that results actually reach the end of the pipeline.
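For example, here is a hypothetical way to use the Counter above to throttle submission, capping the number of in-flight tasks (the stage function and the limit are made up):

import time
from mpipe import UnorderedStage, Pipeline

def work(task):
    return task * 2  # hypothetical stage function

pipe = Pipeline(UnorderedStage(work, 4))
counter = Counter(pipe)

MAX_IN_FLIGHT = 100  # arbitrary limit; tune for your workload
submitted = 0
for task in range(10000):
    # Back off while too many tasks are still inside the pipeline.
    while submitted - counter.get_count() >= MAX_IN_FLIGHT:
        time.sleep(0.01)
    pipe.put(task)
    submitted += 1
pipe.put(None)  # signal end of input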

@johann-petrak
Author

Thank you so much, also for accepting this as a feature request!
