Skip to content

Commit

Permalink
Refactor blocking_consumer to include more logging/exceptions aroun…
Browse files Browse the repository at this point in the history
…d shutdown errors

Update `MQConnector` to allow more time for threads to join and logging around errors
  • Loading branch information
NeonDaniel committed Jan 14, 2025
1 parent 3ebd805 commit 145488d
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 4 deletions.
10 changes: 7 additions & 3 deletions neon_mq_connector/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class MQConnector(ABC):

__run_retries__ = 5
__max_consumer_restarts__ = -1
__consumer_join_timeout__ = 1
__consumer_join_timeout__ = 3

async_consumers_enabled = False

Expand Down Expand Up @@ -560,9 +560,13 @@ def stop_consumers(self, names: Optional[tuple] = None):
names = list(self.consumers)
for name in names:
try:
if isinstance(self.consumers.get(name), SUPPORTED_THREADED_CONSUMERS) and self.consumers[name].is_alive():
if isinstance(self.consumers.get(name),
SUPPORTED_THREADED_CONSUMERS) and \
self.consumers[name].is_alive():
self.consumers[name].join(timeout=self.__consumer_join_timeout__)
time.sleep(self.__consumer_join_timeout__)
if self.consumers[name].is_alive():
LOG.error(f"Failed to join consumer thread: {name} "
f"after {self.__consumer_join_timeout__}s")
self.consumer_properties[name]['started'] = False
except Exception as e:
raise ChildProcessError(e)
Expand Down
5 changes: 4 additions & 1 deletion neon_mq_connector/consumers/blocking_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,18 @@ def join(self, timeout: Optional[float] = None) -> None:
"""Terminating consumer channel"""
if self._is_consumer_alive:
self._close_connection()
super(BlockingConsumerThread, self).join(timeout=timeout)
threading.Thread.join(self, timeout=timeout)

def _close_connection(self):
self._is_consumer_alive = False
try:
if self.connection and self.connection.is_open:
self.connection.close()
if self.connection.is_open:
raise RuntimeError(f"Connection still open: {self.connection}")
except pika.exceptions.StreamLostError:
pass
except Exception as e:
LOG.exception(f"Failed to close connection due to unexpected exception: {e}")
self._consumer_started.clear()
LOG.info("Consumer connection closed")

0 comments on commit 145488d

Please sign in to comment.