Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support max_queue=None like the legacy implementation. #1542

Merged
merged 1 commit into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/project/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ notice.

.. _14.0:

Improvements
............

* Supported ``max_queue=None`` in the :mod:`asyncio` and :mod:`threading`
implementations for consistency with the legacy implementation, even though
this is never a good idea.

Bug fixes
.........

Expand Down
7 changes: 4 additions & 3 deletions src/websockets/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def __init__(
ping_interval: float | None = 20,
ping_timeout: float | None = 20,
close_timeout: float | None = 10,
max_queue: int | tuple[int, int | None] = 16,
max_queue: int | None | tuple[int | None, int | None] = 16,
write_limit: int | tuple[int, int | None] = 2**15,
) -> None:
self.protocol: ClientProtocol
Expand Down Expand Up @@ -222,7 +222,8 @@ class connect:
max_queue: High-water mark of the buffer where frames are received.
It defaults to 16 frames. The low-water mark defaults to ``max_queue
// 4``. You may pass a ``(high, low)`` tuple to set the high-water
and low-water marks.
and low-water marks. If you want to disable flow control entirely,
you may set it to ``None``, although that's a bad idea.
write_limit: High-water mark of write buffer in bytes. It is passed to
:meth:`~asyncio.WriteTransport.set_write_buffer_limits`. It defaults
to 32 KiB. You may pass a ``(high, low)`` tuple to set the
Expand Down Expand Up @@ -283,7 +284,7 @@ def __init__(
close_timeout: float | None = 10,
# Limits
max_size: int | None = 2**20,
max_queue: int | tuple[int, int | None] = 16,
max_queue: int | None | tuple[int | None, int | None] = 16,
write_limit: int | tuple[int, int | None] = 2**15,
# Logging
logger: LoggerLike | None = None,
Expand Down
4 changes: 2 additions & 2 deletions src/websockets/asyncio/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,14 @@ def __init__(
ping_interval: float | None = 20,
ping_timeout: float | None = 20,
close_timeout: float | None = 10,
max_queue: int | tuple[int, int | None] = 16,
max_queue: int | None | tuple[int | None, int | None] = 16,
write_limit: int | tuple[int, int | None] = 2**15,
) -> None:
self.protocol = protocol
self.ping_interval = ping_interval
self.ping_timeout = ping_timeout
self.close_timeout = close_timeout
if isinstance(max_queue, int):
if isinstance(max_queue, int) or max_queue is None:
max_queue = (max_queue, None)
self.max_queue = max_queue
if isinstance(write_limit, int):
Expand Down
23 changes: 17 additions & 6 deletions src/websockets/asyncio/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class Assembler:
# coverage reports incorrectly: "line NN didn't jump to the function exit"
def __init__( # pragma: no cover
self,
high: int = 16,
high: int | None = None,
low: int | None = None,
pause: Callable[[], Any] = lambda: None,
resume: Callable[[], Any] = lambda: None,
Expand All @@ -96,12 +96,15 @@ def __init__( # pragma: no cover
# call to Protocol.data_received() could produce thousands of frames,
# which must be buffered. Instead, we pause reading when the buffer goes
# above the high limit and we resume when it goes under the low limit.
if low is None:
if high is not None and low is None:
low = high // 4
if low < 0:
raise ValueError("low must be positive or equal to zero")
if high < low:
raise ValueError("high must be greater than or equal to low")
if high is None and low is not None:
high = low * 4
if high is not None and low is not None:
if low < 0:
raise ValueError("low must be positive or equal to zero")
if high < low:
raise ValueError("high must be greater than or equal to low")
self.high, self.low = high, low
self.pause = pause
self.resume = resume
Expand Down Expand Up @@ -256,13 +259,21 @@ def put(self, frame: Frame) -> None:

def maybe_pause(self) -> None:
"""Pause the writer if queue is above the high water mark."""
# Skip if flow control is disabled
if self.high is None:
return

# Check for "> high" to support high = 0
if len(self.frames) > self.high and not self.paused:
self.paused = True
self.pause()

def maybe_resume(self) -> None:
"""Resume the writer if queue is below the low water mark."""
# Skip if flow control is disabled
if self.low is None:
return

# Check for "<= low" to support low = 0
if len(self.frames) <= self.low and self.paused:
self.paused = False
Expand Down
7 changes: 4 additions & 3 deletions src/websockets/asyncio/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def __init__(
ping_interval: float | None = 20,
ping_timeout: float | None = 20,
close_timeout: float | None = 10,
max_queue: int | tuple[int, int | None] = 16,
max_queue: int | None | tuple[int | None, int | None] = 16,
write_limit: int | tuple[int, int | None] = 2**15,
) -> None:
self.protocol: ServerProtocol
Expand Down Expand Up @@ -643,7 +643,8 @@ def handler(websocket):
max_queue: High-water mark of the buffer where frames are received.
It defaults to 16 frames. The low-water mark defaults to ``max_queue
// 4``. You may pass a ``(high, low)`` tuple to set the high-water
and low-water marks.
and low-water marks. If you want to disable flow control entirely,
you may set it to ``None``, although that's a bad idea.
write_limit: High-water mark of write buffer in bytes. It is passed to
:meth:`~asyncio.WriteTransport.set_write_buffer_limits`. It defaults
to 32 KiB. You may pass a ``(high, low)`` tuple to set the
Expand Down Expand Up @@ -713,7 +714,7 @@ def __init__(
close_timeout: float | None = 10,
# Limits
max_size: int | None = 2**20,
max_queue: int | tuple[int, int | None] = 16,
max_queue: int | None | tuple[int | None, int | None] = 16,
write_limit: int | tuple[int, int | None] = 2**15,
# Logging
logger: LoggerLike | None = None,
Expand Down
7 changes: 4 additions & 3 deletions src/websockets/sync/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def __init__(
protocol: ClientProtocol,
*,
close_timeout: float | None = 10,
max_queue: int | tuple[int, int | None] = 16,
max_queue: int | None | tuple[int | None, int | None] = 16,
) -> None:
self.protocol: ClientProtocol
self.response_rcvd = threading.Event()
Expand Down Expand Up @@ -139,7 +139,7 @@ def connect(
close_timeout: float | None = 10,
# Limits
max_size: int | None = 2**20,
max_queue: int | tuple[int, int | None] = 16,
max_queue: int | None | tuple[int | None, int | None] = 16,
# Logging
logger: LoggerLike | None = None,
# Escape hatch for advanced customization
Expand Down Expand Up @@ -191,7 +191,8 @@ def connect(
max_queue: High-water mark of the buffer where frames are received.
It defaults to 16 frames. The low-water mark defaults to ``max_queue
// 4``. You may pass a ``(high, low)`` tuple to set the high-water
and low-water marks.
and low-water marks. If you want to disable flow control entirely,
you may set it to ``None``, although that's a bad idea.
logger: Logger for this client.
It defaults to ``logging.getLogger("websockets.client")``.
See the :doc:`logging guide <../../topics/logging>` for details.
Expand Down
4 changes: 2 additions & 2 deletions src/websockets/sync/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ def __init__(
protocol: Protocol,
*,
close_timeout: float | None = 10,
max_queue: int | tuple[int, int | None] = 16,
max_queue: int | None | tuple[int | None, int | None] = 16,
) -> None:
self.socket = socket
self.protocol = protocol
self.close_timeout = close_timeout
if isinstance(max_queue, int):
if isinstance(max_queue, int) or max_queue is None:
max_queue = (max_queue, None)
self.max_queue = max_queue

Expand Down
25 changes: 19 additions & 6 deletions src/websockets/sync/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class Assembler:

def __init__(
self,
high: int = 16,
high: int | None = None,
low: int | None = None,
pause: Callable[[], Any] = lambda: None,
resume: Callable[[], Any] = lambda: None,
Expand All @@ -49,12 +49,15 @@ def __init__(
# call to Protocol.data_received() could produce thousands of frames,
# which must be buffered. Instead, we pause reading when the buffer goes
# above the high limit and we resume when it goes under the low limit.
if low is None:
if high is not None and low is None:
low = high // 4
if low < 0:
raise ValueError("low must be positive or equal to zero")
if high < low:
raise ValueError("high must be greater than or equal to low")
if high is None and low is not None:
high = low * 4
if high is not None and low is not None:
if low < 0:
raise ValueError("low must be positive or equal to zero")
if high < low:
raise ValueError("high must be greater than or equal to low")
self.high, self.low = high, low
self.pause = pause
self.resume = resume
Expand Down Expand Up @@ -260,15 +263,25 @@ def put(self, frame: Frame) -> None:

def maybe_pause(self) -> None:
"""Pause the writer if queue is above the high water mark."""
# Skip if flow control is disabled
if self.high is None:
return

assert self.mutex.locked()

# Check for "> high" to support high = 0
if self.frames.qsize() > self.high and not self.paused:
self.paused = True
self.pause()

def maybe_resume(self) -> None:
"""Resume the writer if queue is below the low water mark."""
# Skip if flow control is disabled
if self.low is None:
return

assert self.mutex.locked()

# Check for "<= low" to support low = 0
if self.frames.qsize() <= self.low and self.paused:
self.paused = False
Expand Down
7 changes: 4 additions & 3 deletions src/websockets/sync/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def __init__(
protocol: ServerProtocol,
*,
close_timeout: float | None = 10,
max_queue: int | tuple[int, int | None] = 16,
max_queue: int | None | tuple[int | None, int | None] = 16,
) -> None:
self.protocol: ServerProtocol
self.request_rcvd = threading.Event()
Expand Down Expand Up @@ -356,7 +356,7 @@ def serve(
close_timeout: float | None = 10,
# Limits
max_size: int | None = 2**20,
max_queue: int | tuple[int, int | None] = 16,
max_queue: int | None | tuple[int | None, int | None] = 16,
# Logging
logger: LoggerLike | None = None,
# Escape hatch for advanced customization
Expand Down Expand Up @@ -438,7 +438,8 @@ def handler(websocket):
max_queue: High-water mark of the buffer where frames are received.
It defaults to 16 frames. The low-water mark defaults to ``max_queue
// 4``. You may pass a ``(high, low)`` tuple to set the high-water
and low-water marks.
and low-water marks. If you want to disable flow control entirely,
you may set it to ``None``, although that's a bad idea.
logger: Logger for this server.
It defaults to ``logging.getLogger("websockets.server")``. See the
:doc:`logging guide <../../topics/logging>` for details.
Expand Down
12 changes: 10 additions & 2 deletions tests/asyncio/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1066,14 +1066,22 @@ async def test_close_timeout(self):
self.assertEqual(connection.close_timeout, 42 * MS)

async def test_max_queue(self):
"""max_queue parameter configures high-water mark of frames buffer."""
"""max_queue configures high-water mark of frames buffer."""
connection = Connection(Protocol(self.LOCAL), max_queue=4)
transport = Mock()
connection.connection_made(transport)
self.assertEqual(connection.recv_messages.high, 4)

async def test_max_queue_none(self):
"""max_queue disables high-water mark of frames buffer."""
connection = Connection(Protocol(self.LOCAL), max_queue=None)
transport = Mock()
connection.connection_made(transport)
self.assertEqual(connection.recv_messages.high, None)
self.assertEqual(connection.recv_messages.low, None)

async def test_max_queue_tuple(self):
"""max_queue parameter configures high-water mark of frames buffer."""
"""max_queue configures high-water and low-water marks of frames buffer."""
connection = Connection(
Protocol(self.LOCAL),
max_queue=(4, 2),
Expand Down
Loading