Skip to content

Commit

Permalink
RC1: add examples and improve doc
Browse files Browse the repository at this point in the history
  • Loading branch information
sbarral committed Jul 16, 2023
1 parent 1b63ae0 commit f9013c8
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 166 deletions.
7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ name = "async-event"
# - Update CHANGELOG.md
# - Update if necessary copyright notice in LICENSE-MIT
# - Create a "vX.Y.Z" git tag
version = "0.1.0-pre1"
version = "0.1.0-rc1"
authors = ["Serge Barral <[email protected]>"]
edition = "2021"
rust-version = "1.56"
Expand All @@ -19,8 +19,11 @@ categories = ["asynchronous", "concurrency"]
keywords = ["async", "event", "atomic", "futures"]

[dev-dependencies]
tokio = { version = "1", features = ["full"] }
futures-executor = "0.3"

[target.'cfg(async_event_loom)'.dependencies]
waker-fn = "1.1"
loom = "0.5"

[target.'cfg(async_event_loom)'.dev-dependencies]
waker-fn = "1.1"
163 changes: 76 additions & 87 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,122 +15,111 @@ predicate is satisfied by checking the predicate each time it receives a
notification.

While functionally similar to the [event_listener] crate, this implementation is
specialized for the `async` case and tries to be more efficient by limiting the
number of locking operations on the mutex-protected list of notifiers: the lock
is typically taken only once for each time a waiter is blocked and once for
notifying, thus reducing the need for synchronization operations. Finally,
spurious wake-ups are only generated in very rare circumstances.
more opinionated and limited to the `async` case. It strives to be more
efficient, however, by limiting the amount of locking operations on the
mutex-protected list of notifiers: the lock is typically taken only once for
each time a waiter is blocked and once for notifying, thus reducing the need for
synchronization operations. Finally, spurious wake-ups are only generated in
very rare circumstances.

Note that if you only need to send notifications to a single task, you may use
instead the [Diatomic Waker][diatomic-waker] crate for extra performance.

This library is an offshoot of [Asynchronix][asynchronix], an ongoing effort at
a high performance asynchronous computation framework for system simulation.
a high performance asynchronous computation framework for system simulation. It
is also used in the [Tachyonix][tachyonix] MPSC channel.

[event_listener]: https://docs.rs/event_listener/latest/event_listener/
[eventcount]: https://www.1024cores.net/home/lock-free-algorithms/eventcounts
[diatomic-waker]: https://github.com/asynchronics/diatomic-waker
[asynchronix]: https://github.com/asynchronics/asynchronix
[tachyonix]: https://github.com/asynchronics/tachyonix

## Usage

This crate needs more testing and hasn't been released to crates.io yet. Use at
your own risk by adding this to your `Cargo.toml`:
Add this to your `Cargo.toml`:

```toml
[dependencies]
async-event = { git = "https://github.com/asynchronics/async-event.git" }
```

## Example
## Differences with `event_listener`

This `Event` primitive is expected to be faster than that of the
`event_listener` crate in the general case. That being said, your mileage may
vary depending on your particular application and you should probably benchmark
both.

The API is more opinionated and designed to preventing potential misuse such as:

- *Forgetting to check again the predicate after requesting a notification, i.e.
after a call to `Event::listen()` in the `event_listener` crate*.
`async-event` provides instead the `Event::wait_until` method which takes care
of checking the predicate whenever necessary to prevent races.
- *Confusion between `notify` and `notify_additional` in the `event_listener`
crate*. Our experience and the API of other similar libraries suggest that the
latter is almost always what the user needs, so the `notify*` methods in this
crate actually behave like `notify_additional` in the `event_listener` crate.
- *Inadequate atomic synchronization of the predicate*. The `notify*` and
`wait_until` methods always insert atomic fences to ensure proper
synchronization: there is no equivalent to `notify_additional_relaxed`.


## Examples

A multi-producer, multi-consumer channel of capacity 1 for sending
`NonZeroUsize` values:
### Send a non-zero value asynchronously

