From ec3bd2ab06278602c1d6018b476699e090036373 Mon Sep 17 00:00:00 2001 From: Aymeric Augustin Date: Fri, 3 Nov 2023 08:22:33 +0100 Subject: [PATCH] Make sync reassembler more readable. No logic changes. --- src/websockets/sync/messages.py | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/src/websockets/sync/messages.py b/src/websockets/sync/messages.py index 67a22313..d98ff855 100644 --- a/src/websockets/sync/messages.py +++ b/src/websockets/sync/messages.py @@ -47,13 +47,13 @@ def __init__(self) -> None: # queue for transferring frames from the writing thread (library code) # to the reading thread (user code). We're buffering when chunks_queue # is None and streaming when it's a SimpleQueue. None is a sentinel - # value marking the end of the stream, superseding message_complete. + # value marking the end of the message, superseding message_complete. # Stream data from frames belonging to the same message. # Remove quotes around type when dropping Python < 3.9. self.chunks_queue: Optional["queue.SimpleQueue[Optional[Data]]"] = None - # This flag marks the end of the stream. + # This flag marks the end of the connection. self.closed = False def get(self, timeout: Optional[float] = None) -> Data: @@ -108,12 +108,12 @@ def get(self, timeout: Optional[float] = None) -> Data: # mypy cannot figure out that chunks have the proper type. message: Data = joiner.join(self.chunks) # type: ignore - assert not self.message_fetched.is_set() - self.message_fetched.set() - self.chunks = [] assert self.chunks_queue is None + assert not self.message_fetched.is_set() + self.message_fetched.set() + return message def get_iter(self) -> Iterator[Data]: @@ -169,26 +169,26 @@ def get_iter(self) -> Iterator[Data]: with self.mutex: self.get_in_progress = False - assert self.message_complete.is_set() - self.message_complete.clear() - # get_iter() was unblocked by close() rather than put(). if self.closed: raise EOFError("stream of frames ended") - assert not self.message_fetched.is_set() - self.message_fetched.set() + assert self.message_complete.is_set() + self.message_complete.clear() assert self.chunks == [] self.chunks_queue = None + assert not self.message_fetched.is_set() + self.message_fetched.set() + def put(self, frame: Frame) -> None: """ Add ``frame`` to the next message. When ``frame`` is the final frame in a message, :meth:`put` waits until - the message is fetched, either by calling :meth:`get` or by fully - consuming the return value of :meth:`get_iter`. + the message is fetched, which can be achieved by calling :meth:`get` or + by fully consuming the return value of :meth:`get_iter`. :meth:`put` assumes that the stream of frames respects the protocol. If it doesn't, the behavior is undefined. @@ -247,13 +247,13 @@ def put(self, frame: Frame) -> None: with self.mutex: self.put_in_progress = False - assert self.message_fetched.is_set() - self.message_fetched.clear() - # put() was unblocked by close() rather than get() or get_iter(). if self.closed: raise EOFError("stream of frames ended") + assert self.message_fetched.is_set() + self.message_fetched.clear() + self.decoder = None def close(self) -> None: