Skip to content

Commit

Permalink
Support max_queue=None like the legacy implementation.
Browse files Browse the repository at this point in the history
Fix #1540.
  • Loading branch information
aaugustin committed Nov 11, 2024
1 parent 3034834 commit 7adad97
Show file tree
Hide file tree
Showing 13 changed files with 198 additions and 40 deletions.
7 changes: 7 additions & 0 deletions docs/project/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ notice.

.. _14.0:

Improvements
............

* Supported ``max_queue=None`` in the :mod:`asyncio` and :mod:`threading`
implementations for consistency with the legacy implementation, even though
this is never a good idea.

Bug fixes
.........

Expand Down
7 changes: 4 additions & 3 deletions src/websockets/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def __init__(
ping_interval: float | None = 20,
ping_timeout: float | None = 20,
close_timeout: float | None = 10,
max_queue: int | tuple[int, int | None] = 16,
max_queue: int | None | tuple[int | None, int | None] = 16,
write_limit: int | tuple[int, int | None] = 2**15,
) -> None:
self.protocol: ClientProtocol
Expand Down Expand Up @@ -222,7 +222,8 @@ class connect:
max_queue: High-water mark of the buffer where frames are received.
It defaults to 16 frames. The low-water mark defaults to ``max_queue
// 4``. You may pass a ``(high, low)`` tuple to set the high-water
and low-water marks.
and low-water marks. If you want to disable flow control entirely,
you may set it to ``None``, although that's a bad idea.
write_limit: High-water mark of write buffer in bytes. It is passed to
:meth:`~asyncio.WriteTransport.set_write_buffer_limits`. It defaults
to 32 KiB. You may pass a ``(high, low)`` tuple to set the
Expand Down Expand Up @@ -283,7 +284,7 @@ def __init__(
close_timeout: float | None = 10,
# Limits
max_size: int | None = 2**20,
max_queue: int | tuple[int, int | None] = 16,
max_queue: int | None | tuple[int | None, int | None] = 16,
write_limit: int | tuple[int, int | None] = 2**15,
# Logging
logger: LoggerLike | None = None,
Expand Down
4 changes: 2 additions & 2 deletions src/websockets/asyncio/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,14 @@ def __init__(
ping_interval: float | None = 20,
ping_timeout: float | None = 20,
close_timeout: float | None = 10,
max_queue: int | tuple[int, int | None] = 16,
max_queue: int | None | tuple[int | None, int | None] = 16,
write_limit: int | tuple[int, int | None] = 2**15,
) -> None:
self.protocol = protocol
self.ping_interval = ping_interval
self.ping_timeout = ping_timeout
self.close_timeout = close_timeout
if isinstance(max_queue, int):
if isinstance(max_queue, int) or max_queue is None:
max_queue = (max_queue, None)
self.max_queue = max_queue
if isinstance(write_limit, int):
Expand Down
23 changes: 17 additions & 6 deletions src/websockets/asyncio/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class Assembler:
# coverage reports incorrectly: "line NN didn't jump to the function exit"
def __init__( # pragma: no cover
self,
high: int = 16,
high: int | None = None,
low: int | None = None,
pause: Callable[[], Any] = lambda: None,
resume: Callable[[], Any] = lambda: None,
Expand All @@ -96,12 +96,15 @@ def __init__( # pragma: no cover
# call to Protocol.data_received() could produce thousands of frames,
# which must be buffered. Instead, we pause reading when the buffer goes
# above the high limit and we resume when it goes under the low limit.
if low is None:
if high is not None and low is None:
low = high // 4
if low < 0:
raise ValueError("low must be positive or equal to zero")
if high < low:
raise ValueError("high must be greater than or equal to low")
if high is None and low is not None:
high = low * 4
if high is not None and low is not None:
if low < 0:
raise ValueError("low must be positive or equal to zero")
if high < low:
raise ValueError("high must be greater than or equal to low")
self.high, self.low = high, low
self.pause = pause
self.resume = resume
Expand Down Expand Up @@ -256,13 +259,21 @@ def put(self, frame: Frame) -> None:

def maybe_pause(self) -> None:
"""Pause the writer if queue is above the high water mark."""
# Skip if flow control is disabled
if self.high is None:
return

# Check for "> high" to support high = 0
if len(self.frames) > self.high and not self.paused:
self.paused = True
self.pause()

def maybe_resume(self) -> None:
"""Resume the writer if queue is below the low water mark."""
# Skip if flow control is disabled
if self.low is None:
return

# Check for "<= low" to support low = 0
if len(self.frames) <= self.low and self.paused:
self.paused = False
Expand Down
7 changes: 4 additions & 3 deletions src/websockets/asyncio/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def __init__(
ping_interval: float | None = 20,
ping_timeout: float | None = 20,
close_timeout: float | None = 10,
max_queue: int | tuple[int, int | None] = 16,
max_queue: int | None | tuple[int | None, int | None] = 16,
write_limit: int | tuple[int, int | None] = 2**15,
) -> None:
self.protocol: ServerProtocol
Expand Down Expand Up @@ -643,7 +643,8 @@ def handler(websocket):
max_queue: High-water mark of the buffer where frames are received.
It defaults to 16 frames. The low-water mark defaults to ``max_queue
// 4``. You may pass a ``(high, low)`` tuple to set the high-water
and low-water marks.
and low-water marks. If you want to disable flow control entirely,
you may set it to ``None``, although that's a bad idea.
write_limit: High-water mark of write buffer in bytes. It is passed to
:meth:`~asyncio.WriteTransport.set_write_buffer_limits`. It defaults
to 32 KiB. You may pass a ``(high, low)`` tuple to set the
Expand Down Expand Up @@ -713,7 +714,7 @@ def __init__(
close_timeout: float | None = 10,
# Limits
max_size: int | None = 2**20,
max_queue: int | tuple[int, int | None] = 16,
max_queue: int | None | tuple[int | None, int | None] = 16,
write_limit: int | tuple[int, int | None] = 2**15,
# Logging
logger: LoggerLike | None = None,
Expand Down
7 changes: 4 additions & 3 deletions src/websockets/sync/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def __init__(
protocol: ClientProtocol,
*,
close_timeout: float | None = 10,
max_queue: int | tuple[int, int | None] = 16,
max_queue: int | None | tuple[int | None, int | None] = 16,
) -> None:
self.protocol: ClientProtocol
self.response_rcvd = threading.Event()
Expand Down Expand Up @@ -139,7 +139,7 @@ def connect(
close_timeout: float | None = 10,
# Limits
max_size: int | None = 2**20,
max_queue: int | tuple[int, int | None] = 16,
max_queue: int | None | tuple[int | None, int | None] = 16,
# Logging
logger: LoggerLike | None = None,
# Escape hatch for advanced customization
Expand Down Expand Up @@ -191,7 +191,8 @@ def connect(
max_queue: High-water mark of the buffer where frames are received.
It defaults to 16 frames. The low-water mark defaults to ``max_queue
// 4``. You may pass a ``(high, low)`` tuple to set the high-water
and low-water marks.
and low-water marks. If you want to disable flow control entirely,
you may set it to ``None``, although that's a bad idea.
logger: Logger for this client.
It defaults to ``logging.getLogger("websockets.client")``.
See the :doc:`logging guide <../../topics/logging>` for details.
Expand Down
4 changes: 2 additions & 2 deletions src/websockets/sync/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ def __init__(
protocol: Protocol,
*,
close_timeout: float | None = 10,
max_queue: int | tuple[int, int | None] = 16,
max_queue: int | None | tuple[int | None, int | None] = 16,
) -> None:
self.socket = socket
self.protocol = protocol
self.close_timeout = close_timeout
if isinstance(max_queue, int):
if isinstance(max_queue, int) or max_queue is None:
max_queue = (max_queue, None)
self.max_queue = max_queue

Expand Down
25 changes: 19 additions & 6 deletions src/websockets/sync/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class Assembler:

def __init__(
self,
high: int = 16,
high: int | None = None,
low: int | None = None,
pause: Callable[[], Any] = lambda: None,
resume: Callable[[], Any] = lambda: None,
Expand All @@ -49,12 +49,15 @@ def __init__(
# call to Protocol.data_received() could produce thousands of frames,
# which must be buffered. Instead, we pause reading when the buffer goes
# above the high limit and we resume when it goes under the low limit.
if low is None:
if high is not None and low is None:
low = high // 4
if low < 0:
raise ValueError("low must be positive or equal to zero")
if high < low:
raise ValueError("high must be greater than or equal to low")
if high is None and low is not None:
high = low * 4
if high is not None and low is not None:
if low < 0:
raise ValueError("low must be positive or equal to zero")
if high < low:
raise ValueError("high must be greater than or equal to low")
self.high, self.low = high, low
self.pause = pause
self.resume = resume
Expand Down Expand Up @@ -260,15 +263,25 @@ def put(self, frame: Frame) -> None:

def maybe_pause(self) -> None:
"""Pause the writer if queue is above the high water mark."""
# Skip if flow control is disabled
if self.high is None:
return

assert self.mutex.locked()

# Check for "> high" to support high = 0
if self.frames.qsize() > self.high and not self.paused:
self.paused = True
self.pause()

def maybe_resume(self) -> None:
"""Resume the writer if queue is below the low water mark."""
# Skip if flow control is disabled
if self.low is None:
return

assert self.mutex.locked()

# Check for "<= low" to support low = 0
if self.frames.qsize() <= self.low and self.paused:
self.paused = False
Expand Down
7 changes: 4 additions & 3 deletions src/websockets/sync/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def __init__(
protocol: ServerProtocol,
*,
close_timeout: float | None = 10,
max_queue: int | tuple[int, int | None] = 16,
max_queue: int | None | tuple[int | None, int | None] = 16,
) -> None:
self.protocol: ServerProtocol
self.request_rcvd = threading.Event()
Expand Down Expand Up @@ -356,7 +356,7 @@ def serve(
close_timeout: float | None = 10,
# Limits
max_size: int | None = 2**20,
max_queue: int | tuple[int, int | None] = 16,
max_queue: int | None | tuple[int | None, int | None] = 16,
# Logging
logger: LoggerLike | None = None,
# Escape hatch for advanced customization
Expand Down Expand Up @@ -438,7 +438,8 @@ def handler(websocket):
max_queue: High-water mark of the buffer where frames are received.
It defaults to 16 frames. The low-water mark defaults to ``max_queue
// 4``. You may pass a ``(high, low)`` tuple to set the high-water
and low-water marks.
and low-water marks. If you want to disable flow control entirely,
you may set it to ``None``, although that's a bad idea.
logger: Logger for this server.
It defaults to ``logging.getLogger("websockets.server")``. See the
:doc:`logging guide <../../topics/logging>` for details.
Expand Down
12 changes: 10 additions & 2 deletions tests/asyncio/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1066,14 +1066,22 @@ async def test_close_timeout(self):
self.assertEqual(connection.close_timeout, 42 * MS)

async def test_max_queue(self):
"""max_queue parameter configures high-water mark of frames buffer."""
"""max_queue configures high-water mark of frames buffer."""
connection = Connection(Protocol(self.LOCAL), max_queue=4)
transport = Mock()
connection.connection_made(transport)
self.assertEqual(connection.recv_messages.high, 4)

async def test_max_queue_none(self):
"""max_queue disables high-water mark of frames buffer."""
connection = Connection(Protocol(self.LOCAL), max_queue=None)
transport = Mock()
connection.connection_made(transport)
self.assertEqual(connection.recv_messages.high, None)
self.assertEqual(connection.recv_messages.low, None)

async def test_max_queue_tuple(self):
"""max_queue parameter configures high-water mark of frames buffer."""
"""max_queue configures high-water and low-water marks of frames buffer."""
connection = Connection(
Protocol(self.LOCAL),
max_queue=(4, 2),
Expand Down
Loading

0 comments on commit 7adad97

Please sign in to comment.