Skip to content

Commit

Permalink
Update doc for IO safety API changes
Browse files Browse the repository at this point in the history
Adds an unsafe helper for wrapping an `AsRawFd` implementer in `AsFd`.
Which is needed with things like zmq until they update to use IO safety
traits: erickt/rust-zmq#361.
  • Loading branch information
ids1024 committed Dec 31, 2022
1 parent 523f530 commit 2440b00
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 61 deletions.
3 changes: 2 additions & 1 deletion doc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ edition = "2018"
publish = false

[dependencies]
io-lifetimes = "1.0.3"
calloop = { path = "..", features = ["futures-io", "executor"] }
anyhow = "1.0.56"
futures = "0.3.21"
zmq = { version = "0.9.2", features = ["vendored"] }
zmq = { version = "0.10.0" }

# Here we create bin targets so each chapter's code may be tested.

Expand Down
16 changes: 6 additions & 10 deletions doc/src/ch02-01-generic.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,21 @@ The `Generic` event source wraps a file descriptor ("fd") and fires its callback

For example on Linux, fd-based interfaces are available for GPIO, I2C, USB, UART, network interfaces, timers and many other systems. Integrating these into calloop starts with obtaining the appropriate fd, creating a `Generic` event source from it, and building up a more useful, abstracted event source around that. A detailed example of this is [given for ZeroMQ](ch04-00-a-full-example-zeromq.md).

