Skip to content

Commit

Permalink
Merge pull request #44 from pshv/master
Browse files Browse the repository at this point in the history
Added timeout to dequeue to properly raise queue.Empty instead of hanging
  • Loading branch information
arozumenko authored Apr 22, 2019
2 parents e660ed7 + b3191f5 commit 8d22445
Showing 1 changed file with 6 additions and 4 deletions.
10 changes: 6 additions & 4 deletions reportportal_client/service_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,17 @@
class QueueListener(object):
_sentinel_item = None

def __init__(self, queue, *handlers):
def __init__(self, queue, *handlers, **kwargs):
self.queue = queue
self.queue_get_timeout = kwargs.get("queue_get_timeout", None)
self.handlers = handlers
self._stop_nowait = threading.Event()
self._stop = threading.Event()
self._thread = None

def dequeue(self, block=True):
"""Dequeue a record and return item."""
return self.queue.get(block)
return self.queue.get(block, self.queue_get_timeout)

def start(self):
"""Start the listener.
Expand Down Expand Up @@ -133,7 +134,7 @@ class ReportPortalServiceAsync(object):
def __init__(self, endpoint, project, token, api_base="api/v1",
error_handler=None, log_batch_size=20,
is_skipped_an_issue=True,
verify_ssl=True):
verify_ssl=True, queue_get_timeout=5):
"""Init the service class.
Args:
Expand All @@ -160,7 +161,8 @@ def __init__(self, endpoint, project, token, api_base="api/v1",
"start_test_item", "finish_test_item", "log"]

self.queue = queue.Queue()
self.listener = QueueListener(self.queue, self.process_item)
self.listener = QueueListener(self.queue, self.process_item,
queue_get_timeout=queue_get_timeout)
self.listener.start()
self.lock = threading.Lock()

Expand Down

0 comments on commit 8d22445

Please sign in to comment.