Skip to content

Commit

Permalink
make out-of-order non-fatal (#56)
Browse files Browse the repository at this point in the history
Why
===

_Describe what prompted you to make this change, link relevant
resources: Linear issues, Slack discussions, etc._

- same as replit/river#244

What changed
============

- no longer raise protocol error on out of order and instead close the
connection
- the reasoning is that the client will attempt to re-establish the
connection and re-agree on the right seq number and resend the send
buffer
- if theres a disagreement at this stage, we will kill the session but
otherwise we can actually transparently recover!

Test plan
=========

_Describe what you did to test this change to a level of detail that
allows your reviewer to test it_
  • Loading branch information
jackyzha0 authored Jul 26, 2024
1 parent ea7e1fe commit 2154c9b
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 6 deletions.
18 changes: 14 additions & 4 deletions replit_river/seq_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ class InvalidMessageException(Exception):
pass


class OutOfOrderMessageException(Exception):
"""Error when a message is received out of order, we close the connection
and wait for the client to resychronize. If the resychronization fails,
we close the session.
"""

pass


class SessionStateMismatchException(Exception):
"""Error when the session state mismatch, we reject handshake and
close the connection"""
Expand Down Expand Up @@ -68,13 +77,14 @@ async def check_seq_and_update(self, msg: TransportMessage) -> None:
f" expected {self.ack}"
)
else:
logger.error(
logger.warn(
f"Out of order message received got {msg.seq} expected "
f"{self.ack}"
)
raise InvalidMessageException(
f"{msg.from_} received out of order, got {msg.seq}"
f" expected {self.ack}"

raise OutOfOrderMessageException(
f"Out of order message received got {msg.seq} expected "
f"{self.ack}"
)
self.receiver_ack = msg.ack
await self._set_ack(msg.seq + 1)
Expand Down
5 changes: 5 additions & 0 deletions replit_river/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from replit_river.seq_manager import (
IgnoreMessageException,
InvalidMessageException,
OutOfOrderMessageException,
SeqManager,
)
from replit_river.task_manager import BackgroundTaskManager
Expand Down Expand Up @@ -200,6 +201,10 @@ async def _handle_messages_from_ws(
except IgnoreMessageException as e:
logger.debug("Ignoring transport message : %r", e)
continue
except OutOfOrderMessageException as e:
logger.error(f"Out of order message, closing connection : {e}")
await ws_wrapper.close()
return
except InvalidMessageException as e:
logger.error(
f"Got invalid transport message, closing session : {e}"
Expand Down
4 changes: 2 additions & 2 deletions tests/test_seq_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from replit_river.seq_manager import (
IgnoreMessageException,
InvalidMessageException,
OutOfOrderMessageException,
SeqManager,
)
from tests.conftest import transport_message
Expand Down Expand Up @@ -47,7 +47,7 @@ async def test_message_reception(no_logging_error: NoErrors) -> None:

# Test out of order message
msg.seq = 2
with pytest.raises(InvalidMessageException):
with pytest.raises(OutOfOrderMessageException):
await manager.check_seq_and_update(msg)


Expand Down

0 comments on commit 2154c9b

Please sign in to comment.