Skip to content

Commit

Permalink
client reconnect on server conn close, more logs (#58)
Browse files Browse the repository at this point in the history
Why + What changed
===

_Describe what prompted you to make this change, link relevant
resources: Linear issues, Slack discussions, etc._

- If the server closed the connection, we never tried to reconnect as a client.
- We are also seeing cases where pid2 doesn't receive a heartbeat in time and
closes the connection, so I added more logs for observability (o11y) around
session lifetimes.

Test plan
=========

_Describe what you did to test this change to a level of detail that
allows your reviewer to test it_
  • Loading branch information
jackyzha0 authored Jul 31, 2024
1 parent 0085399 commit 980ec5b
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 6 deletions.
7 changes: 7 additions & 0 deletions replit_river/client_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ async def _establish_new_connection(
rate_limit = self._rate_limiter
max_retry = self._transport_options.connection_retry_options.max_retry
client_id = self._client_id
logger.info("Attempting to establish new ws connection")
for i in range(max_retry):
if i > 0:
logger.info(f"Retrying build handshake number {i} times")
Expand Down Expand Up @@ -143,6 +144,7 @@ async def _establish_new_connection(
async def _create_new_session(
self,
) -> ClientSession:
logger.info("Creating new session")
new_ws, hs_request, hs_response = await self._establish_new_connection()
if not hs_response.status.ok:
message = hs_response.status.reason
Expand Down Expand Up @@ -186,9 +188,14 @@ async def _get_or_create_session(self) -> ClientSession:
existing_session
)
if hs_response.status.sessionId == existing_session.session_id:
logger.info(
"Replacing ws connection in session id %s",
existing_session.session_id,
)
await existing_session.replace_with_new_websocket(new_ws)
return existing_session
else:
logger.info("Closing stale session %s", existing_session.session_id)
await existing_session.close()
return await self._create_new_session()

Expand Down
11 changes: 8 additions & 3 deletions replit_river/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ async def serve(self) -> None:
try:
await self._handle_messages_from_ws(tg)
except ConnectionClosed:
if self._retry_connection_callback:
self._task_manager.create_task(
self._retry_connection_callback()
)

await self._begin_close_session_countdown()
logger.debug("ConnectionClosed while serving", exc_info=True)
except FailedSendingMessageException:
Expand Down Expand Up @@ -251,7 +256,7 @@ async def _check_to_close_session(self) -> None:
if not self._close_session_after_time_secs:
continue
if current_time > self._close_session_after_time_secs:
logger.debug(
logger.info(
"Grace period ended for %s, closing session", self._transport_id
)
await self.close()
Expand All @@ -274,7 +279,7 @@ async def _heartbeat(
return
try:
await self.send_message(
str(nanoid.generate()),
"heartbeat",
# TODO: make this a message class
# https://github.com/replit/river/blob/741b1ea6d7600937ad53564e9cf8cd27a92ec36a/transport/message.ts#L42
{
Expand All @@ -290,7 +295,7 @@ async def _heartbeat(
if self._close_session_after_time_secs is not None:
# already in grace period, no need to set again
continue
logger.debug(
logger.info(
"%r closing websocket because of heartbeat misses",
self.session_id,
)
Expand Down
6 changes: 3 additions & 3 deletions replit_river/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ async def get_or_create_session(
session_to_close: Optional[Session] = None
new_session: Optional[Session] = None
if to_id not in self._sessions:
logger.debug(
logger.info(
'Creating new session with "%s" using ws: %s', to_id, websocket.id
)
new_session = Session(
Expand All @@ -89,7 +89,7 @@ async def get_or_create_session(
else:
old_session = self._sessions[to_id]
if old_session.session_id != session_id:
logger.debug(
logger.info(
'Create new session with "%s" for session id %s'
" and close old session %s",
to_id,
Expand Down Expand Up @@ -122,7 +122,7 @@ async def get_or_create_session(
raise e

if session_to_close:
logger.debug("Closing stale session %s", session_to_close.session_id)
logger.info("Closing stale session %s", session_to_close.session_id)
await session_to_close.close()
self._set_session(new_session)
return new_session

0 comments on commit 980ec5b

Please sign in to comment.