Skip to content

Commit

Permalink
Fixed issue with handling message error with socket io
Browse files Browse the repository at this point in the history
  • Loading branch information
kirgrim committed Dec 15, 2024
1 parent 7d26c88 commit a1a44c2
Showing 1 changed file with 47 additions and 37 deletions.
84 changes: 47 additions & 37 deletions services/klatchat_observer/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ def __init__(
self.sio_url = config["SIO_URL"]

self._sio: socketio.Client = socketio.Client()
self.sio_connected = False
self.sio_connecting = False
self.sio_queued_messages = Queue(maxsize=256)
self.register_sio_handlers()
Expand Down Expand Up @@ -343,26 +342,37 @@ def register_sio_handlers(self):

def connect_sio(self):
"""
Method for establishing connection with Socket IO server
Method for establishing connection with Socket.IO server, ensuring persistent connection.
"""
# This flag is needed to lock parallel attempts
# Avoid parallel connection attempts
if self.sio_connecting or self._sio.connected:
return

self.sio_connecting = True
self._sio.connect(
url=self.sio_url,
namespaces=["/"],
retry=True,
headers={
"session": self._klat_session_token,
"nano_session": self._klat_nano_token,
},
)
time.sleep(1)
LOG.info("Socket IO connected")
self.sio_connecting = False
self.sio_connected = True
while not self.sio_queued_messages.empty():
self._sio_emit(**self.sio_queued_messages.get())
time.sleep(0.1)
try:
self._sio.connect(
url=self.sio_url,
namespaces=["/"],
headers={
"session": self._klat_session_token,
"nano_session": self._klat_nano_token,
},
)
LOG.info("Socket.IO connected")
except SocketIOError as err:
LOG.error(f"Failed to connect to Socket.IO: {err}")
finally:
self.sio_connecting = False

# Retry on failed connection
if not self._sio.connected:
LOG.info("Retrying Socket.IO connection in 5 seconds")
Timer(5, self.connect_sio).start()
else:
# Flush queued messages
while not self.sio_queued_messages.empty():
self._sio_emit(**self.sio_queued_messages.get())
time.sleep(0.1)

@property
def sio(self):
Expand All @@ -372,15 +382,9 @@ def sio(self):
:return: connected async socket io instance
"""
if not (self.sio_connected or self.sio_connecting):
try:
# Assuming that Socket IO is connected unless Exception is raised during the connection
# This is done to prevent parallel invocation of this method from consumers
self.connect_sio()
except Exception as ex:
LOG.error(f"Failed to connect to sio: {ex}")
self.sio_connecting = False
self.sio_connected = False
if not self._sio.connected:
LOG.info("Socket.IO disconnected, reconnecting...")
self.connect_sio()
return self._sio

def _handle_auth_expired(self, data: dict):
Expand Down Expand Up @@ -870,7 +874,6 @@ def _login_to_klat_server(self):
if response.ok:
self._klat_session_token = response.json()["token"]
self._sio.disconnect()
self.sio_connected = False
else:
LOG.error(
f"Klat API authorization error: [{response.status_code}] {response.text}"
Expand Down Expand Up @@ -902,13 +905,20 @@ def request_revoke_submind_ban_from_conversation(self, data: dict):
)

def _sio_emit(self, event: str, data: dict):
if self.sio_connected:
"""
Emit events to the Socket.IO server, ensuring reliability.
"""
if self._sio.connected:
try:
self.sio.emit(event=event, data=data)
self._sio.emit(event=event, data=data)
except SocketIOError as err:
LOG.error(
f"Socket IO event={event} emit failed due to error: {err}, reconnecting"
)
self.sio_connected = False
if not self.sio_connected:
self.sio_queued_messages.put(item={"event": event, "data": data})
LOG.error(f"Failed to emit event {event} due to: {err}")
self._sio.disconnect()
time.sleep(0.5)
self.sio_queued_messages.put({"event": event, "data": data})
self.connect_sio()
else:
LOG.debug(f"Queueing event {event} due to disconnected Socket.IO")
self.sio_queued_messages.put({"event": event, "data": data})
if not self.sio_connecting:
self.connect_sio()

0 comments on commit a1a44c2

Please sign in to comment.