diff --git a/services/klatchat_observer/controller.py b/services/klatchat_observer/controller.py index 5d335929..1e5117fa 100644 --- a/services/klatchat_observer/controller.py +++ b/services/klatchat_observer/controller.py @@ -100,7 +100,6 @@ def __init__( self.sio_url = config["SIO_URL"] self._sio: socketio.Client = socketio.Client() - self.sio_connected = False self.sio_connecting = False self.sio_queued_messages = Queue(maxsize=256) self.register_sio_handlers() @@ -343,26 +342,37 @@ def register_sio_handlers(self): def connect_sio(self): """ - Method for establishing connection with Socket IO server + Method for establishing connection with Socket.IO server, ensuring persistent connection. """ - # This flag is needed to lock parallel attempts + # Avoid parallel connection attempts + if self.sio_connecting or self._sio.connected: + return + self.sio_connecting = True - self._sio.connect( - url=self.sio_url, - namespaces=["/"], - retry=True, - headers={ - "session": self._klat_session_token, - "nano_session": self._klat_nano_token, - }, - ) - time.sleep(1) - LOG.info("Socket IO connected") - self.sio_connecting = False - self.sio_connected = True - while not self.sio_queued_messages.empty(): - self._sio_emit(**self.sio_queued_messages.get()) - time.sleep(0.1) + try: + self._sio.connect( + url=self.sio_url, + namespaces=["/"], + headers={ + "session": self._klat_session_token, + "nano_session": self._klat_nano_token, + }, + ) + LOG.info("Socket.IO connected") + except SocketIOError as err: + LOG.error(f"Failed to connect to Socket.IO: {err}") + finally: + self.sio_connecting = False + + # Retry on failed connection + if not self._sio.connected: + LOG.info("Retrying Socket.IO connection in 5 seconds") + Timer(5, self.connect_sio).start() + else: + # Flush queued messages + while not self.sio_queued_messages.empty(): + self._sio_emit(**self.sio_queued_messages.get()) + time.sleep(0.1) @property def sio(self): @@ -372,15 +382,9 @@ def sio(self): :return: connected async socket io instance """ - if not (self.sio_connected or self.sio_connecting): - try: - # Assuming that Socket IO is connected unless Exception is raised during the connection - # This is done to prevent parallel invocation of this method from consumers - self.connect_sio() - except Exception as ex: - LOG.error(f"Failed to connect to sio: {ex}") - self.sio_connecting = False - self.sio_connected = False + if not self._sio.connected: + LOG.info("Socket.IO disconnected, reconnecting...") + self.connect_sio() return self._sio def _handle_auth_expired(self, data: dict): @@ -870,7 +874,6 @@ def _login_to_klat_server(self): if response.ok: self._klat_session_token = response.json()["token"] self._sio.disconnect() - self.sio_connected = False else: LOG.error( f"Klat API authorization error: [{response.status_code}] {response.text}" @@ -902,13 +905,20 @@ def request_revoke_submind_ban_from_conversation(self, data: dict): ) def _sio_emit(self, event: str, data: dict): - if self.sio_connected: + """ + Emit events to the Socket.IO server, ensuring reliability. + """ + if self._sio.connected: try: - self.sio.emit(event=event, data=data) + self._sio.emit(event=event, data=data) except SocketIOError as err: - LOG.error( - f"Socket IO event={event} emit failed due to error: {err}, reconnecting" - ) - self.sio_connected = False - if not self.sio_connected: - self.sio_queued_messages.put(item={"event": event, "data": data}) + LOG.error(f"Failed to emit event {event} due to: {err}") + self._sio.disconnect() + time.sleep(0.5) + self.sio_queued_messages.put({"event": event, "data": data}) + self.connect_sio() + else: + LOG.debug(f"Queueing event {event} due to disconnected Socket.IO") + self.sio_queued_messages.put({"event": event, "data": data}) + if not self.sio_connecting: + self.connect_sio()