```rust
use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

use futures_executor::block_on;

use async_event::Event;

// Data stored by the channel.
struct Inner {
sender_notifier: Event,
receiver_notifier: Event,
value: AtomicUsize,
}

// The sending side of the channel.
#[derive(Clone)]
struct Sender {
inner: Arc<Inner>,
}

// The receiving side of the channel.
#[derive(Clone)]
struct Receiver {
inner: Arc<Inner>,
}

// Creates an empty channel.
fn channel() -> (Sender, Receiver) {
let inner = Arc::new(Inner {
sender_notifier: Event::new(),
receiver_notifier: Event::new(),
value: AtomicUsize::new(0),
});
(
Sender {
inner: inner.clone(),
},
Receiver { inner },
)
}

impl Sender {
// Sends a value asynchronously.
async fn send(&self, value: NonZeroUsize) {
// Wait until the predicate returns `Some`, i.e. until the atomic value
// is found to be zero (empty channel) and the new value is set.
self.inner
.sender_notifier
.wait_until(|| {
self.inner
.value
.compare_exchange(0, value.get(), Ordering::Relaxed, Ordering::Relaxed)
.ok()
})
.await;

// Let one of the blocked receivers (if any) know that a value is
// available.
self.inner.receiver_notifier.notify(1);
}
}

impl Receiver {
// Receives a value asynchronously.
async fn recv(&self) -> NonZeroUsize {
// Wait until the predicate returns `Some(value)`, i.e. when the atomic
// value becomes non-zero (the channel contains an actual value).
let value = self
.inner
.receiver_notifier
.wait_until(|| NonZeroUsize::new(self.inner.value.swap(0, Ordering::Relaxed)))
.await;

// Let one of the blocked senders (if any) know that the value slot is
// empty.
self.inner.sender_notifier.notify(1);

value
let value = Arc::new(AtomicUsize::new(0));
let event = Arc::new(Event::new());

// Set a non-zero value concurrently.
thread::spawn({
let value = value.clone();
let event = event.clone();

move || {
// A relaxed store is sufficient here: `Event::notify*` methods insert
// atomic fences to warrant adequate synchronization.
value.store(42, Ordering::Relaxed);
event.notify_one();
}
}
});

// Wait until the value is set.
block_on(async move {
let v = event
.wait_until(|| {
// A relaxed load is sufficient here: `Event::wait_until` inserts
// atomic fences to warrant adequate synchronization.
let v = value.load(Ordering::Relaxed);
if v != 0 { Some(v) } else { None }
})
.await;

assert_eq!(v, 42);
});
```

### Single-slot MPMC channel for non-zero values

See [implementation](examples/mpmc_channel.rs) in the `examples` directory.

## Safety

This is a low-level primitive and as such its implementation relies on `unsafe`.
The test suite makes extensive use of [Loom] to assess its correctness. As
amazing as it is, however, Loom is only a tool: it cannot formally prove the
absence of data races.
The test suite makes extensive use of [Loom] and MIRI to assess its correctness.
As amazing as they are, however, Loom and MIRI cannot formally prove the absence
of data races so soundness issues _are_ possible.

[Loom]: https://github.com/tokio-rs/loom

Expand Down
103 changes: 103 additions & 0 deletions examples/mpmc_channel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
//! A multi-producer, multi-consumer channel of capacity 1 for sending
//! `NonZeroUsize` values.

use async_event::Event;
use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Data stored by the channel.
struct Inner {
sender_notifier: Event,
receiver_notifier: Event,
value: AtomicUsize,
}

// The sending side of the channel.
#[derive(Clone)]
struct Sender {
inner: Arc<Inner>,
}

// The receiving side of the channel.
#[derive(Clone)]
struct Receiver {
inner: Arc<Inner>,
}

// Creates an empty channel.
fn channel() -> (Sender, Receiver) {
let inner = Arc::new(Inner {
sender_notifier: Event::new(),
receiver_notifier: Event::new(),
value: AtomicUsize::new(0),
});
(
Sender {
inner: inner.clone(),
},
Receiver { inner },
)
}

impl Sender {
// Sends a value asynchronously.
async fn send(&self, value: NonZeroUsize) {
// Wait until the predicate returns `Some`, i.e. until the atomic value
// is found to be zero (empty channel) and the new value is set.
self.inner
.sender_notifier
.wait_until(|| {
self.inner
.value
.compare_exchange(0, value.get(), Ordering::Relaxed, Ordering::Relaxed)
.ok()
})
.await;

// Let one of the blocked receivers (if any) know that a value is
// available.
self.inner.receiver_notifier.notify(1);
}
}

impl Receiver {
// Receives a value asynchronously.
async fn recv(&self) -> NonZeroUsize {
// Wait until the predicate returns `Some(value)`, i.e. when the atomic
// value becomes non-zero (the channel contains an actual value).
let value = self
.inner
.receiver_notifier
.wait_until(|| NonZeroUsize::new(self.inner.value.swap(0, Ordering::Relaxed)))
.await;

// Let one of the blocked senders (if any) know that the value slot is
// empty.
self.inner.sender_notifier.notify(1);

value
}
}

#[tokio::main]
async fn main() {
let (s1, r1) = channel();
let s2 = s1.clone();
let r2 = r1.clone();

// Receivers.
let task1 = tokio::spawn(async move { r1.recv().await });
let task2 = tokio::spawn(async move { r2.recv().await });

// Senders.
tokio::spawn(async move {
s1.send(NonZeroUsize::new(1).unwrap()).await;
});
tokio::spawn(async move {
s2.send(NonZeroUsize::new(2).unwrap()).await;
});

println!("Task 1 received value {}", task1.await.unwrap());
println!("Task 2 received value {}", task2.await.unwrap());
}
Loading

0 comments on commit f9013c8

Please sign in to comment.