From 145488d469aeab48bf0a1219549f82574de28228 Mon Sep 17 00:00:00 2001 From: Daniel McKnight Date: Mon, 13 Jan 2025 17:57:31 -0800 Subject: [PATCH] Refactor `blocking_consumer` to include more logging/exceptions around shutdown errors Update `MQConnector` to allow more time for threads to join and logging around errors --- neon_mq_connector/connector.py | 10 +++++++--- neon_mq_connector/consumers/blocking_consumer.py | 5 ++++- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/neon_mq_connector/connector.py b/neon_mq_connector/connector.py index 2865285..42780f6 100644 --- a/neon_mq_connector/connector.py +++ b/neon_mq_connector/connector.py @@ -63,7 +63,7 @@ class MQConnector(ABC): __run_retries__ = 5 __max_consumer_restarts__ = -1 - __consumer_join_timeout__ = 1 + __consumer_join_timeout__ = 3 async_consumers_enabled = False @@ -560,9 +560,13 @@ def stop_consumers(self, names: Optional[tuple] = None): names = list(self.consumers) for name in names: try: - if isinstance(self.consumers.get(name), SUPPORTED_THREADED_CONSUMERS) and self.consumers[name].is_alive(): + if isinstance(self.consumers.get(name), + SUPPORTED_THREADED_CONSUMERS) and \ + self.consumers[name].is_alive(): self.consumers[name].join(timeout=self.__consumer_join_timeout__) - time.sleep(self.__consumer_join_timeout__) + if self.consumers[name].is_alive(): + LOG.error(f"Failed to join consumer thread: {name} " + f"after {self.__consumer_join_timeout__}s") self.consumer_properties[name]['started'] = False except Exception as e: raise ChildProcessError(e) diff --git a/neon_mq_connector/consumers/blocking_consumer.py b/neon_mq_connector/consumers/blocking_consumer.py index a2aaa37..0dc3cc3 100644 --- a/neon_mq_connector/consumers/blocking_consumer.py +++ b/neon_mq_connector/consumers/blocking_consumer.py @@ -147,15 +147,18 @@ def join(self, timeout: Optional[float] = None) -> None: """Terminating consumer channel""" if self._is_consumer_alive: self._close_connection() - super(BlockingConsumerThread, self).join(timeout=timeout) + threading.Thread.join(self, timeout=timeout) def _close_connection(self): self._is_consumer_alive = False try: if self.connection and self.connection.is_open: self.connection.close() + if self.connection.is_open: + raise RuntimeError(f"Connection still open: {self.connection}") except pika.exceptions.StreamLostError: pass except Exception as e: LOG.exception(f"Failed to close connection due to unexpected exception: {e}") self._consumer_started.clear() + LOG.info("Consumer connection closed")