diff --git a/reportportal_client/service_async.py b/reportportal_client/service_async.py index 1bba6cb7..9f67846e 100644 --- a/reportportal_client/service_async.py +++ b/reportportal_client/service_async.py @@ -28,8 +28,9 @@ class QueueListener(object): _sentinel_item = None - def __init__(self, queue, *handlers): + def __init__(self, queue, *handlers, **kwargs): self.queue = queue + self.queue_get_timeout = kwargs.get("queue_get_timeout", None) self.handlers = handlers self._stop_nowait = threading.Event() self._stop = threading.Event() @@ -37,7 +38,7 @@ def __init__(self, queue, *handlers): def dequeue(self, block=True): """Dequeue a record and return item.""" - return self.queue.get(block) + return self.queue.get(block, self.queue_get_timeout) def start(self): """Start the listener. @@ -133,7 +134,7 @@ class ReportPortalServiceAsync(object): def __init__(self, endpoint, project, token, api_base="api/v1", error_handler=None, log_batch_size=20, is_skipped_an_issue=True, - verify_ssl=True): + verify_ssl=True, queue_get_timeout=5): """Init the service class. Args: @@ -160,7 +161,8 @@ def __init__(self, endpoint, project, token, api_base="api/v1", "start_test_item", "finish_test_item", "log"] self.queue = queue.Queue() - self.listener = QueueListener(self.queue, self.process_item) + self.listener = QueueListener(self.queue, self.process_item, + queue_get_timeout=queue_get_timeout) self.listener.start() self.lock = threading.Lock()