From f107c8d860137b41e509b179d605db30082cb0da Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Fri, 26 Aug 2016 14:30:46 -0700 Subject: [PATCH] Rename to tokio-core, add in futures-io Renames the futures-mio crate to tokio-core, pulls in the futures-io crate under an `io` module, and gets everything compiling. --- .gitignore | 2 + .travis.yml | 24 ++ Cargo.toml | 15 +- README.md | 45 ++- src/bin/echo.rs | 7 +- src/bin/sink.rs | 11 +- src/channel.rs | 2 +- src/io/copy.rs | 82 +++++ src/io/flush.rs | 39 +++ src/io/mod.rs | 47 +++ src/io/read_exact.rs | 77 +++++ src/io/read_to_end.rs | 62 ++++ src/io/task.rs | 102 ++++++ src/io/window.rs | 116 +++++++ src/io/write_all.rs | 80 +++++ src/lib.rs | 5 +- src/lock.rs | 106 ++++++ src/slot.rs | 691 +++++++++++++++++++++++++++++++++++++++ src/tcp.rs | 2 +- src/timeout.rs | 2 +- src/udp.rs | 2 +- tests/buffered.rs | 7 +- tests/chain.rs | 7 +- tests/echo.rs | 7 +- tests/limit.rs | 7 +- tests/stream-buffered.rs | 7 +- tests/tcp.rs | 8 +- tests/timeout.rs | 4 +- tests/udp.rs | 6 +- 29 files changed, 1495 insertions(+), 77 deletions(-) create mode 100644 .gitignore create mode 100644 .travis.yml create mode 100644 src/io/copy.rs create mode 100644 src/io/flush.rs create mode 100644 src/io/mod.rs create mode 100644 src/io/read_exact.rs create mode 100644 src/io/read_to_end.rs create mode 100644 src/io/task.rs create mode 100644 src/io/window.rs create mode 100644 src/io/write_all.rs create mode 100644 src/lock.rs create mode 100644 src/slot.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000000..a9d37c560c6 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +target +Cargo.lock diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 00000000000..a89a11e3c08 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,24 @@ +language: rust + +rust: + - stable + - beta + - nightly +sudo: false +before_script: + - pip install 'travis-cargo<0.2' --user && export PATH=$HOME/.local/bin:$PATH +script: + - cargo build + - cargo test + - cargo doc --no-deps +after_success: + - travis-cargo --only nightly doc-upload +env: + global: + - secure: LVrtwDI0IJradnHRk53dGWtTS+limhkuHym17wuto/Zaz6IJB9aq7G5wSYuZU3qabcxah7pigjXPFgzYwFD6mNHW1DAuAko1qOi4AL0rvg+rA7Fa5E9NEIxoqzCf+wBtqCvomBe/akOs7UtHdjE3CZpIEPwSHVf3jf61suB0mPVUW0AFTHvYTvHT4lyHjlruY+Ifi350yb4t0Oy9rU1bHNtX0q1T0mKuTnKkmpCT2Kj+2L7afgsAR3UgBjL3Py89LXmnF5VxSMGJWa6HL3xgEi3CXxBRQFdr+vipIDejWtjY+7DzvSRHid1rVfwCLdLfTwvA3Pf3b0I5DSJnjzRgKkfiH2j7JNFtCvLz+mM5C/4QJzAgNmdyNuDv0qOy07OABtYs/LE60f6ZZ5YMZAloMtA/9qQjJx+c2jO2nTZkx6vNJ5C421yzm2klQSL0d8pFaDmojqC5pT85MYhf3mESqSw1UjwFPa0xFtysT52oJBcyvwI/wBYbK40sArjSDZaU2Jncw9ptDWML/xUM+sWHF7ZW/mI1V15lqaCBX91xlbppfWDMgNF2c60vC90t0entbGpYLvHjQMdW6iucbsLLN5KAPzYPuufX2vJa8V1gxMxZ7CLcVLx9lmm3uEdrOZLEg4Fg7H7Xqc2JRygbNrTtOeBw1/o73znnnjEv8Vl3xqg= +notifications: + email: + on_success: never +os: + - linux + - osx diff --git a/Cargo.toml b/Cargo.toml index dfadac02de6..469ab143312 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,19 +1,18 @@ [package] -name = "futures-mio" +name = "tokio-core" version = "0.1.0" authors = ["Alex Crichton "] license = "MIT/Apache-2.0" -repository = "https://github.com/alexcrichton/futures-rs" -homepage = "https://github.com/alexcrichton/futures-rs" -documentation = "http://alexcrichton.com/futures-rs/futures_mio/" +repository = "https://github.com/tokio-rs/tokio-core" +homepage = "https://github.com/tokio-rs/tokio-core" +documentation = "https://tokio-rs.github.io/tokio-core" description = """ -Bindings from the `futures` crate to the `mio` crate to get I/O in the form of -futures and streams. +Core I/O and event loop primitives for asynchronous I/O in Rust. Foundation for +the rest of the tokio crates. """ [dependencies] -futures = { path = "..", version = "0.1.0" } -futures-io = { path = "../futures-io", version = "0.1.0" } +futures = { git = "https://github.com/alexcrichton/futures-rs" } log = "0.3" mio = { git = "https://github.com/carllerche/mio" } scoped-tls = "0.1.0" diff --git a/README.md b/README.md index c09ac5dcbe1..98ea7739408 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,12 @@ -# futures-mio +# tokio-core -Bindings to the `mio` crate implementing the `futures-io` and `futures` -abstractions. +Core I/O and event loop abstraction for asynchronous I/O in Rust built on +`futures` and `mio`. -[![Build Status](https://travis-ci.org/alexcrichton/futures-rs.svg?branch=master)](https://travis-ci.org/alexcrichton/futures-rs) +[![Build Status](https://travis-ci.org/tokio-rs/tokio-core.svg?branch=master)](https://travis-ci.org/tokio-rs/tokio-core) [![Build status](https://ci.appveyor.com/api/projects/status/yl5w3ittk4kggfsh?svg=true)](https://ci.appveyor.com/project/alexcrichton/futures-rs) -[Documentation](http://alexcrichton.com/futures-rs/futures_mio) +[Documentation](https://tokio-rs.github.io/tokio-core) ## Usage @@ -14,13 +14,13 @@ First, add this to your `Cargo.toml`: ```toml [dependencies] -futures-mio = { git = "https://github.com/alexcrichton/futures-rs" } +tokio-core = { git = "https://github.com/tokio-rs/tokio-core" } ``` Next, add this to your crate: ```rust -extern crate futures_mio; +extern crate tokio_core; ``` ## Examples @@ -30,18 +30,17 @@ There are a few small examples showing off how to use this library: * [echo.rs] - a simple TCP echo server * [socks5.rs] - an implementation of a SOCKSv5 proxy server -[echo.rs]: https://github.com/alexcrichton/futures-rs/blob/master/futures-mio/src/bin/echo.rs -[socks5.rs]: https://github.com/alexcrichton/futures-rs/blob/master/futures-socks5/src/main.rs +[echo.rs]: https://github.com/tokio-rs/tokio-core/blob/master/src/bin/echo.rs +[socks5.rs]: https://github.com/tokio-rs/tokio-socks5/blob/master/src/main.rs -## What is futures-mio? +## What is tokio-core? This crate is a connection `futures`, a zero-cost implementation of futures in -Rust, and `mio`, a crate for zero-cost asynchronous I/O, and `futures-io`, -abstractions for I/O on top of the `futures` crate. The types and structures -implemented in `futures-mio` implement `Future` and `Stream` traits as -appropriate. For example connecting a TCP stream returns a `Future` resolving -to a TCP stream, and a TCP listener implements a stream of TCP streams -(accepted connections). +Rust, and `mio` and a crate for zero-cost asynchronous I/O. The types and +structures implemented in `tokio-core` implement `Future` and `Stream` traits +as appropriate. For example connecting a TCP stream returns a `Future` +resolving to a TCP stream, and a TCP listener implements a stream of TCP +streams (accepted connections). This crate also provides facilities such as: @@ -52,20 +51,20 @@ This crate also provides facilities such as: * Data owned and local to the event loop * An `Executor` implementation for a futures' `Task` -The intention of `futures-mio` is to provide a concrete implementation for -crates built on top of `futures-io`. For example you can easily turn a TCP -stream into a TLS/SSL stream with the [`futures-tls`] crate or use the -combinators to compose working with data on sockets. +The intention of `tokio-core` is to provide a concrete implementation for crates +built on top of asynchronous I/O. For example you can easily turn a TCP stream +into a TLS/SSL stream with the [`tokio-tls`] crate or use the combinators to +compose working with data on sockets. -[`futures-tls`]: http://alexcrichton.com/futures-rs/futures_tls +[`tokio-tls`]: https://tokio-rs.github.io/tokio-tls Check out the [documentation] for more information, and more coming here soon! -[documentation]: http://alexcrichton.com/futures-rs/futures_mio +[documentation]: https://tokio-rs.github.io/tokio-core # License -`futures-mio` is primarily distributed under the terms of both the MIT license +`tokio-core` is primarily distributed under the terms of both the MIT license and the Apache License (Version 2.0), with portions covered by various BSD-like licenses. diff --git a/src/bin/echo.rs b/src/bin/echo.rs index ccd9475221b..710db79d3cb 100644 --- a/src/bin/echo.rs +++ b/src/bin/echo.rs @@ -1,22 +1,21 @@ //! An echo server that just writes back everything that's written to it. extern crate futures; -extern crate futures_io; -extern crate futures_mio; +extern crate tokio_core; use std::env; use std::net::SocketAddr; use futures::Future; use futures::stream::Stream; -use futures_io::{copy, TaskIo}; +use tokio_core::io::{copy, TaskIo}; fn main() { let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); let addr = addr.parse::().unwrap(); // Create the event loop that will drive this server - let mut l = futures_mio::Loop::new().unwrap(); + let mut l = tokio_core::Loop::new().unwrap(); // Create a TCP listener which will listen for incoming connections let server = l.handle().tcp_listen(&addr); diff --git a/src/bin/sink.rs b/src/bin/sink.rs index 49feed3584b..baf96b8eeb9 100644 --- a/src/bin/sink.rs +++ b/src/bin/sink.rs @@ -5,8 +5,7 @@ #[macro_use] extern crate futures; -extern crate futures_io; -extern crate futures_mio; +extern crate tokio_core; use std::env; use std::iter; @@ -14,13 +13,13 @@ use std::net::SocketAddr; use futures::Future; use futures::stream::{self, Stream}; -use futures_io::IoFuture; +use tokio_core::io::IoFuture; fn main() { let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); let addr = addr.parse::().unwrap(); - let mut l = futures_mio::Loop::new().unwrap(); + let mut l = tokio_core::Loop::new().unwrap(); let server = l.handle().tcp_listen(&addr).and_then(|socket| { socket.incoming().and_then(|(socket, addr)| { println!("got a socket: {}", addr); @@ -34,10 +33,10 @@ fn main() { l.run(server).unwrap(); } -fn write(socket: futures_mio::TcpStream) -> IoFuture<()> { +fn write(socket: tokio_core::TcpStream) -> IoFuture<()> { static BUF: &'static [u8] = &[0; 64 * 1024]; let iter = iter::repeat(()).map(|()| Ok(())); stream::iter(iter).fold(socket, |socket, ()| { - futures_io::write_all(socket, BUF).map(|(socket, _)| socket) + tokio_core::io::write_all(socket, BUF).map(|(socket, _)| socket) }).map(|_| ()).boxed() } diff --git a/src/channel.rs b/src/channel.rs index cd5fef469e5..3da44b531aa 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -3,10 +3,10 @@ use std::sync::mpsc::TryRecvError; use futures::{Future, Poll}; use futures::stream::Stream; -use futures_io::IoFuture; use mio::channel; use {ReadinessStream, LoopHandle}; +use io::IoFuture; /// The transmission half of a channel used for sending messages to a receiver. /// diff --git a/src/io/copy.rs b/src/io/copy.rs new file mode 100644 index 00000000000..042e819bd6d --- /dev/null +++ b/src/io/copy.rs @@ -0,0 +1,82 @@ +use std::io::{self, Read, Write}; + +use futures::{Future, Poll}; + +/// A future which will copy all data from a reader into a writer. +/// +/// Created by the `copy` function, this future will resolve to the number of +/// bytes copied or an error if one happens. +pub struct Copy { + reader: R, + read_done: bool, + writer: W, + pos: usize, + cap: usize, + amt: u64, + buf: Box<[u8]>, +} + +/// Creates a future which represents copying all the bytes from one object to +/// another. +/// +/// The returned future will copy all the bytes read from `reader` into the +/// `writer` specified. This future will only complete once the `reader` has hit +/// EOF and all bytes have been written to and flushed from the `writer` +/// provided. +/// +/// On success the number of bytes is returned and the `reader` and `writer` are +/// consumed. On error the error is returned and the I/O objects are consumed as +/// well. +pub fn copy(reader: R, writer: W) -> Copy + where R: Read, + W: Write, +{ + Copy { + reader: reader, + read_done: false, + writer: writer, + amt: 0, + pos: 0, + cap: 0, + buf: Box::new([0; 2048]), + } +} + +impl Future for Copy + where R: Read, + W: Write, +{ + type Item = u64; + type Error = io::Error; + + fn poll(&mut self) -> Poll { + loop { + // If our buffer is empty, then we need to read some data to + // continue. + if self.pos == self.cap && !self.read_done { + let n = try_nb!(self.reader.read(&mut self.buf)); + if n == 0 { + self.read_done = true; + } else { + self.pos = 0; + self.cap = n; + } + } + + // If our buffer has some data, let's write it out! + while self.pos < self.cap { + let i = try_nb!(self.writer.write(&self.buf[self.pos..self.cap])); + self.pos += i; + self.amt += i as u64; + } + + // If we've written al the data and we've seen EOF, flush out the + // data and finish the transfer. + // done with the entire transfer. + if self.pos == self.cap && self.read_done { + try_nb!(self.writer.flush()); + return Poll::Ok(self.amt) + } + } + } +} diff --git a/src/io/flush.rs b/src/io/flush.rs new file mode 100644 index 00000000000..159a178b648 --- /dev/null +++ b/src/io/flush.rs @@ -0,0 +1,39 @@ +use std::io::{self, Write}; + +use futures::{Poll, Future}; + +/// A future used to fully flush an I/O object. +/// +/// Resolves to the underlying I/O object once the flush operation is complete. +/// +/// Created by the `flush` function. +pub struct Flush { + a: Option, +} + +/// Creates a future which will entirely flush an I/O object and then yield the +/// object itself. +/// +/// This function will consume the object provided if an error happens, and +/// otherwise it will repeatedly call `flush` until it sees `Ok(())`, scheduling +/// a retry if `WouldBlock` is seen along the way. +pub fn flush(a: A) -> Flush + where A: Write, +{ + Flush { + a: Some(a), + } +} + +impl Future for Flush + where A: Write, +{ + type Item = A; + type Error = io::Error; + + fn poll(&mut self) -> Poll { + try_nb!(self.a.as_mut().unwrap().flush()); + Poll::Ok(self.a.take().unwrap()) + } +} + diff --git a/src/io/mod.rs b/src/io/mod.rs new file mode 100644 index 00000000000..41a73978216 --- /dev/null +++ b/src/io/mod.rs @@ -0,0 +1,47 @@ +//! I/O conveniences when working with primitives in `tokio-core` +//! +//! Contains various combinators to work with I/O objects and type definitions +//! as well. + +use std::io; + +use futures::BoxFuture; +use futures::stream::BoxStream; + +/// A convenience typedef around a `Future` whose error component is `io::Error` +pub type IoFuture = BoxFuture; + +/// A convenience typedef around a `Stream` whose error component is `io::Error` +pub type IoStream = BoxStream; + +/// A convenience macro for working with `io::Result` from the `Read` and +/// `Write` traits. +/// +/// This macro takes `io::Result` as input, and returns `T` as the output. If +/// the input type is of the `Err` variant, then `Poll::NotReady` is returned if +/// it indicates `WouldBlock` or otherwise `Err` is returned. +#[macro_export] +macro_rules! try_nb { + ($e:expr) => (match $e { + Ok(t) => t, + Err(ref e) if e.kind() == ::std::io::ErrorKind::WouldBlock => { + return ::futures::Poll::NotReady + } + Err(e) => return ::futures::Poll::Err(e.into()), + }) +} + +mod copy; +mod flush; +mod read_exact; +mod read_to_end; +mod task; +mod window; +mod write_all; +pub use self::copy::{copy, Copy}; +pub use self::flush::{flush, Flush}; +pub use self::read_exact::{read_exact, ReadExact}; +pub use self::read_to_end::{read_to_end, ReadToEnd}; +pub use self::task::{TaskIo, TaskIoRead, TaskIoWrite}; +pub use self::window::Window; +pub use self::write_all::{write_all, WriteAll}; diff --git a/src/io/read_exact.rs b/src/io/read_exact.rs new file mode 100644 index 00000000000..b251bfeaa84 --- /dev/null +++ b/src/io/read_exact.rs @@ -0,0 +1,77 @@ +use std::io::{self, Read}; +use std::mem; + +use futures::{Poll, Future}; + +/// A future which can be used to easily read the entire contents of a stream +/// into a vector. +/// +/// Created by the `read_exact` function. +pub struct ReadExact { + state: State, +} + +enum State { + Reading { + a: A, + buf: T, + pos: usize, + }, + Empty, +} + +/// Creates a future which will read exactly enough bytes to fill `buf`, +/// returning an error if EOF is hit sooner. +/// +/// The returned future will resolve to both the I/O stream as well as the +/// buffer once the read operation is completed. +/// +/// In the case of an error the buffer and the object will be discarded, with +/// the error yielded. In the case of success the object will be destroyed and +/// the buffer will be returned, with all data read from the stream appended to +/// the buffer. +pub fn read_exact(a: A, buf: T) -> ReadExact + where A: Read, + T: AsMut<[u8]>, +{ + ReadExact { + state: State::Reading { + a: a, + buf: buf, + pos: 0, + }, + } +} + +fn eof() -> io::Error { + io::Error::new(io::ErrorKind::UnexpectedEof, "early eof") +} + +impl Future for ReadExact + where A: Read, + T: AsMut<[u8]>, +{ + type Item = (A, T); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(A, T), io::Error> { + match self.state { + State::Reading { ref mut a, ref mut buf, ref mut pos } => { + let buf = buf.as_mut(); + while *pos < buf.len() { + let n = try_nb!(a.read(&mut buf[*pos..])); + *pos += n; + if n == 0 { + return Poll::Err(eof()) + } + } + } + State::Empty => panic!("poll a WriteAll after it's done"), + } + + match mem::replace(&mut self.state, State::Empty) { + State::Reading { a, buf, .. } => Poll::Ok((a, buf)), + State::Empty => panic!(), + } + } +} diff --git a/src/io/read_to_end.rs b/src/io/read_to_end.rs new file mode 100644 index 00000000000..80e7cd6c556 --- /dev/null +++ b/src/io/read_to_end.rs @@ -0,0 +1,62 @@ +use std::io::{self, Read}; +use std::mem; + +use futures::{Poll, Future}; + +/// A future which can be used to easily read the entire contents of a stream +/// into a vector. +/// +/// Created by the `read_to_end` function. +pub struct ReadToEnd { + state: State, +} + +enum State { + Reading { + a: A, + buf: Vec, + }, + Empty, +} + +/// Creates a future which will read all the bytes associated with the I/O +/// object `A` into the buffer provided. +/// +/// In the case of an error the buffer and the object will be discarded, with +/// the error yielded. In the case of success the object will be destroyed and +/// the buffer will be returned, with all data read from the stream appended to +/// the buffer. +pub fn read_to_end(a: A, buf: Vec) -> ReadToEnd + where A: Read, +{ + ReadToEnd { + state: State::Reading { + a: a, + buf: buf, + } + } +} + +impl Future for ReadToEnd + where A: Read, +{ + type Item = (A, Vec); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(A, Vec), io::Error> { + match self.state { + State::Reading { ref mut a, ref mut buf } => { + // If we get `Ok`, then we know the stream hit EOF and we're done. If we + // hit "would block" then all the read data so far is in our buffer, and + // otherwise we propagate errors + try_nb!(a.read_to_end(buf)); + }, + State::Empty => panic!("poll ReadToEnd after it's done"), + } + + match mem::replace(&mut self.state, State::Empty) { + State::Reading { a, buf } => Poll::Ok((a, buf)), + State::Empty => unreachable!(), + } + } +} diff --git a/src/io/task.rs b/src/io/task.rs new file mode 100644 index 00000000000..3c87a32e350 --- /dev/null +++ b/src/io/task.rs @@ -0,0 +1,102 @@ +use std::cell::RefCell; +use std::io::{self, Read, Write}; + +use futures::task::TaskData; + +/// Abstraction that allows inserting an I/O object into task-local storage, +/// returning a handle that can be split. +/// +/// A `TaskIo` handle implements the `ReadTask` and `WriteTask` and will only +/// work with the same task that the associated object was inserted into. The +/// handle may then be optionally `split` into the read/write halves so they can +/// be worked with independently. +/// +/// Note that it is important that the future returned from `TaskIo::new`, when +/// polled, will pin the yielded `TaskIo` object to that specific task. Any +/// attempt to read or write the object on other tasks will result in a panic. +pub struct TaskIo { + handle: TaskData>, +} + +/// The readable half of a `TaskIo` instance returned from `TaskIo::split`. +/// +/// This handle implements the `ReadTask` trait and can be used to split up an +/// I/O object into two distinct halves. +pub struct TaskIoRead { + handle: TaskData>, +} + +/// The writable half of a `TaskIo` instance returned from `TaskIo::split`. +/// +/// This handle implements the `WriteTask` trait and can be used to split up an +/// I/O object into two distinct halves. +pub struct TaskIoWrite { + handle: TaskData>, +} + +impl TaskIo { + /// Returns a new future which represents the insertion of the I/O object + /// `T` into task local storage, returning a `TaskIo` handle to it. + /// + /// The returned future will never resolve to an error. + pub fn new(t: T) -> TaskIo { + TaskIo { + handle: TaskData::new(RefCell::new(t)), + } + } +} + +impl TaskIo + where T: Read + Write, +{ + /// For an I/O object which is both readable and writable, this method can + /// be used to split the handle into two independently owned halves. + /// + /// The returned pair implements the `ReadTask` and `WriteTask` traits, + /// respectively, and can be used to pass around the object to different + /// combinators if necessary. + pub fn split(self) -> (TaskIoRead, TaskIoWrite) { + (TaskIoRead { handle: self.handle.clone() }, + TaskIoWrite { handle: self.handle }) + } +} + +impl Read for TaskIo + where T: io::Read, +{ + fn read(&mut self, buf: &mut [u8]) -> io::Result { + self.handle.with(|t| t.borrow_mut().read(buf)) + } +} + +impl Write for TaskIo + where T: io::Write, +{ + fn write(&mut self, buf: &[u8]) -> io::Result { + self.handle.with(|t| t.borrow_mut().write(buf)) + } + + fn flush(&mut self) -> io::Result<()> { + self.handle.with(|t| t.borrow_mut().flush()) + } +} + +impl Read for TaskIoRead + where T: io::Read, +{ + fn read(&mut self, buf: &mut [u8]) -> io::Result { + self.handle.with(|t| t.borrow_mut().read(buf)) + } +} + +impl Write for TaskIoWrite + where T: io::Write, +{ + fn write(&mut self, buf: &[u8]) -> io::Result { + self.handle.with(|t| t.borrow_mut().write(buf)) + } + + fn flush(&mut self) -> io::Result<()> { + self.handle.with(|t| t.borrow_mut().flush()) + } +} diff --git a/src/io/window.rs b/src/io/window.rs new file mode 100644 index 00000000000..c61360990e2 --- /dev/null +++ b/src/io/window.rs @@ -0,0 +1,116 @@ +use std::ops; + +/// A owned window around an underlying buffer. +/// +/// Normally slices work great for considering sub-portions of a buffer, but +/// unfortunately a slice is a *borrowed* type in Rust which has an associated +/// lifetime. When working with future and async I/O these lifetimes are not +/// always appropriate, and are sometimes difficult to store in tasks. This +/// type strives to fill this gap by providing an "owned slice" around an +/// underlying buffer of bytes. +/// +/// A `Window` wraps an underlying buffer, `T`, and has configurable +/// start/end indexes to alter the behavior of the `AsRef<[u8]>` implementation +/// that this type carries. +/// +/// This type can be particularly useful when working with the `write_all` +/// combinator in this crate. Data can be sliced via `Window`, consumed by +/// `write_all`, and then earned back once the write operation finishes through +/// the `into_inner` method on this type. +pub struct Window { + inner: T, + range: ops::Range, +} + +impl> Window { + /// Creates a new window around the buffer `t` defaulting to the entire + /// slice. + /// + /// Further methods can be called on the returned `Window` to alter the + /// window into the data provided. + pub fn new(t: T) -> Window { + Window { + range: 0..t.as_ref().len(), + inner: t, + } + } + + /// Gets a shared reference to the underlying buffer inside of this + /// `Window`. + pub fn get_ref(&self) -> &T { + &self.inner + } + + /// Gets a mutable reference to the underlying buffer inside of this + /// `Window`. + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner + } + + /// Consumes this `Window`, returning the underlying buffer. + pub fn into_inner(self) -> T { + self.inner + } + + /// Returns the starting index of this window into the underlying buffer + /// `T`. + pub fn start(&self) -> usize { + self.range.start + } + + /// Returns the end index of this window into the underlying buffer + /// `T`. + pub fn end(&self) -> usize { + self.range.end + } + + /// Changes the starting index of this window to the index specified. + /// + /// Returns the windows back to chain multiple calls to this method. + /// + /// # Panics + /// + /// This method will panic if `start` is out of bounds for the underlying + /// slice or if it comes after the `end` configured in this window. + pub fn set_start(&mut self, start: usize) -> &mut Window { + assert!(start < self.inner.as_ref().len()); + assert!(start <= self.range.end); + self.range.start = start; + self + } + + /// Changes the end index of this window to the index specified. + /// + /// Returns the windows back to chain multiple calls to this method. + /// + /// # Panics + /// + /// This method will panic if `end` is out of bounds for the underlying + /// slice or if it comes after the `end` configured in this window. + pub fn set_end(&mut self, end: usize) -> &mut Window { + assert!(end < self.inner.as_ref().len()); + assert!(self.range.start <= end); + self.range.end = end; + self + } + + // TODO: how about a generic set() method along the lines of: + // + // buffer.set(..3) + // .set(0..2) + // .set(4..) + // + // etc. +} + +impl> AsRef<[u8]> for Window { + fn as_ref(&self) -> &[u8] { + &self.inner.as_ref()[self.range.start..self.range.end] + } +} + +impl> AsMut<[u8]> for Window { + fn as_mut(&mut self) -> &mut [u8] { + &mut self.inner.as_mut()[self.range.start..self.range.end] + } +} diff --git a/src/io/write_all.rs b/src/io/write_all.rs new file mode 100644 index 00000000000..df4751df927 --- /dev/null +++ b/src/io/write_all.rs @@ -0,0 +1,80 @@ +use std::io::{self, Write}; +use std::mem; + +use futures::{Poll, Future}; + +/// A future used to write the entire contents of some data to a stream. +/// +/// This is created by the `write_all` top-level method. +pub struct WriteAll { + state: State, +} + +enum State { + Writing { + a: A, + buf: T, + pos: usize, + }, + Empty, +} + +/// Creates a future that will write the entire contents of the buffer `buf` to +/// the stream `a` provided. +/// +/// The returned future will not return until all the data has been written, and +/// the future will resolve to the stream as well as the buffer (for reuse if +/// needed). +/// +/// Any error which happens during writing will cause both the stream and the +/// buffer to get destroyed. +/// +/// The `buf` parameter here only requires the `AsRef<[u8]>` trait, which should +/// be broadly applicable to accepting data which can be converted to a slice. +/// The `Window` struct is also available in this crate to provide a different +/// window into a slice if necessary. +pub fn write_all(a: A, buf: T) -> WriteAll + where A: Write, + T: AsRef<[u8]>, +{ + WriteAll { + state: State::Writing { + a: a, + buf: buf, + pos: 0, + }, + } +} + +fn zero_write() -> io::Error { + io::Error::new(io::ErrorKind::WriteZero, "zero-length write") +} + +impl Future for WriteAll + where A: Write, + T: AsRef<[u8]>, +{ + type Item = (A, T); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(A, T), io::Error> { + match self.state { + State::Writing { ref mut a, ref buf, ref mut pos } => { + let buf = buf.as_ref(); + while *pos < buf.len() { + let n = try_nb!(a.write(&buf[*pos..])); + *pos += n; + if n == 0 { + return Poll::Err(zero_write()) + } + } + } + State::Empty => panic!("poll a WriteAll after it's done"), + } + + match mem::replace(&mut self.state, State::Empty) { + State::Writing { a, buf, .. } => Poll::Ok((a, buf)), + State::Empty => panic!(), + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 449dc6004c2..3250a0dc876 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,7 +6,6 @@ #![deny(missing_docs)] extern crate futures; -extern crate futures_io; extern crate mio; extern crate slab; @@ -16,9 +15,7 @@ extern crate scoped_tls; #[macro_use] extern crate log; -#[path = "../../src/slot.rs"] mod slot; -#[path = "../../src/lock.rs"] mod lock; mod channel; @@ -30,6 +27,8 @@ mod timeout; mod timer_wheel; mod udp; +pub mod io; + pub use channel::{Sender, Receiver}; pub use event_loop::{Loop, LoopPin, LoopHandle, AddSource, AddTimeout}; pub use event_loop::{LoopData, AddLoopData, TimeoutToken, IoToken}; diff --git a/src/lock.rs b/src/lock.rs new file mode 100644 index 00000000000..e5bc6b2b6a9 --- /dev/null +++ b/src/lock.rs @@ -0,0 +1,106 @@ +//! A "mutex" which only supports try_lock +//! +//! As a futures library the eventual call to an event loop should be the only +//! thing that ever blocks, so this is assisted with a fast user-space +//! implementation of a lock that can only have a `try_lock` operation. + +extern crate core; + +use self::core::cell::UnsafeCell; +use self::core::ops::{Deref, DerefMut}; +use self::core::sync::atomic::Ordering::{Acquire, Release}; +use self::core::sync::atomic::AtomicBool; + +/// A "mutex" around a value, similar to `std::sync::Mutex`. +/// +/// This lock only supports the `try_lock` operation, however, and does not +/// implement poisoning. +pub struct Lock { + locked: AtomicBool, + data: UnsafeCell, +} + +/// Sentinel representing an acquired lock through which the data can be +/// accessed. +pub struct TryLock<'a, T: 'a> { + __ptr: &'a Lock, +} + +// The `Lock` structure is basically just a `Mutex`, and these two impls are +// intended to mirror the standard library's corresponding impls for `Mutex`. +// +// If a `T` is sendable across threads, so is the lock, and `T` must be sendable +// across threads to be `Sync` because it allows mutable access from multiple +// threads. +unsafe impl Send for Lock {} +unsafe impl Sync for Lock {} + +impl Lock { + /// Creates a new lock around the given value. + pub fn new(t: T) -> Lock { + Lock { + locked: AtomicBool::new(false), + data: UnsafeCell::new(t), + } + } + + /// Attempts to acquire this lock, returning whether the lock was acquired or + /// not. + /// + /// If `Some` is returned then the data this lock protects can be accessed + /// through the sentinel. This sentinel allows both mutable and immutable + /// access. + /// + /// If `None` is returned then the lock is already locked, either elsewhere + /// on this thread or on another thread. + pub fn try_lock(&self) -> Option> { + if !self.locked.swap(true, Acquire) { + Some(TryLock { __ptr: self }) + } else { + None + } + } +} + +impl<'a, T> Deref for TryLock<'a, T> { + type Target = T; + fn deref(&self) -> &T { + // The existence of `TryLock` represents that we own the lock, so we + // can safely access the data here. + unsafe { &*self.__ptr.data.get() } + } +} + +impl<'a, T> DerefMut for TryLock<'a, T> { + fn deref_mut(&mut self) -> &mut T { + // The existence of `TryLock` represents that we own the lock, so we + // can safely access the data here. + // + // Additionally, we're the *only* `TryLock` in existence so mutable + // access should be ok. + unsafe { &mut *self.__ptr.data.get() } + } +} + +impl<'a, T> Drop for TryLock<'a, T> { + fn drop(&mut self) { + self.__ptr.locked.store(false, Release); + } +} + +#[cfg(test)] +mod tests { + use super::Lock; + + #[test] + fn smoke() { + let a = Lock::new(1); + let mut a1 = a.try_lock().unwrap(); + assert!(a.try_lock().is_none()); + assert_eq!(*a1, 1); + *a1 = 2; + drop(a1); + assert_eq!(*a.try_lock().unwrap(), 2); + assert_eq!(*a.try_lock().unwrap(), 2); + } +} diff --git a/src/slot.rs b/src/slot.rs new file mode 100644 index 00000000000..d802c98764a --- /dev/null +++ b/src/slot.rs @@ -0,0 +1,691 @@ +//! A slot in memory for communicating between a producer and a consumer. +//! +//! This module contains an implementation detail of this library for a type +//! which is only intended to be shared between one consumer and one producer of +//! a value. It is unlikely that this module will survive stabilization of this +//! library, so it is not recommended to rely on it. + +#![allow(dead_code)] // imported in a few places + +use std::prelude::v1::*; +use std::sync::atomic::{AtomicUsize, Ordering}; + +use lock::Lock; + +/// A slot in memory intended to represent the communication channel between one +/// producer and one consumer. +/// +/// Each slot contains space for a piece of data of type `T`, and can have +/// callbacks registered to run when the slot is either full or empty. +/// +/// Slots are only intended to be shared between exactly one producer and +/// exactly one consumer. If there are multiple concurrent producers or +/// consumers then this is still memory safe but will have unpredictable results +/// (and maybe panics). Note that this does not require that the "consumer" is +/// the same for the entire lifetime of a slot, simply that there is only one +/// consumer at a time. +/// +/// # Registering callbacks +/// +/// [`on_empty`](#method.on_empty) registers a callback to run when the slot +/// becomes empty, and [`on_full`](#method.on_full) registers one to run when it +/// becomes full. In both cases, the callback will run immediately if possible. +/// +/// At most one callback can be registered at any given time: it is an error to +/// attempt to register a callback with `on_full` if one is currently registered +/// via `on_empty`, or any other combination. +/// +/// # Cancellation +/// +/// Registering a callback returns a `Token` which can be used to +/// [`cancel`](#method.cancel) the callback. Only callbacks that have not yet +/// started running can be canceled. Canceling a callback that has already run +/// is not an error, and `cancel` does not signal whether or not the callback +/// was actually canceled to the caller. +pub struct Slot { + // The purpose of this data type is to communicate when a value becomes + // available and coordinate between a producer and consumer about that + // value. Slots end up being at the core of many futures as they handle + // values transferring between producers and consumers, which means that + // they can never block. + // + // As a result, this `Slot` is a lock-free implementation in terms of not + // actually blocking at any point in time. The `Lock` types are + // half-optional and half-not-optional. They aren't actually mutexes as they + // only support a `try_lock` operation, and all the methods below ensure + // that progress can always be made without blocking. + // + // The `state` variable keeps track of the state of this slot, while the + // other fields here are just the payloads of the slot itself. Note that the + // exact bits of `state` are typically wrapped up in a `State` for + // inspection (see below). + state: AtomicUsize, + slot: Lock>, + on_full: Lock>>>, + on_empty: Lock>, Option)>>, +} + +/// Error value returned from erroneous calls to `try_produce`, which contains +/// the value that was passed to `try_produce`. +#[derive(Debug, PartialEq)] +pub struct TryProduceError(T); + +/// Error value returned from erroneous calls to `try_consume`. +#[derive(Debug, PartialEq)] +pub struct TryConsumeError(()); + +/// Error value returned from erroneous calls to `on_full`. +#[derive(Debug, PartialEq)] +pub struct OnFullError(()); + +/// Error value returned from erroneous calls to `on_empty`. +#[derive(Debug, PartialEq)] +pub struct OnEmptyError(()); + +/// A `Token` represents a registered callback, and can be used to cancel the callback. +#[derive(Clone, Copy)] +pub struct Token(usize); + +// Slot state: the lowest 3 bits are flags; the remaining bits are used to +// store the `Token` for the currently registered callback. The special token +// value 0 means no callback is registered. +// +// The flags are: +// - `DATA`: the `Slot` contains a value +// - `ON_FULL`: the `Slot` has an `on_full` callback registered +// - `ON_EMPTY`: the `Slot` has an `on_empty` callback registered +struct State(usize); + +const DATA: usize = 1 << 0; +const ON_FULL: usize = 1 << 1; +const ON_EMPTY: usize = 1 << 2; +const STATE_BITS: usize = 3; +const STATE_MASK: usize = (1 << STATE_BITS) - 1; + +fn _is_send() {} +fn _is_sync() {} + +fn _assert() { + _is_send::>(); + _is_sync::>(); +} + +impl Slot { + /// Creates a new `Slot` containing `val`, which may be `None` to create an + /// empty `Slot`. + pub fn new(val: Option) -> Slot { + Slot { + state: AtomicUsize::new(if val.is_some() {DATA} else {0}), + slot: Lock::new(val), + on_full: Lock::new(None), + on_empty: Lock::new(None), + } + } + + /// Attempts to store `t` in the slot. + /// + /// This method can only be called by the one consumer working on this + /// `Slot`. Concurrent calls to this method or `on_empty` will result in + /// panics or possibly errors. + /// + /// # Errors + /// + /// Returns `Err` if the slot is already full. The value you attempted to + /// store is included in the error value. + /// + /// # Panics + /// + /// This method will panic if called concurrently with `try_produce` or + /// `on_empty`, or if `on_empty` has been called previously but the callback + /// hasn't fired. + pub fn try_produce(&self, t: T) -> Result<(), TryProduceError> { + // First up, let's take a look at our current state. Of our three flags, + // we check a few: + // + // * DATA - if this is set, then the production fails as a value has + // already been produced and we're not ready to receive it yet. + // * ON_EMPTY - this should never be set as it indicates a contract + // violation as the producer already registered interest in + // a value but the callback wasn't fired. + // * ON_FULL - doesn't matter in this use case, we don't check it as + // either state is valid. + let mut state = State(self.state.load(Ordering::SeqCst)); + assert!(!state.flag(ON_EMPTY)); + if state.flag(DATA) { + return Err(TryProduceError(t)) + } + + // Ok, so we've determined that our state is either `ON_FULL` or `0`, in + // both cases we're going to store our data into our slot. This should + // always succeed as access to `slot` is gated on the `DATA` flag being + // set on the consumer side (which isn't set) and there should only be + // one producer. + let mut slot = self.slot.try_lock().expect("interference with consumer?"); + assert!(slot.is_none()); + *slot = Some(t); + drop(slot); + + // Next, we update our state with `DATA` to say that something is + // available, and we also unset `ON_FULL` because we'll invoke the + // callback if it's available. + loop { + assert!(!state.flag(ON_EMPTY)); + let new_state = state.set_flag(DATA, true).set_flag(ON_FULL, false); + let old = self.state.compare_and_swap(state.0, + new_state.0, + Ordering::SeqCst); + if old == state.0 { + break + } + state.0 = old; + } + + // If our previous state we transitioned from indicates that it has an + // on-full callback, we call that callback here. There's a few unwraps + // here that should never fail because the consumer shouldn't be placing + // another callback here and there shouldn't be any other producers as + // well. + if state.flag(ON_FULL) { + let cb = self.on_full.try_lock().expect("interference2") + .take().expect("ON_FULL but no callback"); + cb.call_box(self); + } + Ok(()) + } + + /// Registers `f` as a callback to run when the slot becomes empty. + /// + /// The callback will run immediately if the slot is already empty. Returns + /// a token that can be used to cancel the callback. This method is to be + /// called by the producer, and it is illegal to call this method + /// concurrently with either `on_empty` or `try_produce`. + /// + /// # Panics + /// + /// Panics if another callback was already registered via `on_empty` or + /// `on_full`, or if this value is called concurrently with other producer + /// methods. + pub fn on_empty(&self, item: Option, f: F) -> Token + where F: FnOnce(&Slot, Option) + Send + 'static + { + // First up, as usual, take a look at our state. Of the three flags we + // check two: + // + // * DATA - if set, we keep going, but if unset we're done as there's no + // data and we're already empty. + // * ON_EMPTY - this should be impossible as it's a contract violation + // to call this twice or concurrently. + // * ON_FULL - it's illegal to have both an empty and a full callback + // simultaneously, so we check this just after we ensure + // there's data available. If there's data there should not + // be a full callback as it should have been called. + let mut state = State(self.state.load(Ordering::SeqCst)); + assert!(!state.flag(ON_EMPTY)); + if !state.flag(DATA) { + f(self, item); + return Token(0) + } + assert!(!state.flag(ON_FULL)); + + // At this point we've precisely determined that our state is `DATA` and + // all other flags are unset. We're cleared for landing in initializing + // the `on_empty` slot so we store our callback here. + let mut slot = self.on_empty.try_lock().expect("on_empty interference"); + assert!(slot.is_none()); + *slot = Some((Box::new(f), item)); + drop(slot); + + // In this loop, we transition ourselves from the `DATA` state to a + // state which has the on empty flag state. Note that we also increase + // the token of this state as we're registering a new callback. + loop { + assert!(state.flag(DATA)); + assert!(!state.flag(ON_FULL)); + assert!(!state.flag(ON_EMPTY)); + let new_state = state.set_flag(ON_EMPTY, true) + .set_token(state.token() + 1); + let old = self.state.compare_and_swap(state.0, + new_state.0, + Ordering::SeqCst); + + // If we succeeded in the CAS, then we're done and our token is + // valid. + if old == state.0 { + return Token(new_state.token()) + } + state.0 = old; + + // If we failed the CAS but the data was taken in the meantime we + // abort our attempt to set on-empty and call the callback + // immediately. Note that the on-empty flag was never set, so it + // should still be there and should be available to take. + if !state.flag(DATA) { + let cb = self.on_empty.try_lock().expect("on_empty interference2") + .take().expect("on_empty not empty??"); + let (cb, item) = cb; + cb.call_box(self, item); + return Token(0) + } + } + } + + /// Attempts to consume the value stored in the slot. + /// + /// This method can only be called by the one consumer of this slot, and + /// cannot be called concurrently with `try_consume` or `on_full`. + /// + /// # Errors + /// + /// Returns `Err` if the slot is already empty. + /// + /// # Panics + /// + /// This method will panic if called concurrently with `try_consume` or + /// `on_full`, or otherwise show weird behavior. + pub fn try_consume(&self) -> Result { + // The implementation of this method is basically the same as + // `try_produce` above, it's just the opposite of all the operations. + let mut state = State(self.state.load(Ordering::SeqCst)); + assert!(!state.flag(ON_FULL)); + if !state.flag(DATA) { + return Err(TryConsumeError(())) + } + let mut slot = self.slot.try_lock().expect("interference with producer?"); + let val = slot.take().expect("DATA but not data"); + drop(slot); + + loop { + assert!(!state.flag(ON_FULL)); + let new_state = state.set_flag(DATA, false).set_flag(ON_EMPTY, false); + let old = self.state.compare_and_swap(state.0, + new_state.0, + Ordering::SeqCst); + if old == state.0 { + break + } + state.0 = old; + } + assert!(!state.flag(ON_FULL)); + if state.flag(ON_EMPTY) { + let cb = self.on_empty.try_lock().expect("interference3") + .take().expect("ON_EMPTY but no callback"); + let (cb, item) = cb; + cb.call_box(self, item); + } + Ok(val) + } + + /// Registers `f` as a callback to run when the slot becomes full. + /// + /// The callback will run immediately if the slot is already full. Returns a + /// token that can be used to cancel the callback. + /// + /// This method is to be called by the consumer. + /// + /// # Panics + /// + /// Panics if another callback was already registered via `on_empty` or + /// `on_full` or if called concurrently with `on_full` or `try_consume`. + pub fn on_full(&self, f: F) -> Token + where F: FnOnce(&Slot) + Send + 'static + { + // The implementation of this method is basically the same as + // `on_empty` above, it's just the opposite of all the operations. + let mut state = State(self.state.load(Ordering::SeqCst)); + assert!(!state.flag(ON_FULL)); + if state.flag(DATA) { + f(self); + return Token(0) + } + assert!(!state.flag(ON_EMPTY)); + + let mut slot = self.on_full.try_lock().expect("on_full interference"); + assert!(slot.is_none()); + *slot = Some(Box::new(f)); + drop(slot); + + loop { + assert!(!state.flag(DATA)); + assert!(!state.flag(ON_EMPTY)); + assert!(!state.flag(ON_FULL)); + let new_state = state.set_flag(ON_FULL, true) + .set_token(state.token() + 1); + let old = self.state.compare_and_swap(state.0, + new_state.0, + Ordering::SeqCst); + if old == state.0 { + return Token(new_state.token()) + } + state.0 = old; + + if state.flag(DATA) { + let cb = self.on_full.try_lock().expect("on_full interference2") + .take().expect("on_full not full??"); + cb.call_box(self); + return Token(0) + } + } + } + + /// Cancels the callback associated with `token`. + /// + /// Canceling a callback that has already started running, or has already + /// run will do nothing, and is not an error. See + /// [Cancellation](#cancellation). + /// + /// # Panics + /// + /// This method may cause panics if it is called concurrently with + /// `on_empty` or `on_full`, depending on which callback is being canceled. + pub fn cancel(&self, token: Token) { + // Tokens with a value of "0" are sentinels which don't actually do + // anything. + let token = token.0; + if token == 0 { + return + } + + let mut state = State(self.state.load(Ordering::SeqCst)); + loop { + // If we've moved on to a different token, then we're guaranteed + // that our token won't show up again, so we can return immediately + // as our closure has likely already run (or been previously + // canceled). + if state.token() != token { + return + } + + // If our token matches, then let's see if we're cancelling either + // the on-full or on-empty callbacks. It's illegal to have them both + // registered, so we only need to look at one. + // + // If neither are set then the token has probably already run, so we + // just continue along our merry way and don't worry. + let new_state = if state.flag(ON_FULL) { + assert!(!state.flag(ON_EMPTY)); + state.set_flag(ON_FULL, false) + } else if state.flag(ON_EMPTY) { + assert!(!state.flag(ON_FULL)); + state.set_flag(ON_EMPTY, false) + } else { + return + }; + let old = self.state.compare_and_swap(state.0, + new_state.0, + Ordering::SeqCst); + if old == state.0 { + break + } + state.0 = old; + } + + // Figure out which callback we just canceled, and now that the flag is + // unset we should own the callback to clear it. + + if state.flag(ON_FULL) { + let cb = self.on_full.try_lock().expect("on_full interference3") + .take().expect("on_full not full??"); + drop(cb); + } else { + let cb = self.on_empty.try_lock().expect("on_empty interference3") + .take().expect("on_empty not empty??"); + drop(cb); + } + } +} + +impl TryProduceError { + /// Extracts the value that was attempted to be produced. + pub fn into_inner(self) -> T { + self.0 + } +} + +trait FnBox: Send { + fn call_box(self: Box, other: &Slot); +} + +impl FnBox for F + where F: FnOnce(&Slot) + Send, +{ + fn call_box(self: Box, other: &Slot) { + (*self)(other) + } +} + +trait FnBox2: Send { + fn call_box(self: Box, other: &Slot, Option); +} + +impl FnBox2 for F + where F: FnOnce(&Slot, Option) + Send, +{ + fn call_box(self: Box, other: &Slot, item: Option) { + (*self)(other, item) + } +} + +impl State { + fn flag(&self, f: usize) -> bool { + self.0 & f != 0 + } + + fn set_flag(&self, f: usize, val: bool) -> State { + State(if val { + self.0 | f + } else { + self.0 & !f + }) + } + + fn token(&self) -> usize { + self.0 >> STATE_BITS + } + + fn set_token(&self, gen: usize) -> State { + State((gen << STATE_BITS) | (self.0 & STATE_MASK)) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::thread; + + use super::Slot; + + #[test] + fn sequential() { + let slot = Slot::new(Some(1)); + + // We can consume once + assert_eq!(slot.try_consume(), Ok(1)); + assert!(slot.try_consume().is_err()); + + // Consume a production + assert_eq!(slot.try_produce(2), Ok(())); + assert_eq!(slot.try_consume(), Ok(2)); + + // Can't produce twice + assert_eq!(slot.try_produce(3), Ok(())); + assert!(slot.try_produce(3).is_err()); + + // on_full is run immediately if full + let hit = Arc::new(AtomicUsize::new(0)); + let hit2 = hit.clone(); + slot.on_full(move |_s| { + hit2.fetch_add(1, Ordering::SeqCst); + }); + assert_eq!(hit.load(Ordering::SeqCst), 1); + + // on_full can be run twice, and we can consume in the callback + let hit2 = hit.clone(); + slot.on_full(move |s| { + hit2.fetch_add(1, Ordering::SeqCst); + assert_eq!(s.try_consume(), Ok(3)); + }); + assert_eq!(hit.load(Ordering::SeqCst), 2); + + // Production can't run a previous callback + assert_eq!(slot.try_produce(4), Ok(())); + assert_eq!(hit.load(Ordering::SeqCst), 2); + assert_eq!(slot.try_consume(), Ok(4)); + + // Productions run new callbacks + let hit2 = hit.clone(); + slot.on_full(move |s| { + hit2.fetch_add(1, Ordering::SeqCst); + assert_eq!(s.try_consume(), Ok(5)); + }); + assert_eq!(slot.try_produce(5), Ok(())); + assert_eq!(hit.load(Ordering::SeqCst), 3); + + // on empty should fire immediately for an empty slot + let hit2 = hit.clone(); + slot.on_empty(None, move |_, _| { + hit2.fetch_add(1, Ordering::SeqCst); + }); + assert_eq!(hit.load(Ordering::SeqCst), 4); + } + + #[test] + fn channel() { + const N: usize = 10000; + + struct Sender { + slot: Arc>, + hit: Arc, + } + + struct Receiver { + slot: Arc>, + hit: Arc, + } + + impl Sender { + fn send(&self, val: usize) { + if self.slot.try_produce(val).is_ok() { + return + } + let me = thread::current(); + self.hit.store(0, Ordering::SeqCst); + let hit = self.hit.clone(); + self.slot.on_empty(None, move |_slot, _| { + hit.store(1, Ordering::SeqCst); + me.unpark(); + }); + while self.hit.load(Ordering::SeqCst) == 0 { + thread::park(); + } + self.slot.try_produce(val).expect("can't produce after on_empty") + } + } + + impl Receiver { + fn recv(&self) -> usize { + if let Ok(i) = self.slot.try_consume() { + return i + } + + let me = thread::current(); + self.hit.store(0, Ordering::SeqCst); + let hit = self.hit.clone(); + self.slot.on_full(move |_slot| { + hit.store(1, Ordering::SeqCst); + me.unpark(); + }); + while self.hit.load(Ordering::SeqCst) == 0 { + thread::park(); + } + self.slot.try_consume().expect("can't consume after on_full") + } + } + + let slot = Arc::new(Slot::new(None)); + let slot2 = slot.clone(); + + let tx = Sender { slot: slot2, hit: Arc::new(AtomicUsize::new(0)) }; + let rx = Receiver { slot: slot, hit: Arc::new(AtomicUsize::new(0)) }; + + let a = thread::spawn(move || { + for i in 0..N { + assert_eq!(rx.recv(), i); + } + }); + + for i in 0..N { + tx.send(i); + } + + a.join().unwrap(); + } + + #[test] + fn cancel() { + let slot = Slot::new(None); + let hits = Arc::new(AtomicUsize::new(0)); + + let add = || { + let hits = hits.clone(); + move |_: &Slot| { hits.fetch_add(1, Ordering::SeqCst); } + }; + let add_empty = || { + let hits = hits.clone(); + move |_: &Slot, _: Option| { + hits.fetch_add(1, Ordering::SeqCst); + } + }; + + // cancel on_full + let n = hits.load(Ordering::SeqCst); + assert_eq!(hits.load(Ordering::SeqCst), n); + let token = slot.on_full(add()); + assert_eq!(hits.load(Ordering::SeqCst), n); + slot.cancel(token); + assert_eq!(hits.load(Ordering::SeqCst), n); + assert!(slot.try_consume().is_err()); + assert!(slot.try_produce(1).is_ok()); + assert!(slot.try_consume().is_ok()); + assert_eq!(hits.load(Ordering::SeqCst), n); + + // cancel on_empty + let n = hits.load(Ordering::SeqCst); + assert_eq!(hits.load(Ordering::SeqCst), n); + slot.try_produce(1).unwrap(); + let token = slot.on_empty(None, add_empty()); + assert_eq!(hits.load(Ordering::SeqCst), n); + slot.cancel(token); + assert_eq!(hits.load(Ordering::SeqCst), n); + assert!(slot.try_produce(1).is_err()); + + // cancel with no effect + let n = hits.load(Ordering::SeqCst); + assert_eq!(hits.load(Ordering::SeqCst), n); + let token = slot.on_full(add()); + assert_eq!(hits.load(Ordering::SeqCst), n + 1); + slot.cancel(token); + assert_eq!(hits.load(Ordering::SeqCst), n + 1); + assert!(slot.try_consume().is_ok()); + let token = slot.on_empty(None, add_empty()); + assert_eq!(hits.load(Ordering::SeqCst), n + 2); + slot.cancel(token); + assert_eq!(hits.load(Ordering::SeqCst), n + 2); + + // cancel old ones don't count + let n = hits.load(Ordering::SeqCst); + assert_eq!(hits.load(Ordering::SeqCst), n); + let token1 = slot.on_full(add()); + assert_eq!(hits.load(Ordering::SeqCst), n); + assert!(slot.try_produce(1).is_ok()); + assert_eq!(hits.load(Ordering::SeqCst), n + 1); + assert!(slot.try_consume().is_ok()); + assert_eq!(hits.load(Ordering::SeqCst), n + 1); + let token2 = slot.on_full(add()); + assert_eq!(hits.load(Ordering::SeqCst), n + 1); + slot.cancel(token1); + assert_eq!(hits.load(Ordering::SeqCst), n + 1); + slot.cancel(token2); + assert_eq!(hits.load(Ordering::SeqCst), n + 1); + } +} diff --git a/src/tcp.rs b/src/tcp.rs index c0aca32b882..bacf2ce13d8 100644 --- a/src/tcp.rs +++ b/src/tcp.rs @@ -5,10 +5,10 @@ use std::net::{self, SocketAddr, Shutdown}; use futures::stream::Stream; use futures::{Future, IntoFuture, failed, Poll}; -use futures_io::{IoFuture, IoStream}; use mio; use {ReadinessStream, LoopHandle}; +use io::{IoFuture, IoStream}; /// An I/O object representing a TCP socket listening for incoming connections. /// diff --git a/src/timeout.rs b/src/timeout.rs index 93199be15ae..e4ada461bdd 100644 --- a/src/timeout.rs +++ b/src/timeout.rs @@ -2,9 +2,9 @@ use std::io; use std::time::{Duration, Instant}; use futures::{Future, Poll}; -use futures_io::IoFuture; use LoopHandle; +use io::IoFuture; use event_loop::TimeoutToken; /// A future representing the notification that a timeout has occurred. diff --git a/src/udp.rs b/src/udp.rs index 5a0de2b58da..b0a4dc6f966 100644 --- a/src/udp.rs +++ b/src/udp.rs @@ -3,10 +3,10 @@ use std::net::{self, SocketAddr, Ipv4Addr, Ipv6Addr}; use std::fmt; use futures::{Future, failed, Poll}; -use futures_io::IoFuture; use mio; use {ReadinessStream, LoopHandle}; +use io::IoFuture; /// An I/O object representing a UDP socket. pub struct UdpSocket { diff --git a/tests/buffered.rs b/tests/buffered.rs index a6bc899259a..3da24e85e15 100644 --- a/tests/buffered.rs +++ b/tests/buffered.rs @@ -1,6 +1,5 @@ extern crate futures; -extern crate futures_io; -extern crate futures_mio; +extern crate tokio_core; extern crate env_logger; use std::net::TcpStream; @@ -9,7 +8,7 @@ use std::io::{Read, Write, BufReader, BufWriter}; use futures::Future; use futures::stream::Stream; -use futures_io::copy; +use tokio_core::io::copy; macro_rules! t { ($e:expr) => (match $e { @@ -23,7 +22,7 @@ fn echo_server() { const N: usize = 1024; drop(env_logger::init()); - let mut l = t!(futures_mio::Loop::new()); + let mut l = t!(tokio_core::Loop::new()); let srv = l.handle().tcp_listen(&"127.0.0.1:0".parse().unwrap()); let srv = t!(l.run(srv)); let addr = t!(srv.local_addr()); diff --git a/tests/chain.rs b/tests/chain.rs index 410b9a0e6df..daeb52ab66f 100644 --- a/tests/chain.rs +++ b/tests/chain.rs @@ -1,6 +1,5 @@ extern crate futures; -extern crate futures_io; -extern crate futures_mio; +extern crate tokio_core; use std::net::TcpStream; use std::thread; @@ -8,7 +7,7 @@ use std::io::{Write, Read}; use futures::Future; use futures::stream::Stream; -use futures_io::read_to_end; +use tokio_core::io::read_to_end; macro_rules! t { ($e:expr) => (match $e { @@ -19,7 +18,7 @@ macro_rules! t { #[test] fn chain_clients() { - let mut l = t!(futures_mio::Loop::new()); + let mut l = t!(tokio_core::Loop::new()); let srv = l.handle().tcp_listen(&"127.0.0.1:0".parse().unwrap()); let srv = t!(l.run(srv)); let addr = t!(srv.local_addr()); diff --git a/tests/echo.rs b/tests/echo.rs index 77635539e66..a104beef58a 100644 --- a/tests/echo.rs +++ b/tests/echo.rs @@ -1,7 +1,6 @@ extern crate env_logger; extern crate futures; -extern crate futures_io; -extern crate futures_mio; +extern crate tokio_core; use std::io::{Read, Write}; use std::net::TcpStream; @@ -9,7 +8,7 @@ use std::thread; use futures::Future; use futures::stream::Stream; -use futures_io::{copy, TaskIo}; +use tokio_core::io::{copy, TaskIo}; macro_rules! t { ($e:expr) => (match $e { @@ -22,7 +21,7 @@ macro_rules! t { fn echo_server() { drop(env_logger::init()); - let mut l = t!(futures_mio::Loop::new()); + let mut l = t!(tokio_core::Loop::new()); let srv = l.handle().tcp_listen(&"127.0.0.1:0".parse().unwrap()); let srv = t!(l.run(srv)); let addr = t!(srv.local_addr()); diff --git a/tests/limit.rs b/tests/limit.rs index 620c495e4f4..d7caaad02ff 100644 --- a/tests/limit.rs +++ b/tests/limit.rs @@ -1,6 +1,5 @@ extern crate futures; -extern crate futures_io; -extern crate futures_mio; +extern crate tokio_core; use std::net::TcpStream; use std::thread; @@ -8,7 +7,7 @@ use std::io::{Write, Read}; use futures::Future; use futures::stream::Stream; -use futures_io::read_to_end; +use tokio_core::io::read_to_end; macro_rules! t { ($e:expr) => (match $e { @@ -19,7 +18,7 @@ macro_rules! t { #[test] fn limit() { - let mut l = t!(futures_mio::Loop::new()); + let mut l = t!(tokio_core::Loop::new()); let srv = l.handle().tcp_listen(&"127.0.0.1:0".parse().unwrap()); let srv = t!(l.run(srv)); let addr = t!(srv.local_addr()); diff --git a/tests/stream-buffered.rs b/tests/stream-buffered.rs index a472a3cc44c..8474619e603 100644 --- a/tests/stream-buffered.rs +++ b/tests/stream-buffered.rs @@ -1,6 +1,5 @@ extern crate futures; -extern crate futures_io; -extern crate futures_mio; +extern crate tokio_core; extern crate env_logger; use std::io::{Read, Write}; @@ -9,7 +8,7 @@ use std::thread; use futures::Future; use futures::stream::Stream; -use futures_io::{copy, TaskIo}; +use tokio_core::io::{copy, TaskIo}; macro_rules! t { ($e:expr) => (match $e { @@ -22,7 +21,7 @@ macro_rules! t { fn echo_server() { drop(env_logger::init()); - let mut l = t!(futures_mio::Loop::new()); + let mut l = t!(tokio_core::Loop::new()); let srv = l.handle().tcp_listen(&"127.0.0.1:0".parse().unwrap()); let srv = t!(l.run(srv)); let addr = t!(srv.local_addr()); diff --git a/tests/tcp.rs b/tests/tcp.rs index 5fb38b1b029..5384d8b1daa 100644 --- a/tests/tcp.rs +++ b/tests/tcp.rs @@ -1,6 +1,6 @@ extern crate env_logger; extern crate futures; -extern crate futures_mio; +extern crate tokio_core; use std::net::{TcpListener, TcpStream}; use std::sync::mpsc::channel; @@ -19,7 +19,7 @@ macro_rules! t { #[test] fn connect() { drop(env_logger::init()); - let mut l = t!(futures_mio::Loop::new()); + let mut l = t!(tokio_core::Loop::new()); let srv = t!(TcpListener::bind("127.0.0.1:0")); let addr = t!(srv.local_addr()); let t = thread::spawn(move || { @@ -37,7 +37,7 @@ fn connect() { #[test] fn accept() { drop(env_logger::init()); - let mut l = t!(futures_mio::Loop::new()); + let mut l = t!(tokio_core::Loop::new()); let srv = l.handle().tcp_listen(&"127.0.0.1:0".parse().unwrap()); let srv = t!(l.run(srv)); let addr = t!(srv.local_addr()); @@ -63,7 +63,7 @@ fn accept() { #[test] fn accept2() { drop(env_logger::init()); - let mut l = t!(futures_mio::Loop::new()); + let mut l = t!(tokio_core::Loop::new()); let srv = l.handle().tcp_listen(&"127.0.0.1:0".parse().unwrap()); let srv = t!(l.run(srv)); let addr = t!(srv.local_addr()); diff --git a/tests/timeout.rs b/tests/timeout.rs index 7676a50c3b5..a00b64dc063 100644 --- a/tests/timeout.rs +++ b/tests/timeout.rs @@ -1,6 +1,6 @@ extern crate env_logger; extern crate futures; -extern crate futures_mio; +extern crate tokio_core; use std::time::{Instant, Duration}; @@ -16,7 +16,7 @@ macro_rules! t { #[test] fn smoke() { drop(env_logger::init()); - let mut l = t!(futures_mio::Loop::new()); + let mut l = t!(tokio_core::Loop::new()); let dur = Duration::from_millis(10); let timeout = l.handle().timeout(dur).and_then(|t| t); let start = Instant::now(); diff --git a/tests/udp.rs b/tests/udp.rs index 8f1ba5a9b7f..1bfe512aa9a 100644 --- a/tests/udp.rs +++ b/tests/udp.rs @@ -1,11 +1,11 @@ extern crate futures; -extern crate futures_mio; +extern crate tokio_core; use std::io; use std::net::SocketAddr; use futures::{Future, Poll}; -use futures_mio::UdpSocket; +use tokio_core::UdpSocket; macro_rules! t { ($e:expr) => (match $e { @@ -16,7 +16,7 @@ macro_rules! t { #[test] fn send_messages() { - let mut l = t!(futures_mio::Loop::new()); + let mut l = t!(tokio_core::Loop::new()); let a = l.handle().udp_bind(&"127.0.0.1:0".parse().unwrap()); let b = l.handle().udp_bind(&"127.0.0.1:0".parse().unwrap()); let (a, b) = t!(l.run(a.join(b)));