diff --git a/eqcorrscan/core/match_filter/helpers/processes.py b/eqcorrscan/core/match_filter/helpers/processes.py index 2813f26f3..7a85945c0 100644 --- a/eqcorrscan/core/match_filter/helpers/processes.py +++ b/eqcorrscan/core/match_filter/helpers/processes.py @@ -108,14 +108,41 @@ def _wait_on_output_to_be_available( poison_queue: Queue, output_queue: Queue, raise_exception: bool = False, - item=None + item=None, + wait_warning: float = 60, ) -> Union[Poison, None]: """ + Wait until the output queue is not full to put something in it. - :param poison_queue: + :param poison_queue: Queue to put or containing poison :param output_queue: - :param item: - :return: + Output Queue to check whether we can put something in it + :param item: Thing to put in the queue when we can + :param raise_exception: + Whether to raise an exception on poison (True), or pass back (False) + + :return: Poison if poisoned, or None if all okay + + .. rubric:: Example + + >>> from multiprocessing import Queue, Process + >>> poison_queue, output_queue = Queue(), Queue(maxsize=1) + >>> output_queue.put("Stopper") + >>> process = Process( + ... target=_wait_on_output_to_be_available, + ... kwargs={"poison_queue": poison_queue, + ... "output_queue": output_queue, + ... "raise_exception": True, + ... "item": "Carry on", + ... "wait_warning": 5}) + >>> process.start() + >>> time.sleep(7) + >>> poison_queue.put( + ... Poison(Exception("cyanide"))) + >>> time.sleep(7) + >>> process.is_alive() + False + >>> process.join() """ killed = _check_for_poison(poison_queue) # Wait until output queue is empty to limit rate and memory use @@ -126,7 +153,7 @@ def _wait_on_output_to_be_available( if killed: break waited = default_timer() - tic - if waited > 60: + if waited > wait_warning: Logger.debug("Waiting for output_queue to not be full") tic = default_timer() if not killed and item: