Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor channel interfaces #1115

Merged
merged 6 commits into from
Jul 2, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 20 additions & 12 deletions docs/source/reference-core.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1212,7 +1212,7 @@ Using channels to pass values between tasks
different tasks. They're particularly useful for implementing
producer/consumer patterns.

The channel API is defined by the abstract base classes
The core channel API is defined by the abstract base classes
:class:`trio.abc.SendChannel` and :class:`trio.abc.ReceiveChannel`.
You can use these to implement your own custom channels, that do
things like pass objects between processes or over the network. But in
Expand All @@ -1228,14 +1228,23 @@ inside a single process, and for that you can use
what you use when you're looking for a queue. The main difference
is that Trio splits the classic queue interface up into two
objects. The advantage of this is that it makes it possible to put
the two ends in different processes, and that we can close the two
sides separately.
the two ends in different processes without rewriting your code,
and that we can close the two sides separately.

`MemorySendChannel` and `MemoryReceiveChannel` also expose several
more features beyond the core channel interface:

.. autoclass:: MemorySendChannel
:members:

.. autoclass:: MemoryReceiveChannel
:members:


A simple channel example
++++++++++++++++++++++++

Here's a simple example of how to use channels:
Here's a simple example of how to use memory channels:

.. literalinclude:: reference-core/channels-simple.py

Expand Down Expand Up @@ -1347,14 +1356,13 @@ program above:
.. literalinclude:: reference-core/channels-mpmc-fixed.py
:emphasize-lines: 7, 9, 10, 12, 13

This example demonstrates using the :meth:`SendChannel.clone
<trio.abc.SendChannel.clone>` and :meth:`ReceiveChannel.clone
<trio.abc.ReceiveChannel.clone>` methods. What these do is create
copies of our endpoints, that act just like the original – except that
they can be closed independently. And the underlying channel is only
closed after *all* the clones have been closed. So this completely
solves our problem with shutdown, and if you run this program, you'll
see it print its six lines of output and then exits cleanly.
This example demonstrates using the `MemorySendChannel.clone` and
`MemoryReceiveChannel.clone` methods. What these do is create copies
of our endpoints, that act just like the original – except that they
can be closed independently. And the underlying channel is only closed
after *all* the clones have been closed. So this completely solves our
problem with shutdown, and if you run this program, you'll see it
print its six lines of output and then exits cleanly.

Notice a small trick we use: the code in ``main`` creates clone
objects to pass into all the child tasks, and then closes the original
Expand Down
17 changes: 13 additions & 4 deletions docs/source/reference-io.rst
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,19 @@ Abstract base classes
- :class:`~trio.SocketListener`, :class:`~trio.SSLListener`
* - :class:`SendChannel`
- :class:`AsyncResource`
- :meth:`~SendChannel.send`, :meth:`~SendChannel.send_nowait`
- :meth:`~SendChannel.send`
-
- :func:`~trio.open_memory_channel`
- `~trio.MemorySendChannel`
* - :class:`ReceiveChannel`
- :class:`AsyncResource`
- :meth:`~ReceiveChannel.receive`, :meth:`~ReceiveChannel.receive_nowait`
- :meth:`~ReceiveChannel.receive`
- ``__aiter__``, ``__anext__``
- :func:`~trio.open_memory_channel`
- `~trio.MemoryReceiveChannel`
* - `Channel`
- `SendChannel`, `ReceiveChannel`
-
-
-

.. autoclass:: trio.abc.AsyncResource
:members:
Expand Down Expand Up @@ -165,6 +170,10 @@ Abstract base classes
:members:
:show-inheritance:

.. autoclass:: trio.abc.Channel
:members:
:show-inheritance:

.. currentmodule:: trio


Expand Down
6 changes: 6 additions & 0 deletions newsfragments/719.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
We cleaned up the distinction between the "abstract channel interface"
and the "memory channel" concrete implementation.
`trio.abc.SendChannel` and `trio.abc.ReceiveChannel` have been slimmed
down, `trio.MemorySendChannel` and `trio.MemoryReceiveChannel` are now
public types that can be used in type hints, and there's a new
`trio.abc.Channel` interface for future bidirectional channels.
4 changes: 3 additions & 1 deletion trio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@

from ._highlevel_generic import aclose_forcefully, StapledStream

from ._channel import open_memory_channel
from ._channel import (
open_memory_channel, MemorySendChannel, MemoryReceiveChannel
)

from ._signals import open_signal_receiver

Expand Down
138 changes: 28 additions & 110 deletions trio/_abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,15 +484,18 @@ async def send_eof(self):
"""


# A regular invariant generic type
T = TypeVar("T")

# The type of object produced by a ReceiveChannel (covariant because
# ReceiveChannel[Derived] can be passed to someone expecting
# ReceiveChannel[Base])
T_co = TypeVar("T_co", covariant=True)
ReceiveType = TypeVar("ReceiveType", covariant=True)

# The type of object accepted by a SendChannel (contravariant because
# SendChannel[Base] can be passed to someone expecting
# SendChannel[Derived])
T_contra = TypeVar("T_contra", contravariant=True)
SendType = TypeVar("SendType", contravariant=True)

# The type of object produced by a Listener (covariant plus must be
# an AsyncResource)
Expand Down Expand Up @@ -537,39 +540,21 @@ async def accept(self):
"""


class SendChannel(AsyncResource, Generic[T_contra]):
class SendChannel(AsyncResource, Generic[SendType]):
"""A standard interface for sending Python objects to some receiver.

:class:`SendChannel` objects also implement the :class:`AsyncResource`
interface, so they can be closed by calling :meth:`~AsyncResource.aclose`
or using an ``async with`` block.
`SendChannel` objects also implement the `AsyncResource` interface, so
they can be closed by calling `~AsyncResource.aclose` or using an ``async
with`` block.

If you want to send raw bytes rather than Python objects, see
:class:`ReceiveStream`.
`ReceiveStream`.

"""
__slots__ = ()

@abstractmethod
def send_nowait(self, value):
"""Attempt to send an object through the channel, without blocking.

Args:
value (object): The object to send.

Raises:
trio.WouldBlock: if the operation cannot be completed immediately
(for example, because the channel's internal buffer is full).
trio.BrokenResourceError: if something has gone wrong, and the
channel is broken. For example, you may get this if the receiver
has already been closed.
trio.ClosedResourceError: if you previously closed this
:class:`SendChannel` object.

"""

@abstractmethod
async def send(self, value):
async def send(self, value: SendType) -> None:
"""Attempt to send an object through the channel, blocking if necessary.

Args:
Expand All @@ -585,33 +570,8 @@ async def send(self, value):

"""

@abstractmethod
def clone(self):
"""Clone this send channel object.

This returns a new :class:`SendChannel` object, which acts as a
duplicate of the original: sending on the new object does exactly the
same thing as sending on the old object.

However, closing one of the objects does not close the other, and
receivers don't get :exc:`~trio.EndOfChannel` until *all* clones have
been closed.

This is useful for communication patterns that involve multiple
producers all sending objects to the same destination. If you give
each producer its own clone of the :class:`SendChannel`, and then make
sure to close each :class:`SendChannel` when it's finished, receivers
will automatically get notified when all producers are finished. See
:ref:`channel-mpmc` for examples.

Raises:
trio.ClosedResourceError: if you already closed this
:class:`SendChannel` object.

"""


class ReceiveChannel(AsyncResource, Generic[T_co]):
class ReceiveChannel(AsyncResource, Generic[ReceiveType]):
"""A standard interface for receiving Python objects from some sender.

You can iterate over a :class:`ReceiveChannel` using an ``async for``
Expand All @@ -621,45 +581,22 @@ class ReceiveChannel(AsyncResource, Generic[T_co]):
...

This is equivalent to calling :meth:`receive` repeatedly. The loop exits
without error when :meth:`receive` raises :exc:`~trio.EndOfChannel`.
without error when `receive` raises `~trio.EndOfChannel`.

:class:`ReceiveChannel` objects also implement the :class:`AsyncResource`
interface, so they can be closed by calling :meth:`~AsyncResource.aclose`
or using an ``async with`` block.
`ReceiveChannel` objects also implement the `AsyncResource` interface, so
they can be closed by calling `~AsyncResource.aclose` or using an ``async
with`` block.

If you want to receive raw bytes rather than Python objects, see
:class:`ReceiveStream`.
`ReceiveStream`.

"""
__slots__ = ()

@abstractmethod
def receive_nowait(self):
"""Attempt to receive an incoming object, without blocking.

Returns:
object: Whatever object was received.

Raises:
trio.WouldBlock: if the operation cannot be completed immediately
(for example, because no object has been sent yet).
trio.EndOfChannel: if the sender has been closed cleanly, and no
more objects are coming. This is not an error condition.
trio.ClosedResourceError: if you previously closed this
:class:`ReceiveChannel` object.
trio.BrokenResourceError: if something has gone wrong, and the
channel is broken.

"""

@abstractmethod
async def receive(self):
async def receive(self) -> ReceiveType:
"""Attempt to receive an incoming object, blocking if necessary.

It's legal for multiple tasks to call :meth:`receive` at the same
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like it would be useful to explicitly document what is and is not guaranteed about tasks calling send() or receive() simultaneously for all Channels. Also maybe something about cancellation safety. If you want to stop guaranteeing that multiple simultaneous receive() works OK, maybe move this text to MemoryReceiveChannel.receive() instead of removing it entirely?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, yeah. I'm thinking ahead to "framing channels" that just wrap line-breaking or whatever around a stream, so they'll have the same single-user rule as streams. But that's a good point.

time. If this happens, then one task receives the first value sent,
another task receives the next value sent, and so on.

Returns:
object: Whatever object was received.

Expand All @@ -673,40 +610,21 @@ async def receive(self):

"""

@abstractmethod
def clone(self):
"""Clone this receive channel object.

This returns a new :class:`ReceiveChannel` object, which acts as a
duplicate of the original: receiving on the new object does exactly
the same thing as receiving on the old object.

However, closing one of the objects does not close the other, and the
underlying channel is not closed until all clones are closed.

This is useful for communication patterns involving multiple consumers
all receiving objects from the same underlying channel. See
:ref:`channel-mpmc` for examples.

.. warning:: The clones all share the same underlying channel.
Whenever a clone :meth:`receive`\\s a value, it is removed from the
channel and the other clones do *not* receive that value. If you
want to send multiple copies of the same stream of values to
multiple destinations, like :func:`itertools.tee`, then you need to
find some other solution; this method does *not* do that.

Raises:
trio.ClosedResourceError: if you already closed this
:class:`SendChannel` object.

"""

@aiter_compat
def __aiter__(self):
return self

async def __anext__(self):
async def __anext__(self) -> ReceiveType:
try:
return await self.receive()
except trio.EndOfChannel:
raise StopAsyncIteration


class Channel(SendChannel[T], ReceiveChannel[T]):
"""A standard interface for interacting with bidirectional channels.

A `Channel` is an object that implements both the `SendChannel` and
`ReceiveChannel` interfaces, so you can both send and receive objects.

"""
Loading