Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add type annotations for windows_pipes and its test #2812

Merged
merged 13 commits into from
Oct 17, 2023
Merged
4 changes: 0 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,6 @@ disallow_untyped_calls = false
# files not yet fully typed
[[tool.mypy.overrides]]
module = [
# internal
"trio/_windows_pipes",

# tests
"trio/testing/_fake_net",
"trio/_core/_tests/test_guest_mode",
Expand Down Expand Up @@ -116,7 +113,6 @@ module = [
"trio/_tests/test_tracing",
"trio/_tests/test_util",
"trio/_tests/test_wait_for_object",
"trio/_tests/test_windows_pipes",
"trio/_tests/tools/test_gen_exports",
]
check_untyped_defs = false
Expand Down
44 changes: 27 additions & 17 deletions trio/_tests/test_windows_pipes.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,47 @@
from __future__ import annotations

import sys
from typing import Any, Tuple
from typing import TYPE_CHECKING, Any

import pytest

from .. import _core
from ..testing import check_one_way_stream, wait_all_tasks_blocked

# Mark all the tests in this file as being windows-only
pytestmark = pytest.mark.skipif(sys.platform != "win32", reason="windows only")

assert (
sys.platform == "win32" or not TYPE_CHECKING
) # Skip type checking when not on Windows
CoolCat467 marked this conversation as resolved.
Show resolved Hide resolved

if sys.platform == "win32":
from asyncio.windows_utils import pipe

from .._core._windows_cffi import _handle, kernel32
from .._windows_pipes import PipeReceiveStream, PipeSendStream
else:
pytestmark = pytest.mark.skip(reason="windows only")
pipe: Any = None
PipeSendStream: Any = None
PipeReceiveStream: Any = None
pipe = Any
_handle = Any
kernel32 = Any
PipeReceiveStream = Any
PipeSendStream = Any
CoolCat467 marked this conversation as resolved.
Show resolved Hide resolved


async def make_pipe() -> Tuple[PipeSendStream, PipeReceiveStream]:
async def make_pipe() -> tuple[PipeSendStream, PipeReceiveStream]:
"""Makes a new pair of pipes."""
(r, w) = pipe()
return PipeSendStream(w), PipeReceiveStream(r)


async def test_pipe_typecheck():
async def test_pipe_typecheck() -> None:
with pytest.raises(TypeError):
PipeSendStream(1.0)
PipeSendStream(1.0) # type: ignore[arg-type]
with pytest.raises(TypeError):
PipeReceiveStream(None)
PipeReceiveStream(None) # type: ignore[arg-type]


async def test_pipe_error_on_close():
async def test_pipe_error_on_close() -> None:
# Make sure we correctly handle a failure from kernel32.CloseHandle
r, w = pipe()

Expand All @@ -47,18 +57,18 @@ async def test_pipe_error_on_close():
await receive_stream.aclose()


async def test_pipes_combined():
async def test_pipes_combined() -> None:
write, read = await make_pipe()
count = 2**20
replicas = 3

async def sender():
async def sender() -> None:
async with write:
big = bytearray(count)
for _ in range(replicas):
await write.send_all(big)

async def reader():
async def reader() -> None:
async with read:
await wait_all_tasks_blocked()
total_received = 0
Expand All @@ -76,7 +86,7 @@ async def reader():
n.start_soon(reader)


async def test_async_with():
async def test_async_with() -> None:
w, r = await make_pipe()
async with w, r:
pass
Expand All @@ -87,11 +97,11 @@ async def test_async_with():
await r.receive_some(10)


async def test_close_during_write():
async def test_close_during_write() -> None:
w, r = await make_pipe()
async with _core.open_nursery() as nursery:

async def write_forever():
async def write_forever() -> None:
with pytest.raises(_core.ClosedResourceError) as excinfo:
while True:
await w.send_all(b"x" * 4096)
Expand All @@ -102,7 +112,7 @@ async def write_forever():
await w.aclose()


async def test_pipe_fully():
async def test_pipe_fully() -> None:
# passing make_clogged_pipe tests wait_send_all_might_not_block, and we
# can't implement that on Windows
await check_one_way_stream(make_pipe, None)
20 changes: 11 additions & 9 deletions trio/_windows_pipes.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import sys
from typing import TYPE_CHECKING

Expand All @@ -23,18 +25,18 @@ def __init__(self, handle: int) -> None:
_core.register_with_iocp(self.handle)

@property
def closed(self):
def closed(self) -> bool:
return self.handle == -1

def close(self):
def close(self) -> None:
if self.closed:
return
handle = self.handle
self.handle = -1
if not kernel32.CloseHandle(_handle(handle)):
raise_winerror()

def __del__(self):
def __del__(self) -> None:
self.close()


Expand All @@ -50,7 +52,7 @@ def __init__(self, handle: int) -> None:
"another task is currently using this pipe"
)

async def send_all(self, data: bytes):
async def send_all(self, data: bytes) -> None:
with self._conflict_detector:
if self._handle_holder.closed:
raise _core.ClosedResourceError("this pipe is already closed")
Expand All @@ -76,10 +78,10 @@ async def wait_send_all_might_not_block(self) -> None:
# not implemented yet, and probably not needed
await _core.checkpoint()

def close(self):
def close(self) -> None:
self._handle_holder.close()

async def aclose(self):
async def aclose(self) -> None:
self.close()
await _core.checkpoint()

Expand All @@ -94,7 +96,7 @@ def __init__(self, handle: int) -> None:
"another task is currently using this pipe"
)

async def receive_some(self, max_bytes=None) -> bytes:
async def receive_some(self, max_bytes: int | None = None) -> bytes:
with self._conflict_detector:
if self._handle_holder.closed:
raise _core.ClosedResourceError("this pipe is already closed")
Expand Down Expand Up @@ -133,9 +135,9 @@ async def receive_some(self, max_bytes=None) -> bytes:
del buffer[size:]
return buffer

def close(self):
def close(self) -> None:
self._handle_holder.close()

async def aclose(self):
async def aclose(self) -> None:
self.close()
await _core.checkpoint()