You do not have to use a low-level fd either: any type that implements [`AsRawFd`](https://doc.rust-lang.org/beta/std/os/unix/io/trait.AsRawFd.html) can be provided. This means that you can use a wrapper type that handles allocation and disposal itself, and implement `AsRawFd` on it so that `Generic` can manage it in the event loop.
You do not have to use a low-level fd either: any type that implements [`AsFd`](https://doc.rust-lang.org/stable/std/os/fd/trait.AsFd.html) can be provided. This means that you can use a wrapper type that handles allocation and disposal itself, and implement `AsRawFd` on it so that `Generic` can manage it in the event loop.

## Creating a Generic source

Creating a `Generic` event source requires three things:
- The fd itself, of course, possibly in a wrapper type that implements `AsRawFd`
- An `OwnedFd` or a wrapper type that implements `AsFd`
- The ["interest"](api/calloop/struct.Interest.html) you have in this descriptor — this means, whether you want to generate events when the fd becomes readable, writeable, or both
- The ["mode"](api/calloop/enum.Mode.html) of event generation - level triggered or edge triggered

The easiest constructor to use is the [`new()`](api/calloop/generic/struct.Generic.html#method.new) method, but if you need control over the [associated error type](ch02-06-errors.md) there is also [`new_with_error()`](api/calloop/generic/struct.Generic.html#method.new_with_error).

## Ownership and AsRawFd wrappers
## Ownership and AsFd wrappers

It's important to remember that file descriptors by themselves have no concept of ownership attached to them in Rust — they are simply bare integers. Dropping them does not close the resource they refer to, and copying them does not carry information about how many there are.
Rust 1.63 introduced a concept of file descriptor ownership and borrowing through the `OwnedFd` and `BorrowedFd` types, which are also available on older Rust versions through the `io-lifetimes` crate. The `AsFd` trait provides a way to get a `BorrowedFd` corresponding to a file, socket, etc. while guaranteeing the fd will be valid for the lifetime of the `BorrowedFd`.

Typically (eg. in the standard library) they would be an underlying implementation detail of another type that *did* encode ownership somehow. This how you can manage them in any of your own integration code - use drop handlers, reference counting (if necessary) and general RAII principles in a wrapper type, and then implement `AsRawFd` to allow `Generic` to use it. The `Generic` source will take ownership of it, so it will be dropped when the `Generic` is.
Not all third party crates use `AsFd` yet, and may instead provide types implementing `AsRawFd`. ['AsFdWrapper'](api/calloop/generic/struct.AsFdWrapper.html) provides a way to adapt these types. To use this safely, ensure the `AsRawFd` implementation of the type it wraps returns a valid fd as long as the type exists. And to avoid an fd leak, it should ultimately be `close`d properly.

This means you need to do at least two things:
- follow the rules of the API you obtain the fd from
- wrap them in a type that manages ownership appropriately

For example, on Unix-like systems the [`UdpSocket`](https://doc.rust-lang.org/beta/std/net/struct.UdpSocket.html) contains a fd, and upon being dropped, `libc::close(fd)` is called on it. If you create a `Generic<UdpSocket>` then, it will be closed upon dropping it.
Safe types like `OwnedFd` and `BorrowedFd` should be preferred over `RawFd`s, and the use of `RawFd`s outside of implementing FFI shouldn't be necessary as libraries move to using the IO safe types and traits.
4 changes: 2 additions & 2 deletions doc/src/ch03-02-async-io-types.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

> This section is about adapting blocking IO types for use with `async` Rust code, and powering that `async` code with Calloop. If you just want to add blocking IO types to your event loop and use Calloop's callback/composition-based design, you only need to wrap your blocking IO type in a [generic event source](api/calloop/generic/struct.Generic.html).
You may find that you need to write ordinary Rust `async` code around blocking IO types. Calloop provides the ability to wrap blocking types — anything that implements the [`AsRawFd`](https://doc.rust-lang.org/stable/std/os/unix/io/trait.AsRawFd.html) trait — in its own async type. This can be polled in any executor you may have chosen for your async code, but if you're using Calloop you'll probably be using [Calloop's executor](api/calloop/futures/fn.executor.html).
You may find that you need to write ordinary Rust `async` code around blocking IO types. Calloop provides the ability to wrap blocking types — anything that implements the [`AsFd`](https://doc.rust-lang.org/stable/std/os/unix/io/trait.AsFd.html) trait — in its own async type. This can be polled in any executor you may have chosen for your async code, but if you're using Calloop you'll probably be using [Calloop's executor](api/calloop/futures/fn.executor.html).

> ## Enable the `futures-io` feature!
>
Expand Down Expand Up @@ -65,4 +65,4 @@ Starting event loop. Use Ctrl-C to exit.
Async block ended with: Sent data...
Async block ended with: Hello, world
^C
```
```
7 changes: 2 additions & 5 deletions doc/src/ch04-02-creating-our-source-part-1-our-types.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ So at a minimum, our type needs to contain these:
pub struct ZeroMQSource
{
// Calloop components.
socket_source: calloop::generic::Generic<std::os::unix::io::RawFd>,
socket: calloop::generic::Generic<calloop::generic::FdWrapper<zmq::Socket>>,
mpsc_receiver: calloop::channel::Channel<?>,
wake_ping_receiver: calloop::ping::PingSource,
}
Expand All @@ -26,15 +26,12 @@ What else do we need? If the `PingSource` is there to wake up the loop manually,
pub struct ZeroMQSource
{
// Calloop components.
socket_source: calloop::generic::Generic<std::os::unix::io::RawFd>,
socket: calloop::generic::Generic<calloop::generic::FdWrapper<zmq::Socket>>,
mpsc_receiver: calloop::channel::Channel<?>,
wake_ping_receiver: calloop::ping::PingSource,
/// Sending end of the ping source.
wake_ping_sender: calloop::ping::Ping,
/// The underlying ZeroMQ socket.
socket: zmq::Socket,
}
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ self.mpsc_receiver
.process_events(readiness, token, |evt, _| {
if let calloop::channel::Event::Msg(msg) = evt {
self.socket
.file
.send_multipart(msg, 0)
.context("Failed to send message")?;
}
Expand All @@ -123,16 +124,13 @@ where
T::Item: Into<zmq::Message>,
{
// Calloop components.
socket_source: calloop::generic::Generic<std::os::unix::io::RawFd>,
socket: calloop::generic::Generic<calloop::generic::FdWrapper<zmq::Socket>>,
mpsc_receiver: calloop::channel::Channel<T>,
wake_ping_receiver: calloop::ping::PingSource,
/// Sending end of the ping source.
wake_ping_sender: calloop::ping::Ping,
/// The underlying ZeroMQ socket.
socket: zmq::Socket,
/// FIFO queue for the messages to be published.
outbox: std::collections::VecDeque<T>,
}
Expand All @@ -155,15 +153,18 @@ And our "zsocket is writeable" code becomes:

```rust,noplayground
self.socket
.file
.process_events(readiness, token, |_, _| {
let events = self
.socket
.file
.get_events()
.context("Failed to read ZeroMQ events")?;
if events.contains(zmq::POLLOUT) {
if let Some(parts) = self.outbox.pop_front() {
self.socket
.file
.send_multipart(parts, 0)
.context("Failed to send message")?;
}
Expand All @@ -172,6 +173,7 @@ self.socket
if events.contains(zmq::POLLIN) {
let messages =
self.socket
.file
.recv_multipart(0)
.context("Failed to receive message")?;
callback(messages, &mut ())
Expand Down
36 changes: 18 additions & 18 deletions doc/src/zmqsource.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! A Calloop event source implementation for ZeroMQ sockets.

use std::{collections, io, os::unix::io::RawFd};
use std::{collections, io};

use anyhow::Context;

Expand Down Expand Up @@ -48,7 +48,7 @@ where
{
// Calloop components.
/// Event source for ZeroMQ socket.
socket_source: calloop::generic::Generic<RawFd>,
socket: calloop::generic::Generic<calloop::generic::FdWrapper<zmq::Socket>>,

/// Event source for channel.
mpsc_receiver: calloop::channel::Channel<T>,
Expand All @@ -62,9 +62,6 @@ where
wake_ping_sender: calloop::ping::Ping,

// ZeroMQ socket.
/// The underlying ZeroMQ socket that we're proxying things to.
socket: zmq::Socket,

/// FIFO queue for the messages to be published.
outbox: collections::VecDeque<T>,
}
Expand All @@ -80,15 +77,15 @@ where
let (mpsc_sender, mpsc_receiver) = calloop::channel::channel();
let (wake_ping_sender, wake_ping_receiver) = calloop::ping::make_ping()?;

let fd = socket.get_fd()?;

let socket_source =
calloop::generic::Generic::new(fd, calloop::Interest::READ, calloop::Mode::Edge);
let socket = calloop::generic::Generic::new(
unsafe { calloop::generic::FdWrapper::new(socket) },
calloop::Interest::READ,
calloop::Mode::Edge,
);

Ok((
Self {
socket,
socket_source,
mpsc_receiver,
wake_ping_receiver,
wake_ping_sender,
Expand Down Expand Up @@ -164,6 +161,7 @@ where
// on the socket that warrants reading the events again.
let events = self
.socket
.file
.get_events()
.context("Failed to read ZeroMQ events")?;

Expand All @@ -172,6 +170,7 @@ where
if events.contains(zmq::POLLOUT) {
if let Some(parts) = self.outbox.pop_front() {
self.socket
.file
.send_multipart(parts, 0)
.context("Failed to send message")?;
used_socket = true;
Expand All @@ -183,6 +182,7 @@ where
// sending, which includes all parts of a multipart message.
let messages = self
.socket
.file
.recv_multipart(0)
.context("Failed to receive message")?;
used_socket = true;
Expand All @@ -205,7 +205,7 @@ where
poll: &mut calloop::Poll,
token_factory: &mut calloop::TokenFactory,
) -> calloop::Result<()> {
self.socket_source.register(poll, token_factory)?;
self.socket.register(poll, token_factory)?;
self.mpsc_receiver.register(poll, token_factory)?;
self.wake_ping_receiver.register(poll, token_factory)?;

Expand All @@ -219,7 +219,7 @@ where
poll: &mut calloop::Poll,
token_factory: &mut calloop::TokenFactory,
) -> calloop::Result<()> {
self.socket_source.reregister(poll, token_factory)?;
self.socket.reregister(poll, token_factory)?;
self.mpsc_receiver.reregister(poll, token_factory)?;
self.wake_ping_receiver.reregister(poll, token_factory)?;

Expand All @@ -229,7 +229,7 @@ where
}

fn unregister(&mut self, poll: &mut calloop::Poll) -> calloop::Result<()> {
self.socket_source.unregister(poll)?;
self.socket.unregister(poll)?;
self.mpsc_receiver.unregister(poll)?;
self.wake_ping_receiver.unregister(poll)?;
Ok(())
Expand All @@ -247,14 +247,14 @@ where
//
// - https://stackoverflow.com/a/38338578/188535
// - http://api.zeromq.org/4-0:zmq-ctx-term
self.socket.set_linger(0).ok();
self.socket.set_rcvtimeo(0).ok();
self.socket.set_sndtimeo(0).ok();
self.socket.file.set_linger(0).ok();
self.socket.file.set_rcvtimeo(0).ok();
self.socket.file.set_sndtimeo(0).ok();

// Double result because (a) possible failure on call and (b) possible
// failure decoding.
if let Ok(Ok(last_endpoint)) = self.socket.get_last_endpoint() {
self.socket.disconnect(&last_endpoint).ok();
if let Ok(Ok(last_endpoint)) = self.socket.file.get_last_endpoint() {
self.socket.file.disconnect(&last_endpoint).ok();
}
}
}
Expand Down
40 changes: 38 additions & 2 deletions src/sources/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,47 @@
//! these `Generic<_>` as fields of your event source, and delegate the
//! [`EventSource`](crate::EventSource) implementation to them.

use io_lifetimes::AsFd;
use std::marker::PhantomData;
use io_lifetimes::{AsFd, BorrowedFd};
use std::{marker::PhantomData, ops, os::unix::io::AsRawFd};

use crate::{EventSource, Interest, Mode, Poll, PostAction, Readiness, Token, TokenFactory};

/// Wrapper to use a type implementing `AsRawFd` but not `AsFd` with `Generic`
#[derive(Debug)]
pub struct FdWrapper<T: AsRawFd>(T);

impl<T: AsRawFd> FdWrapper<T> {
/// Wrap `inner` with an `AsFd` implementation.
///
/// # Safety
/// This is safe if the `AsRawFd` implementation of `inner` always returns
/// a valid fd. This should usually be true for types implementing
/// `AsRawFd`. But this isn't guaranteed with `FdWrapper<RawFd>`.
pub unsafe fn new(inner: T) -> Self {
Self(inner)
}
}

impl<T: AsRawFd> ops::Deref for FdWrapper<T> {
type Target = T;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl<T: AsRawFd> ops::DerefMut for FdWrapper<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}

impl<T: AsRawFd> AsFd for FdWrapper<T> {
fn as_fd(&self) -> BorrowedFd {
unsafe { BorrowedFd::borrow_raw(self.0.as_raw_fd()) }
}
}

/// A generic event source wrapping a FD-backed type
#[derive(Debug)]
pub struct Generic<F: AsFd, E = std::io::Error> {
Expand Down
27 changes: 8 additions & 19 deletions src/sources/signals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,14 @@
//! they'll inherit their parent signal mask.

use std::convert::TryFrom;
use std::os::{raw::c_int, unix::io::AsRawFd};
use std::os::raw::c_int;

use io_lifetimes::{AsFd, BorrowedFd};
use nix::sys::signal::SigSet;
pub use nix::sys::signal::Signal;
pub use nix::sys::signalfd::siginfo;
use nix::sys::signalfd::{SfdFlags, SignalFd};

use super::generic::Generic;
use super::generic::{FdWrapper, Generic};
use crate::{EventSource, Interest, Mode, Poll, PostAction, Readiness, Token, TokenFactory};

/// An event generated by the signal event source
Expand All @@ -40,20 +39,10 @@ impl Event {
}
}

// Wrap `nix` SignalFd type with `AsFd` impl
#[derive(Debug)]
struct SignalFdWrapper(SignalFd);

impl AsFd for SignalFdWrapper {
fn as_fd(&self) -> BorrowedFd<'_> {
unsafe { BorrowedFd::borrow_raw(self.0.as_raw_fd()) }
}
}

/// An event source for receiving Unix signals
#[derive(Debug)]
pub struct Signals {
sfd: Generic<SignalFdWrapper>,
sfd: Generic<FdWrapper<SignalFd>>,
mask: SigSet,
}

Expand All @@ -71,7 +60,7 @@ impl Signals {
let sfd = SignalFd::with_flags(&mask, SfdFlags::SFD_NONBLOCK | SfdFlags::SFD_CLOEXEC)?;

Ok(Signals {
sfd: Generic::new(SignalFdWrapper(sfd), Interest::READ, Mode::Level),
sfd: Generic::new(unsafe { FdWrapper::new(sfd) }, Interest::READ, Mode::Level),
mask,
})
}
Expand All @@ -85,7 +74,7 @@ impl Signals {
self.mask.add(s);
}
self.mask.thread_block()?;
self.sfd.file.0.set_mask(&self.mask)?;
self.sfd.file.set_mask(&self.mask)?;
Ok(())
}

Expand All @@ -100,7 +89,7 @@ impl Signals {
removed.add(s);
}
removed.thread_unblock()?;
self.sfd.file.0.set_mask(&self.mask)?;
self.sfd.file.set_mask(&self.mask)?;
Ok(())
}

Expand All @@ -116,7 +105,7 @@ impl Signals {

self.mask.thread_unblock()?;
new_mask.thread_block()?;
self.sfd.file.0.set_mask(&new_mask)?;
self.sfd.file.set_mask(&new_mask)?;
self.mask = new_mask;

Ok(())
Expand Down Expand Up @@ -150,7 +139,7 @@ impl EventSource for Signals {
self.sfd
.process_events(readiness, token, |_, sfd| {
loop {
match sfd.0.read_signal() {
match sfd.read_signal() {
Ok(Some(info)) => callback(Event { info }, &mut ()),
Ok(None) => break,
Err(e) => {
Expand Down

0 comments on commit 2440b00

Please sign in to comment.