diff --git a/Cargo.toml b/Cargo.toml index 8f4df67..21d02ac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,32 +1,32 @@ [package] -name = "nuclei" -version = "0.1.0" authors = ["Mahmut Bulut "] -edition = "2018" -description = "Proactive IO & runtime system" -keywords = ["io", "async", "runtime", "uring", "iouring", "proactor"] categories = ["concurrency", "asynchronous"] -homepage = "https://github.com/vertexclique/nuclei" -repository = "https://github.com/vertexclique/nuclei" +description = "Proactive IO & runtime system" documentation = "https://docs.rs/nuclei" -license = "Apache-2.0/MIT" -readme = "README.md" +edition = "2018" exclude = [ - "data/*", - ".github/*", - "examples/*", - "graphstore/*", - "tests/*", - "img/*", - "ci/*", - "benches/*", - "doc/*", - "*.png", - "*.dot", - "*.yml", - "*.toml", - "*.md" + "data/*", + ".github/*", + "examples/*", + "graphstore/*", + "tests/*", + "img/*", + "ci/*", + "benches/*", + "doc/*", + "*.png", + "*.dot", + "*.yml", + "*.toml", + "*.md", ] +homepage = "https://github.com/vertexclique/nuclei" +keywords = ["io", "async", "runtime", "uring", "iouring", "proactor"] +license = "Apache-2.0/MIT" +name = "nuclei" +readme = "README.md" +repository = "https://github.com/vertexclique/nuclei" +version = "0.1.0" [features] default = ["bastion", "iouring"] @@ -35,29 +35,30 @@ default = ["bastion", "iouring"] epoll = [] iouring = ["iou", "uring-sys"] -bastion = ["agnostik/runtime_bastion"] -tokio = ["agnostik/runtime_tokio"] asyncstd = ["agnostik/runtime_asyncstd"] +bastion = ["agnostik/runtime_bastion"] smol = ["agnostik/runtime_smol"] +tokio = ["agnostik/runtime_tokio"] [dependencies] lever = "0.1.1-alpha.8" futures = "0.3.5" futures-util = "0.3.5" -socket2 = { version = "0.3.12", features = ["pair", "unix"] } libc = "0.2" -pin-utils = "0.1.0" once_cell = "1.4.0" agnostik = "0.1.3" +pin-utils = "0.1.0" +socket2 = {version = "0.3.12", features = ["pair", "unix"]} +winapi = "0.3.9" # Other backends [target.'cfg(target_os = "linux")'.dependencies] -iou = { version = "0.0.0-ringbahn.1", optional = true } -uring-sys = { version = "0.6.1", optional = true } +iou = {version = "0.0.0-ringbahn.1", optional = true} +uring-sys = {version = "0.6.1", optional = true} [dev-dependencies] anyhow = "1.0.31" -async-h1 = "2.0.2" async-dup = "1.1.0" +async-h1 = "2.0.2" http-types = "2.2.1" -num_cpus = "1.13.0" \ No newline at end of file +num_cpus = "1.13.0" diff --git a/examples/fread-vect.rs b/examples/fread-vect.rs index b666bb9..73eb688 100644 --- a/examples/fread-vect.rs +++ b/examples/fread-vect.rs @@ -1,12 +1,12 @@ use nuclei::*; -use std::io; -use std::time::Duration; use std::fs::{File, OpenOptions}; +use std::io; use std::path::PathBuf; +use std::time::Duration; +use futures::io::IoSliceMut; use futures::AsyncRead; use futures_util::io::AsyncReadExt; -use futures::io::IoSliceMut; use std::ops::Deref; const IOVEC_WIDTH: usize = 1 << 10; @@ -30,18 +30,16 @@ fn main() -> io::Result<()> { let mut file = Handle::::new(fo).unwrap(); file.read_vectored(&mut bufs[..]).await.unwrap(); - vec![ - buf1, - buf2, - buf3, - ] + vec![buf1, buf2, buf3] }); - x.iter() - .enumerate() - .for_each(|(idx, e)| { - println!("::: iovec ::: {}, data ::: \n\n{}\n\n", idx, String::from_utf8_lossy(&e[..])); - }); + x.iter().enumerate().for_each(|(idx, e)| { + println!( + "::: iovec ::: {}, data ::: \n\n{}\n\n", + idx, + String::from_utf8_lossy(&e[..]) + ); + }); Ok(()) -} \ No newline at end of file +} diff --git a/examples/fread.rs b/examples/fread.rs index 32fbd5e..af5399a 100644 --- a/examples/fread.rs +++ b/examples/fread.rs @@ -1,13 +1,13 @@ use nuclei::*; -use std::io; -use std::time::Duration; use std::fs::{File, OpenOptions}; +use std::io; use std::path::PathBuf; +use std::time::Duration; +use std::time::Duration; use futures::AsyncRead; use futures_util::io::AsyncReadExt; - fn main() -> io::Result<()> { let x = drive(async { let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); @@ -25,4 +25,4 @@ fn main() -> io::Result<()> { println!("Length of file is {}", x.len()); Ok(()) -} \ No newline at end of file +} diff --git a/examples/fwrite-vect.rs b/examples/fwrite-vect.rs index e5c847a..0cbc37f 100644 --- a/examples/fwrite-vect.rs +++ b/examples/fwrite-vect.rs @@ -1,14 +1,14 @@ use nuclei::*; -use std::io; -use std::time::Duration; use std::fs::{File, OpenOptions}; +use std::io; use std::path::PathBuf; +use std::time::Duration; -use futures::{AsyncRead, AsyncWriteExt, AsyncSeek, AsyncSeekExt}; -use futures_util::io::AsyncReadExt; use futures::io::IoSliceMut; -use std::ops::Deref; +use futures::{AsyncRead, AsyncSeek, AsyncSeekExt, AsyncWriteExt}; +use futures_util::io::AsyncReadExt; use std::io::{IoSlice, Read, SeekFrom}; +use std::ops::Deref; const IOVEC_WIDTH: usize = 1 << 10; @@ -27,7 +27,11 @@ fn main() -> io::Result<()> { IoSlice::new(&buf3), ]; - let fo = OpenOptions::new().read(true).write(true).open(&path).unwrap(); + let fo = OpenOptions::new() + .read(true) + .write(true) + .open(&path) + .unwrap(); let mut file = Handle::::new(fo).unwrap(); file.write_vectored(&bufs[..]).await.unwrap(); @@ -44,4 +48,4 @@ fn main() -> io::Result<()> { println!("SG write was: {}", x); Ok(()) -} \ No newline at end of file +} diff --git a/examples/fwrite.rs b/examples/fwrite.rs index d80d4be..518d6e1 100644 --- a/examples/fwrite.rs +++ b/examples/fwrite.rs @@ -1,11 +1,11 @@ use nuclei::*; -use std::io; -use std::time::Duration; use std::fs::{File, OpenOptions}; +use std::io; use std::path::PathBuf; +use std::time::Duration; -use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, AsyncSeek, AsyncSeekExt}; use futures::io::SeekFrom; +use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt}; const DARK_MATTER_TEXT: &'static str = "\ Dark matter is a form of matter thought to account for approximately \ @@ -31,7 +31,11 @@ fn main() -> io::Result<()> { path.push("data"); path.push("dark-matter"); - let fo = OpenOptions::new().read(true).write(true).open(&path).unwrap(); + let fo = OpenOptions::new() + .read(true) + .write(true) + .open(&path) + .unwrap(); let mut file = Handle::::new(fo).unwrap(); file.write_all(dark_matter.as_bytes()).await.unwrap(); @@ -45,4 +49,4 @@ fn main() -> io::Result<()> { println!("Length of file is {}", x.len()); Ok(()) -} \ No newline at end of file +} diff --git a/examples/h1-server.rs b/examples/h1-server.rs index 0d36dcd..a04152a 100644 --- a/examples/h1-server.rs +++ b/examples/h1-server.rs @@ -2,10 +2,10 @@ use nuclei::*; use std::net::TcpListener; use anyhow::Result; -use futures::prelude::*; -use http_types::{Request, Response, StatusCode}; use async_dup::Arc; use futures::pending; +use futures::prelude::*; +use http_types::{Request, Response, StatusCode}; /// Serves a request and returns a response. async fn serve(req: Request) -> http_types::Result { @@ -30,7 +30,7 @@ async fn listen(listener: Handle) -> Result<()> { // Spawn a background task serving this connection. let stream = Arc::new(stream); spawn(async move { - if let Err(err) = async_h1::accept( stream, serve).await { + if let Err(err) = async_h1::accept(stream, serve).await { println!("Connection error: {:#?}", err); } }); diff --git a/examples/tcp-server.rs b/examples/tcp-server.rs index ce954dd..7e884a5 100644 --- a/examples/tcp-server.rs +++ b/examples/tcp-server.rs @@ -1,6 +1,6 @@ +use futures::io; use nuclei::*; use std::net::{TcpListener, TcpStream}; -use futures::io; async fn echo(stream: Handle) -> io::Result<()> { io::copy(&stream, &mut &stream).await?; @@ -23,4 +23,4 @@ fn main() -> io::Result<()> { spawn_blocking(|| echo(stream)); } }) -} \ No newline at end of file +} diff --git a/src/async_io.rs b/src/async_io.rs index ce9f780..a85d586 100644 --- a/src/async_io.rs +++ b/src/async_io.rs @@ -1,19 +1,27 @@ -use std::{io, task}; -use std::{task::Poll, fs::File, pin::Pin, task::Context}; use super::handle::Handle; use futures::io::{AsyncRead, AsyncWrite, SeekFrom, ReadVectored, IoSliceMut, IoSlice}; use super::submission_handler::SubmissionHandler; +use futures::io::SeekFrom; +use futures::io::{AsyncRead, AsyncWrite}; use std::io::Read; +use std::{fs::File, pin::Pin, task::Context, task::Poll}; +use std::{io, task}; -use std::net::{TcpStream}; +use std::net::TcpStream; #[cfg(unix)] -use std::{mem::ManuallyDrop, os::unix::io::{AsRawFd, RawFd, FromRawFd}, os::unix::prelude::*}; +use std::{ + mem::ManuallyDrop, + os::unix::io::{AsRawFd, FromRawFd}, +}; #[cfg(unix)] -use std::os::unix::net::{UnixStream}; - -use std::future::Future; +use std::{ + mem::ManuallyDrop, + os::unix::io::{AsRawFd, FromRawFd, RawFd}, + os::unix::prelude::*, +}; +#[cfg(unix)] use crate::syscore::Processor; use crate::syscore::*; use std::sync::Arc; @@ -21,8 +29,10 @@ use futures::{AsyncBufRead, AsyncSeek, AsyncReadExt}; use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering}; use futures_util::{pending_once}; use lever::prelude::TTas; -use crate::Proactor; +use std::future::Future; use std::path::Path; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::Arc; // // Proxy operations for Future registration via AsyncRead, AsyncWrite and others. @@ -39,7 +49,7 @@ macro_rules! impl_async_read { Pin::new(&mut &*Pin::get_mut(self)).poll_read(cx, buf) } } - } + }; } macro_rules! impl_async_write { @@ -61,12 +71,12 @@ macro_rules! impl_async_write { Pin::new(&mut &*Pin::get_mut(self)).poll_close(cx) } } - } + }; } -#[cfg(not(all(feature = "iouring", target_os = "linux")))] +#[cfg(all(not(feature = "iouring"), target_os = "linux"))] impl_async_read!(File); -#[cfg(not(all(feature = "iouring", target_os = "linux")))] +#[cfg(all(not(feature = "iouring"), target_os = "linux"))] impl_async_write!(File); impl_async_read!(TcpStream); @@ -77,14 +87,17 @@ impl_async_read!(UnixStream); #[cfg(unix)] impl_async_write!(UnixStream); - /////////////////////////////////// ///// Non proactive File /////////////////////////////////// -#[cfg(not(all(feature = "iouring", target_os = "linux")))] +#[cfg(all(not(feature = "iouring"), target_os = "linux"))] impl AsyncRead for &Handle { - fn poll_read(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut [u8]) -> Poll> { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + buf: &mut [u8], + ) -> Poll> { let raw_fd = self.as_raw_fd(); let buf_len = buf.len(); let buf = buf.as_mut_ptr(); @@ -103,9 +116,13 @@ impl AsyncRead for &Handle { } } -#[cfg(not(all(feature = "iouring", target_os = "linux")))] +#[cfg(all(not(feature = "iouring"), target_os = "linux"))] impl AsyncWrite for &Handle { - fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { let raw_fd = self.as_raw_fd(); let buf_len = buf.len(); let buf = buf.as_ptr(); @@ -132,7 +149,6 @@ impl AsyncWrite for &Handle { } } - /////////////////////////////////// ///// IO URING / Proactive / Linux /////////////////////////////////// @@ -153,10 +169,13 @@ impl Handle { } } - #[cfg(all(feature = "iouring", target_os = "linux"))] impl AsyncRead for Handle { - fn poll_read(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut [u8]) -> Poll> { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + buf: &mut [u8], + ) -> Poll> { let mut inner = futures::ready!(self.as_mut().poll_fill_buf(cx))?; let len = io::Read::read(&mut inner, buf)?; self.consume(len); @@ -210,7 +229,7 @@ impl AsyncBufRead for Handle { match fut.as_mut().poll(cx)? { Poll::Ready(n) => { *pos += n; - break Poll::Ready(Ok(n)) + break Poll::Ready(Ok(n)); } _ => {} } @@ -227,7 +246,6 @@ impl AsyncBufRead for Handle { } } - #[cfg(all(feature = "iouring", target_os = "linux"))] impl AsyncWrite for Handle { fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, bufslice: &[u8]) -> Poll> { @@ -321,7 +339,11 @@ impl AsyncWrite for Handle { #[cfg(all(feature = "iouring", target_os = "linux"))] impl AsyncSeek for Handle { - fn poll_seek(self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom) -> Poll> { + fn poll_seek( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + pos: SeekFrom, + ) -> Poll> { let mut store = &mut self.get_mut().store_file.as_mut().unwrap(); let (whence, offset) = match pos { @@ -330,7 +352,7 @@ impl AsyncSeek for Handle { return Poll::Ready(Ok(*store.pos() as u64)); } io::SeekFrom::Current(n) => (*store.pos(), n), - io::SeekFrom::End(n) => { + io::SeekFrom::End(n) => { let fut = store.poll_file_size(); futures::pin_mut!(fut); (futures::ready!(fut.as_mut().poll(cx))?, n) @@ -364,7 +386,11 @@ impl AsyncSeek for Handle { #[cfg(unix)] impl AsyncRead for &Handle { - fn poll_read(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut [u8]) -> Poll> { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + buf: &mut [u8], + ) -> Poll> { let raw_fd = self.as_raw_fd(); let buf_len = buf.len(); let buf = buf.as_mut_ptr(); @@ -443,11 +469,7 @@ impl AsyncRead for &Handle { #[cfg(unix)] impl AsyncWrite for &Handle { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context, - buf: &[u8], - ) -> Poll> { + fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { let raw_fd = self.as_raw_fd(); let buf_len = buf.len(); let buf = buf.as_ptr(); diff --git a/src/handle.rs b/src/handle.rs index 59f9d2c..00aec5f 100644 --- a/src/handle.rs +++ b/src/handle.rs @@ -1,9 +1,16 @@ -use std::fmt; -use std::{pin::Pin, future::Future, io, ops::{DerefMut, Deref}, sync::Arc}; use lever::prelude::*; -use pin_utils::unsafe_unpinned; +#[cfg(unix)] use crate::syscore::{CompletionChan, StoreFile}; +use pin_utils::unsafe_unpinned; +use std::fmt; +use std::{ + future::Future, + io, + ops::{Deref, DerefMut}, + pin::Pin, + sync::Arc, +}; pub type AsyncOp = Pin>>>; @@ -16,8 +23,10 @@ pub struct Handle { /// IO task element pub(crate) io_task: Option, /// Notification channel + #[cfg(unix)] pub(crate) chan: Option, /// File operation storage + #[cfg(unix)] pub(crate) store_file: Option, /// Completion callback for read pub(crate) read: Arc>>>, @@ -69,7 +78,6 @@ impl HandleOpRegisterer for &Handle { } } - impl Deref for Handle { type Target = T; diff --git a/src/lib.rs b/src/lib.rs index 5352b89..2dfc118 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,10 +1,9 @@ mod handle; +mod proactor; mod submission_handler; -mod async_io; mod sys; -mod waker; -mod proactor; mod utils; +mod waker; #[cfg(not(any( target_os = "linux", // epoll, iouring @@ -20,7 +19,6 @@ mod utils; )))] compile_error!("Target OS is not supported"); - #[cfg(any( target_os = "macos", target_os = "ios", @@ -46,5 +44,5 @@ mod syscore { pub(crate) use windows::*; } +pub use agnostik::*; pub use proactor::*; -pub use agnostik::*; \ No newline at end of file diff --git a/src/proactor.rs b/src/proactor.rs index 104caca..b73e2ef 100644 --- a/src/proactor.rs +++ b/src/proactor.rs @@ -1,6 +1,6 @@ +use std::task::{Context, Poll}; use std::time::Duration; -use std::task::{Poll, Context}; -use std::{pin::Pin, future::Future, io}; +use std::{future::Future, io, pin::Pin}; use lever::prelude::*; use once_cell::sync::Lazy; @@ -9,6 +9,9 @@ use super::syscore::*; use super::waker::*; use crate::spawn_blocking; +#[cfg(target_os = "windows")] +use crate::syscore::iocp::SysProactor; + pub use super::handle::*; /// @@ -18,9 +21,8 @@ pub struct Proactor(SysProactor); impl Proactor { /// Returns a reference to the proactor. pub fn get() -> &'static Proactor { - static PROACTOR: Lazy = Lazy::new(|| Proactor( - SysProactor::new().expect("cannot initialize poll backend") - )); + static PROACTOR: Lazy = + Lazy::new(|| Proactor(SysProactor::new().expect("cannot initialize poll backend"))); &PROACTOR } @@ -52,10 +54,8 @@ pub fn drive(future: impl Future) -> T { let cx = &mut Context::from_waker(&waker); futures_util::pin_mut!(future); - let driver = spawn_blocking(move || { - loop { - let _ = p.wait(1, None); - } + let driver = spawn_blocking(move || loop { + let _ = p.wait(1, None); }); futures_util::pin_mut!(driver); @@ -70,4 +70,4 @@ pub fn drive(future: impl Future) -> T { let duration = Duration::from_millis(1); driver.as_mut().poll(cx); } -} \ No newline at end of file +} diff --git a/src/submission_handler.rs b/src/submission_handler.rs index 4153a3a..27f29d1 100644 --- a/src/submission_handler.rs +++ b/src/submission_handler.rs @@ -1,23 +1,26 @@ -use futures::io::{ - AsyncRead, AsyncWrite, AsyncBufRead, AsyncSeek -}; -use std::marker::PhantomData as marker; -use std::{task::{Context, Poll}, io, pin::Pin, future::Future, ops::{DerefMut, Deref}}; use super::handle::{Handle, HandleOpRegisterer}; - +use futures::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite}; +use std::marker::PhantomData as marker; +use std::{ + future::Future, + io, + ops::{Deref, DerefMut}, + pin::Pin, + task::{Context, Poll}, +}; pub struct SubmissionHandler(marker) where - T: Unpin; + T: Unpin; impl SubmissionHandler where - T: Unpin + HandleOpRegisterer + T: Unpin + HandleOpRegisterer, { pub fn handle_read( handle: Pin<&mut T>, cx: &mut Context, - completion_dispatcher: impl Future> + 'static + completion_dispatcher: impl Future> + 'static, ) -> Poll> { let handle = handle.get_mut(); let read_result = handle.read_registerer(); @@ -46,7 +49,7 @@ where pub fn handle_write( handle: Pin<&mut T>, cx: &mut Context, - completion_dispatcher: impl Future> + 'static + completion_dispatcher: impl Future> + 'static, ) -> Poll> { let handle = handle.get_mut(); let write_result = handle.write_registerer(); @@ -71,4 +74,4 @@ where poll } -} \ No newline at end of file +} diff --git a/src/syscore/bsd/fs.rs b/src/syscore/bsd/fs.rs index 0fd5abc..0c27ec1 100644 --- a/src/syscore/bsd/fs.rs +++ b/src/syscore/bsd/fs.rs @@ -1 +1 @@ -pub struct StoreFile; \ No newline at end of file +pub struct StoreFile; diff --git a/src/syscore/bsd/kqueue.rs b/src/syscore/bsd/kqueue.rs index b365f21..d65f615 100644 --- a/src/syscore/bsd/kqueue.rs +++ b/src/syscore/bsd/kqueue.rs @@ -1,14 +1,14 @@ -use std::mem::MaybeUninit; -use std::io::{self, Read, Write}; -use std::os::unix::io::{AsRawFd, RawFd}; -use std::{fs::File, os::unix::net::UnixStream, collections::HashMap, time::Duration}; use crate::sys::event::{kevent_ts, kqueue, KEvent}; use futures::channel::oneshot; +use lever::prelude::*; use pin_utils::unsafe_pinned; use std::future::Future; +use std::io::{self, Read, Write}; +use std::mem::MaybeUninit; +use std::os::unix::io::{AsRawFd, RawFd}; use std::pin::Pin; use std::task::{Context, Poll}; -use lever::prelude::*; +use std::{collections::HashMap, fs::File, os::unix::net::UnixStream, time::Duration}; macro_rules! syscall { ($fn:ident $args:tt) => {{ @@ -25,8 +25,8 @@ macro_rules! syscall { /////////////////// use socket2::SockAddr; -use std::os::unix::net::{SocketAddr as UnixSocketAddr}; use std::mem; +use std::os::unix::net::SocketAddr as UnixSocketAddr; fn max_len() -> usize { // The maximum read limit on most posix-like systems is `SSIZE_MAX`, @@ -44,18 +44,22 @@ fn max_len() -> usize { } } -pub(crate) fn shim_recv_from(fd: A, buf: &mut [u8], flags: libc::c_int) -> io::Result<(usize, SockAddr)> { +pub(crate) fn shim_recv_from( + fd: A, + buf: &mut [u8], + flags: libc::c_int, +) -> io::Result<(usize, SockAddr)> { let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() }; let mut addrlen = mem::size_of_val(&storage) as libc::socklen_t; let n = syscall!(recvfrom( - fd.as_raw_fd() as _, - buf.as_mut_ptr() as *mut libc::c_void, - std::cmp::min(buf.len(), max_len()), - flags, - &mut storage as *mut _ as *mut _, - &mut addrlen, - ))?; + fd.as_raw_fd() as _, + buf.as_mut_ptr() as *mut libc::c_void, + std::cmp::min(buf.len(), max_len()), + flags, + &mut storage as *mut _ as *mut _, + &mut addrlen, + ))?; let addr = unsafe { SockAddr::from_raw_parts(&storage as *const _ as *const _, addrlen) }; Ok((n as usize, addr)) } @@ -78,7 +82,7 @@ pub(crate) fn shim_to_af_unix(sockaddr: &SockAddr) -> io::Result let abst_sock_ident: libc::c_char = unsafe { std::slice::from_raw_parts( &addr.sun_path as *const _ as *const u8, - mem::size_of::() + mem::size_of::(), ) }[1] as libc::c_char; @@ -88,7 +92,7 @@ pub(crate) fn shim_to_af_unix(sockaddr: &SockAddr) -> io::Result // https://man7.org/linux/man-pages/man7/unix.7.html (sa, 0) if sa != 0 && sa > mem::size_of::() as libc::socklen_t => { len = mem::size_of::() as libc::socklen_t; - }, + } // If unnamed socket, then addr is always zero, // assign the offset reserved difference as length. (0, _) => { @@ -96,7 +100,7 @@ pub(crate) fn shim_to_af_unix(sockaddr: &SockAddr) -> io::Result let path = &addr.sun_path as *const _ as usize; let sun_path_offset = path - base; len = sun_path_offset as libc::socklen_t; - }, + } // Discard rest, they are not special. (_, _) => {} @@ -109,7 +113,7 @@ pub(crate) fn shim_to_af_unix(sockaddr: &SockAddr) -> io::Result std::ptr::copy_nonoverlapping( sockaddr.as_ptr(), &mut init as *mut _ as *mut _, - len as usize + len as usize, ); // Safety: We've written the init addr above. @@ -142,7 +146,7 @@ pub struct SysProactor { registered: TTas>, /// Hashmap for holding interested concrete completion callbacks - completions: TTas> + completions: TTas>, } impl SysProactor { @@ -157,7 +161,7 @@ impl SysProactor { read_stream: TTas::new(read_stream), write_stream, registered: TTas::new(HashMap::new()), - completions: TTas::new(HashMap::new()) + completions: TTas::new(HashMap::new()), }; let mut rs = proactor.read_stream.lock(); @@ -232,12 +236,13 @@ impl SysProactor { }); let mut events: Vec = Vec::with_capacity(max_event_size); - events.resize(max_event_size, unsafe { MaybeUninit::zeroed().assume_init() }); + events.resize(max_event_size, unsafe { + MaybeUninit::zeroed().assume_init() + }); let mut events: Box<[KEvent]> = events.into_boxed_slice(); // dbg!("SENDING EVENT"); - let res = - kevent_ts(self.kqueue_fd, &[], &mut events, timeout)? as isize; + let res = kevent_ts(self.kqueue_fd, &[], &mut events, timeout)? as isize; // dbg!(res); // dbg!("EVENT FINISH"); @@ -304,9 +309,7 @@ impl SysProactor { } let (tx, rx) = oneshot::channel(); - let comp = completions - .entry(fd) - .or_insert(Vec::new()); + let comp = completions.entry(fd).or_insert(Vec::new()); comp.push((evts, tx)); @@ -356,11 +359,9 @@ impl SysProactor { if ack_removal { completions.remove(&fd); } - } } - ////////////////////////////// ////////////////////////////// diff --git a/src/syscore/bsd/mod.rs b/src/syscore/bsd/mod.rs index e344747..74c99ac 100644 --- a/src/syscore/bsd/mod.rs +++ b/src/syscore/bsd/mod.rs @@ -1,7 +1,7 @@ mod fs; mod kqueue; -mod processor; mod nethandle; +mod processor; pub(crate) use fs::*; pub(crate) use kqueue::*; diff --git a/src/syscore/bsd/nethandle.rs b/src/syscore/bsd/nethandle.rs index 3b259a8..369108c 100644 --- a/src/syscore/bsd/nethandle.rs +++ b/src/syscore/bsd/nethandle.rs @@ -1,22 +1,21 @@ use std::future::Future; use std::io; use std::marker::Unpin; -use std::sync::Arc; -use std::path::Path; use std::net::{SocketAddr, ToSocketAddrs}; use std::os::unix::io::AsRawFd; +use std::path::Path; +use std::sync::Arc; use std::net::{TcpListener, TcpStream, UdpSocket}; // Unix specifics -use std::os::unix::net::{UnixListener, UnixStream, UnixDatagram, SocketAddr as UnixSocketAddr}; +use std::os::unix::net::{SocketAddr as UnixSocketAddr, UnixDatagram, UnixListener, UnixStream}; -use lever::sync::prelude::*; use futures::Stream; +use lever::sync::prelude::*; -use crate::{Handle, Proactor}; use super::Processor; use crate::syscore::CompletionChan; - +use crate::{Handle, Proactor}; impl Handle { pub fn new(io: T) -> io::Result> { @@ -32,9 +31,7 @@ impl Handle { pub(crate) fn new_with_callback(io: T, evflags: usize) -> io::Result> { let fd = io.as_raw_fd(); let mut handle = Handle::new(io)?; - let register = Proactor::get() - .inner() - .register_io(fd, evflags)?; + let register = Proactor::get().inner().register_io(fd, evflags)?; handle.chan = Some(register); Ok(handle) } @@ -54,10 +51,13 @@ impl Handle { pub fn incoming( &self, ) -> impl Stream>> + Send + Unpin + '_ { - Box::pin(futures::stream::unfold(self, |listener: &Handle| async move { - let res = listener.accept().await.map(|(stream, _)| stream); - Some((res, listener)) - })) + Box::pin(futures::stream::unfold( + self, + |listener: &Handle| async move { + let res = listener.accept().await.map(|(stream, _)| stream); + Some((res, listener)) + }, + )) } } @@ -102,8 +102,7 @@ impl Handle { pub async fn send_to(&self, buf: &[u8], addr: A) -> io::Result { match addr.to_socket_addrs()?.next() { - Some(addr) => - Processor::processor_send_to(self.get_ref(), buf, addr).await, + Some(addr) => Processor::processor_send_to(self.get_ref(), buf, addr).await, None => Err(io::Error::new( io::ErrorKind::InvalidData, "given addresses can't be parsed", @@ -132,10 +131,13 @@ impl Handle { pub fn incoming( &self, ) -> impl Stream>> + Send + Unpin + '_ { - Box::pin(futures::stream::unfold(self, |listener: &Handle| async move { - let res = listener.accept().await.map(|(stream, _)| stream); - Some((res, listener)) - })) + Box::pin(futures::stream::unfold( + self, + |listener: &Handle| async move { + let res = listener.accept().await.map(|(stream, _)| stream); + Some((res, listener)) + }, + )) } } @@ -205,4 +207,4 @@ impl Handle { pub async fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, UnixSocketAddr)> { Processor::processor_peek_from_unix(self.get_ref(), buf).await } -} \ No newline at end of file +} diff --git a/src/syscore/bsd/processor.rs b/src/syscore/bsd/processor.rs index d73378a..a244f16 100644 --- a/src/syscore/bsd/processor.rs +++ b/src/syscore/bsd/processor.rs @@ -1,18 +1,20 @@ +use std::future::Future; use std::io; use std::io::{Read, Write}; -use std::{fs::File, os::unix::io::{AsRawFd, FromRawFd}, mem::ManuallyDrop}; -use std::net::{SocketAddr, ToSocketAddrs, TcpListener}; -use std::os::unix::net::{ - SocketAddr as UnixSocketAddr, UnixDatagram, UnixListener, UnixStream, -}; -use std::net::{SocketAddrV6, SocketAddrV4, Ipv4Addr, Ipv6Addr, UdpSocket}; -use std::future::Future; -use std::path::Path; use std::net::TcpStream; +use std::net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6, UdpSocket}; +use std::net::{SocketAddr, TcpListener, ToSocketAddrs}; +use std::os::unix::net::{SocketAddr as UnixSocketAddr, UnixDatagram, UnixListener, UnixStream}; +use std::path::Path; +use std::{ + fs::File, + mem::ManuallyDrop, + os::unix::io::{AsRawFd, FromRawFd}, +}; use crate::proactor::Proactor; -use crate::Handle; use crate::syscore::shim_to_af_unix; +use crate::Handle; pub struct Processor; @@ -22,7 +24,10 @@ impl Processor { ///// Synchronous File /////////////////////////////////// - pub(crate) async fn processor_read_file(io: &R, buf: &mut [u8]) -> io::Result { + pub(crate) async fn processor_read_file( + io: &R, + buf: &mut [u8], + ) -> io::Result { // TODO: (vertexclique): Use blocking here. let mut file = unsafe { File::from_raw_fd(io.as_raw_fd()) }; let res = file.read(buf); @@ -107,10 +112,13 @@ impl Processor { ///// Commonality of TcpStream, UdpSocket /////////////////////////////////// - pub(crate) async fn processor_connect(addrs: A, mut f: F) -> io::Result - where - F: FnMut(SocketAddr) -> Fut, - Fut: Future>, + pub(crate) async fn processor_connect( + addrs: A, + mut f: F, + ) -> io::Result + where + F: FnMut(SocketAddr) -> Fut, + Fut: Future>, { // TODO connect_tcp, connect_udp let addrs = match addrs.to_socket_addrs() { @@ -127,10 +135,7 @@ impl Processor { } Err(tail_err.unwrap_or_else(|| { - io::Error::new( - io::ErrorKind::InvalidInput, - "couldn't resolve addresses", - ) + io::Error::new(io::ErrorKind::InvalidInput, "couldn't resolve addresses") })) } @@ -147,7 +152,11 @@ impl Processor { } else { socket2::Domain::ipv4() }; - let sock = socket2::Socket::new(domain, socket2::Type::stream(), Some(socket2::Protocol::tcp()))?; + let sock = socket2::Socket::new( + domain, + socket2::Type::stream(), + Some(socket2::Protocol::tcp()), + )?; // Begin async connect and ignore the inevitable "in progress" error. sock.set_nonblocking(true)?; @@ -179,7 +188,11 @@ impl Processor { SocketAddr::V4(_) => socket2::Domain::ipv4(), SocketAddr::V6(_) => socket2::Domain::ipv6(), }; - let sock = socket2::Socket::new(domain, socket2::Type::dgram(), Some(socket2::Protocol::udp()))?; + let sock = socket2::Socket::new( + domain, + socket2::Type::dgram(), + Some(socket2::Protocol::udp()), + )?; let sockaddr = socket2::SockAddr::from(addr); let unspec = match addr { @@ -208,15 +221,18 @@ impl Processor { ///// TcpListener /////////////////////////////////// - pub(crate) async fn processor_accept_tcp_listener(listener: &R) -> io::Result<(Handle, SocketAddr)> { + pub(crate) async fn processor_accept_tcp_listener( + listener: &R, + ) -> io::Result<(Handle, SocketAddr)> { let socket = unsafe { socket2::Socket::from_raw_fd(listener.as_raw_fd()) }; let socket = socket.into_tcp_listener(); let socket = ManuallyDrop::new(socket); // Reregister on block match socket - .accept() - .map(|(stream, sockaddr)| (Handle::new(stream).unwrap(), sockaddr)) { + .accept() + .map(|(stream, sockaddr)| (Handle::new(stream).unwrap(), sockaddr)) + { Ok(res) => Ok(res), Err(err) if err.kind() == io::ErrorKind::WouldBlock => { let cc = Proactor::get() @@ -248,7 +264,11 @@ impl Processor { Self::send_to_dest(socket, buf, &socket2::SockAddr::from(addr)).await } - async fn send_to_dest(socket: &A, buf: &[u8], addr: &socket2::SockAddr) -> io::Result { + async fn send_to_dest( + socket: &A, + buf: &[u8], + addr: &socket2::SockAddr, + ) -> io::Result { let sock = unsafe { socket2::Socket::from_raw_fd(socket.as_raw_fd()) }; let sock = ManuallyDrop::new(sock); @@ -271,13 +291,19 @@ impl Processor { } } - pub(crate) async fn processor_recv_from(sock: &R, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + pub(crate) async fn processor_recv_from( + sock: &R, + buf: &mut [u8], + ) -> io::Result<(usize, SocketAddr)> { Self::recv_from_with_flags(sock, buf, 0) .await .map(|(size, sockaddr)| (size, sockaddr.as_std().unwrap())) } - pub(crate) async fn processor_peek_from(sock: &R, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + pub(crate) async fn processor_peek_from( + sock: &R, + buf: &mut [u8], + ) -> io::Result<(usize, SocketAddr)> { Self::recv_from_with_flags(sock, buf, libc::MSG_PEEK as _) .await .map(|(size, sockaddr)| (size, sockaddr.as_std().unwrap())) @@ -293,8 +319,8 @@ impl Processor { // let sock = ManuallyDrop::new(sock); // Reregister on block - match super::shim_recv_from(sock, buf, flags as _) - .map(|(size, sockaddr)| (size, sockaddr)) { + match super::shim_recv_from(sock, buf, flags as _).map(|(size, sockaddr)| (size, sockaddr)) + { Ok(res) => Ok(res), Err(err) if err.kind() == io::ErrorKind::WouldBlock => { let cc = Proactor::get() @@ -320,7 +346,9 @@ impl Processor { ///// UnixListener /////////////////////////////////// - pub(crate) async fn processor_accept_unix_listener(listener: &R) -> io::Result<(Handle, UnixSocketAddr)> { + pub(crate) async fn processor_accept_unix_listener( + listener: &R, + ) -> io::Result<(Handle, UnixSocketAddr)> { let socket = unsafe { socket2::Socket::from_raw_fd(listener.as_raw_fd()) }; let socket = socket.into_unix_listener(); let socket = ManuallyDrop::new(socket); @@ -328,7 +356,8 @@ impl Processor { // Reregister on block match socket .accept() - .map(|(stream, sockaddr)| (Handle::new(stream).unwrap(), sockaddr)) { + .map(|(stream, sockaddr)| (Handle::new(stream).unwrap(), sockaddr)) + { Ok(res) => Ok(res), Err(err) if err.kind() == io::ErrorKind::WouldBlock => { let cc = Proactor::get() @@ -352,7 +381,9 @@ impl Processor { ///// UnixStream /////////////////////////////////// - pub(crate) async fn processor_connect_unix>(path: P) -> io::Result> { + pub(crate) async fn processor_connect_unix>( + path: P, + ) -> io::Result> { let sock = socket2::Socket::new(socket2::Domain::unix(), socket2::Type::stream(), None)?; let sockaddr = socket2::SockAddr::unix(path)?; @@ -383,19 +414,29 @@ impl Processor { res.map(|_| stream) } - pub(crate) async fn processor_send_to_unix>(socket: &R, buf: &[u8], path: P) -> io::Result { + pub(crate) async fn processor_send_to_unix>( + socket: &R, + buf: &[u8], + path: P, + ) -> io::Result { Self::send_to_dest(socket, buf, &socket2::SockAddr::unix(path)?).await } - pub(crate) async fn processor_recv_from_unix(socket: &R, buf: &mut [u8]) -> io::Result<(usize, UnixSocketAddr)> { + pub(crate) async fn processor_recv_from_unix( + socket: &R, + buf: &mut [u8], + ) -> io::Result<(usize, UnixSocketAddr)> { Self::recv_from_with_flags(socket, buf, 0) .await .map(|(size, sockaddr)| (size, shim_to_af_unix(&sockaddr).unwrap())) } - pub(crate) async fn processor_peek_from_unix(socket: &R, buf: &mut [u8]) -> io::Result<(usize, UnixSocketAddr)> { + pub(crate) async fn processor_peek_from_unix( + socket: &R, + buf: &mut [u8], + ) -> io::Result<(usize, UnixSocketAddr)> { Self::recv_from_with_flags(socket, buf, libc::MSG_PEEK as _) .await .map(|(size, sockaddr)| (size, shim_to_af_unix(&sockaddr).unwrap())) } -} \ No newline at end of file +} diff --git a/src/syscore/linux/epoll/epoll.rs b/src/syscore/linux/epoll/epoll.rs index f41afde..4943f03 100644 --- a/src/syscore/linux/epoll/epoll.rs +++ b/src/syscore/linux/epoll/epoll.rs @@ -1,14 +1,14 @@ -use std::mem::MaybeUninit; -use std::io::{self, Read, Write}; -use std::os::unix::io::{AsRawFd, RawFd, FromRawFd}; -use std::{fs::File, os::unix::net::UnixStream, collections::HashMap, time::Duration}; +use crate::sys::epoll::*; use futures::channel::oneshot; +use lever::prelude::*; use pin_utils::unsafe_pinned; use std::future::Future; +use std::io::{self, Read, Write}; +use std::mem::MaybeUninit; +use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; use std::pin::Pin; use std::task::{Context, Poll}; -use crate::sys::epoll::*; -use lever::prelude::*; +use std::{collections::HashMap, fs::File, os::unix::net::UnixStream, time::Duration}; macro_rules! syscall { ($fn:ident $args:tt) => {{ @@ -25,8 +25,8 @@ macro_rules! syscall { /////////////////// use socket2::SockAddr; -use std::os::unix::net::{SocketAddr as UnixSocketAddr}; use std::mem; +use std::os::unix::net::SocketAddr as UnixSocketAddr; fn max_len() -> usize { // The maximum read limit on most posix-like systems is `SSIZE_MAX`, @@ -44,18 +44,22 @@ fn max_len() -> usize { } } -pub(crate) fn shim_recv_from(fd: A, buf: &mut [u8], flags: libc::c_int) -> io::Result<(usize, SockAddr)> { +pub(crate) fn shim_recv_from( + fd: A, + buf: &mut [u8], + flags: libc::c_int, +) -> io::Result<(usize, SockAddr)> { let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() }; let mut addrlen = mem::size_of_val(&storage) as libc::socklen_t; let n = syscall!(recvfrom( - fd.as_raw_fd() as _, - buf.as_mut_ptr() as *mut libc::c_void, - std::cmp::min(buf.len(), max_len()), - flags, - &mut storage as *mut _ as *mut _, - &mut addrlen, - ))?; + fd.as_raw_fd() as _, + buf.as_mut_ptr() as *mut libc::c_void, + std::cmp::min(buf.len(), max_len()), + flags, + &mut storage as *mut _ as *mut _, + &mut addrlen, + ))?; let addr = unsafe { SockAddr::from_raw_parts(&storage as *const _ as *const _, addrlen) }; Ok((n as usize, addr)) } @@ -78,7 +82,7 @@ pub(crate) fn shim_to_af_unix(sockaddr: &SockAddr) -> io::Result let abst_sock_ident: libc::c_char = unsafe { std::slice::from_raw_parts( &addr.sun_path as *const _ as *const u8, - mem::size_of::() + mem::size_of::(), ) }[1] as libc::c_char; @@ -88,7 +92,7 @@ pub(crate) fn shim_to_af_unix(sockaddr: &SockAddr) -> io::Result // https://man7.org/linux/man-pages/man7/unix.7.html (sa, 0) if sa != 0 && sa > mem::size_of::() as libc::socklen_t => { len = mem::size_of::() as libc::socklen_t; - }, + } // If unnamed socket, then addr is always zero, // assign the offset reserved difference as length. (0, _) => { @@ -96,7 +100,7 @@ pub(crate) fn shim_to_af_unix(sockaddr: &SockAddr) -> io::Result let path = &addr.sun_path as *const _ as usize; let sun_path_offset = path - base; len = sun_path_offset as libc::socklen_t; - }, + } // Discard rest, they are not special. (_, _) => {} @@ -109,7 +113,7 @@ pub(crate) fn shim_to_af_unix(sockaddr: &SockAddr) -> io::Result std::ptr::copy_nonoverlapping( sockaddr.as_ptr(), &mut init as *mut _ as *mut _, - len as usize + len as usize, ); // Safety: We've written the init addr above. @@ -139,7 +143,7 @@ pub struct SysProactor { registered: TTas>, /// Hashmap for holding interested concrete completion callbacks - completions: TTas> + completions: TTas>, } impl SysProactor { @@ -152,11 +156,16 @@ impl SysProactor { epoll_fd, event_fd: TTas::new(event_fd), registered: TTas::new(HashMap::new()), - completions: TTas::new(HashMap::new()) + completions: TTas::new(HashMap::new()), }; let ev = &mut EpollEvent::new(libc::EPOLLIN as _, 0 as u64); - epoll_ctl(proactor.epoll_fd, EpollOp::EpollCtlAdd, event_fd_raw, Some(ev))?; + epoll_ctl( + proactor.epoll_fd, + EpollOp::EpollCtlAdd, + event_fd_raw, + Some(ev), + )?; Ok(proactor) } @@ -186,7 +195,9 @@ impl SysProactor { pub fn wait(&self, max_event_size: usize, timeout: Option) -> io::Result { // dbg!("WAIT"); let mut events: Vec = Vec::with_capacity(max_event_size); - events.resize(max_event_size, unsafe { MaybeUninit::zeroed().assume_init() }); + events.resize(max_event_size, unsafe { + MaybeUninit::zeroed().assume_init() + }); let timeout: isize = timeout.map_or(!0, |d| d.as_millis() as isize); let mut res = epoll_wait(self.epoll_fd, &mut events, timeout)? as usize; @@ -230,9 +241,7 @@ impl SysProactor { } let (tx, rx) = oneshot::channel(); - let comp = completions - .entry(fd) - .or_insert(Vec::new()); + let comp = completions.entry(fd).or_insert(Vec::new()); comp.push((events, tx)); diff --git a/src/syscore/linux/epoll/fs.rs b/src/syscore/linux/epoll/fs.rs index 0fd5abc..0c27ec1 100644 --- a/src/syscore/linux/epoll/fs.rs +++ b/src/syscore/linux/epoll/fs.rs @@ -1 +1 @@ -pub struct StoreFile; \ No newline at end of file +pub struct StoreFile; diff --git a/src/syscore/linux/epoll/mod.rs b/src/syscore/linux/epoll/mod.rs index 9f44a62..ffee752 100644 --- a/src/syscore/linux/epoll/mod.rs +++ b/src/syscore/linux/epoll/mod.rs @@ -1,9 +1,9 @@ -mod fs; mod epoll; -mod processor; +mod fs; mod nethandle; +mod processor; -pub(crate) use fs::*; pub(crate) use epoll::*; +pub(crate) use fs::*; pub(crate) use nethandle::*; pub(crate) use processor::*; diff --git a/src/syscore/linux/epoll/nethandle.rs b/src/syscore/linux/epoll/nethandle.rs index 6b09d64..f147785 100644 --- a/src/syscore/linux/epoll/nethandle.rs +++ b/src/syscore/linux/epoll/nethandle.rs @@ -1,22 +1,21 @@ use std::future::Future; use std::io; use std::marker::Unpin; -use std::sync::Arc; -use std::path::Path; use std::net::{SocketAddr, ToSocketAddrs}; use std::os::unix::io::AsRawFd; +use std::path::Path; +use std::sync::Arc; use std::net::{TcpListener, TcpStream, UdpSocket}; // Unix specifics -use std::os::unix::net::{UnixListener, UnixStream, UnixDatagram, SocketAddr as UnixSocketAddr}; +use std::os::unix::net::{SocketAddr as UnixSocketAddr, UnixDatagram, UnixListener, UnixStream}; -use lever::sync::prelude::*; use futures::Stream; +use lever::sync::prelude::*; -use crate::{Handle, Proactor}; use super::Processor; use crate::syscore::CompletionChan; - +use crate::{Handle, Proactor}; impl Handle { pub fn new(io: T) -> io::Result> { @@ -32,9 +31,7 @@ impl Handle { pub(crate) fn new_with_callback(io: T, evflags: i32) -> io::Result> { let fd = io.as_raw_fd(); let mut handle = Handle::new(io)?; - let register = Proactor::get() - .inner() - .register_io(fd, evflags)?; + let register = Proactor::get().inner().register_io(fd, evflags)?; handle.chan = Some(register); Ok(handle) } @@ -54,10 +51,13 @@ impl Handle { pub fn incoming( &self, ) -> impl Stream>> + Send + Unpin + '_ { - Box::pin(futures::stream::unfold(self, |listener: &Handle| async move { - let res = listener.accept().await.map(|(stream, _)| stream); - Some((res, listener)) - })) + Box::pin(futures::stream::unfold( + self, + |listener: &Handle| async move { + let res = listener.accept().await.map(|(stream, _)| stream); + Some((res, listener)) + }, + )) } } @@ -102,8 +102,7 @@ impl Handle { pub async fn send_to(&self, buf: &[u8], addr: A) -> io::Result { match addr.to_socket_addrs()?.next() { - Some(addr) => - Processor::processor_send_to(self.get_ref(), buf, addr).await, + Some(addr) => Processor::processor_send_to(self.get_ref(), buf, addr).await, None => Err(io::Error::new( io::ErrorKind::InvalidData, "given addresses can't be parsed", @@ -132,10 +131,13 @@ impl Handle { pub fn incoming( &self, ) -> impl Stream>> + Send + Unpin + '_ { - Box::pin(futures::stream::unfold(self, |listener: &Handle| async move { - let res = listener.accept().await.map(|(stream, _)| stream); - Some((res, listener)) - })) + Box::pin(futures::stream::unfold( + self, + |listener: &Handle| async move { + let res = listener.accept().await.map(|(stream, _)| stream); + Some((res, listener)) + }, + )) } } @@ -205,4 +207,4 @@ impl Handle { pub async fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, UnixSocketAddr)> { Processor::processor_peek_from_unix(self.get_ref(), buf).await } -} \ No newline at end of file +} diff --git a/src/syscore/linux/epoll/processor.rs b/src/syscore/linux/epoll/processor.rs index 0d98144..886def3 100644 --- a/src/syscore/linux/epoll/processor.rs +++ b/src/syscore/linux/epoll/processor.rs @@ -1,18 +1,20 @@ +use std::future::Future; use std::io; use std::io::{Read, Write}; -use std::{fs::File, os::unix::io::{AsRawFd, FromRawFd}, mem::ManuallyDrop}; -use std::net::{SocketAddr, ToSocketAddrs, TcpListener}; -use std::os::unix::net::{ - SocketAddr as UnixSocketAddr, UnixDatagram, UnixListener, UnixStream, -}; -use std::net::{SocketAddrV6, SocketAddrV4, Ipv4Addr, Ipv6Addr, UdpSocket}; -use std::future::Future; -use std::path::Path; use std::net::TcpStream; +use std::net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6, UdpSocket}; +use std::net::{SocketAddr, TcpListener, ToSocketAddrs}; +use std::os::unix::net::{SocketAddr as UnixSocketAddr, UnixDatagram, UnixListener, UnixStream}; +use std::path::Path; +use std::{ + fs::File, + mem::ManuallyDrop, + os::unix::io::{AsRawFd, FromRawFd}, +}; use crate::proactor::Proactor; -use crate::Handle; use crate::syscore::shim_to_af_unix; +use crate::Handle; pub struct Processor; @@ -22,7 +24,10 @@ impl Processor { ///// Synchronous File /////////////////////////////////// - pub(crate) async fn processor_read_file(io: &R, buf: &mut [u8]) -> io::Result { + pub(crate) async fn processor_read_file( + io: &R, + buf: &mut [u8], + ) -> io::Result { // TODO: (vertexclique): Use blocking here. let mut file = unsafe { File::from_raw_fd(io.as_raw_fd()) }; let res = file.read(buf); @@ -105,10 +110,13 @@ impl Processor { ///// Commonality of TcpStream, UdpSocket /////////////////////////////////// - pub(crate) async fn processor_connect(addrs: A, mut f: F) -> io::Result - where - F: FnMut(SocketAddr) -> Fut, - Fut: Future>, + pub(crate) async fn processor_connect( + addrs: A, + mut f: F, + ) -> io::Result + where + F: FnMut(SocketAddr) -> Fut, + Fut: Future>, { // TODO connect_tcp, connect_udp let addrs = match addrs.to_socket_addrs() { @@ -125,10 +133,7 @@ impl Processor { } Err(tail_err.unwrap_or_else(|| { - io::Error::new( - io::ErrorKind::InvalidInput, - "couldn't resolve addresses", - ) + io::Error::new(io::ErrorKind::InvalidInput, "couldn't resolve addresses") })) } @@ -145,7 +150,11 @@ impl Processor { } else { socket2::Domain::ipv4() }; - let sock = socket2::Socket::new(domain, socket2::Type::stream(), Some(socket2::Protocol::tcp()))?; + let sock = socket2::Socket::new( + domain, + socket2::Type::stream(), + Some(socket2::Protocol::tcp()), + )?; // Begin async connect and ignore the inevitable "in progress" error. sock.set_nonblocking(true)?; @@ -178,7 +187,11 @@ impl Processor { SocketAddr::V4(_) => socket2::Domain::ipv4(), SocketAddr::V6(_) => socket2::Domain::ipv6(), }; - let sock = socket2::Socket::new(domain, socket2::Type::dgram(), Some(socket2::Protocol::udp()))?; + let sock = socket2::Socket::new( + domain, + socket2::Type::dgram(), + Some(socket2::Protocol::udp()), + )?; let sockaddr = socket2::SockAddr::from(addr); let unspec = match addr { @@ -207,7 +220,9 @@ impl Processor { ///// TcpListener /////////////////////////////////// - pub(crate) async fn processor_accept_tcp_listener(listener: &R) -> io::Result<(Handle, SocketAddr)> { + pub(crate) async fn processor_accept_tcp_listener( + listener: &R, + ) -> io::Result<(Handle, SocketAddr)> { let socket = unsafe { socket2::Socket::from_raw_fd(listener.as_raw_fd()) }; let socket = socket.into_tcp_listener(); let socket = ManuallyDrop::new(socket); @@ -215,7 +230,8 @@ impl Processor { // Reregister on block match socket .accept() - .map(|(stream, sockaddr)| (Handle::new(stream).unwrap(), sockaddr)) { + .map(|(stream, sockaddr)| (Handle::new(stream).unwrap(), sockaddr)) + { Ok(res) => Ok(res), Err(err) if (err.raw_os_error().unwrap() & (libc::EAGAIN | libc::EWOULDBLOCK)) != 0 => { let cc = Proactor::get() @@ -246,7 +262,11 @@ impl Processor { Self::send_to_dest(socket, buf, &socket2::SockAddr::from(addr)).await } - async fn send_to_dest(socket: &A, buf: &[u8], addr: &socket2::SockAddr) -> io::Result { + async fn send_to_dest( + socket: &A, + buf: &[u8], + addr: &socket2::SockAddr, + ) -> io::Result { let sock = unsafe { socket2::Socket::from_raw_fd(socket.as_raw_fd()) }; let sock = ManuallyDrop::new(sock); @@ -268,13 +288,19 @@ impl Processor { } } - pub(crate) async fn processor_recv_from(sock: &R, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + pub(crate) async fn processor_recv_from( + sock: &R, + buf: &mut [u8], + ) -> io::Result<(usize, SocketAddr)> { Self::recv_from_with_flags(sock, buf, 0) .await .map(|(size, sockaddr)| (size, sockaddr.as_std().unwrap())) } - pub(crate) async fn processor_peek_from(sock: &R, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + pub(crate) async fn processor_peek_from( + sock: &R, + buf: &mut [u8], + ) -> io::Result<(usize, SocketAddr)> { Self::recv_from_with_flags(sock, buf, libc::MSG_PEEK as _) .await .map(|(size, sockaddr)| (size, sockaddr.as_std().unwrap())) @@ -290,8 +316,8 @@ impl Processor { // let sock = ManuallyDrop::new(sock); // Reregister on block - match super::shim_recv_from(sock, buf, flags as _) - .map(|(size, sockaddr)| (size, sockaddr)) { + match super::shim_recv_from(sock, buf, flags as _).map(|(size, sockaddr)| (size, sockaddr)) + { Ok(res) => Ok(res), Err(err) if (err.raw_os_error().unwrap() & (libc::EAGAIN | libc::EWOULDBLOCK)) != 0 => { let cc = Proactor::get() @@ -316,7 +342,9 @@ impl Processor { ///// UnixListener /////////////////////////////////// - pub(crate) async fn processor_accept_unix_listener(listener: &R) -> io::Result<(Handle, UnixSocketAddr)> { + pub(crate) async fn processor_accept_unix_listener( + listener: &R, + ) -> io::Result<(Handle, UnixSocketAddr)> { let socket = unsafe { socket2::Socket::from_raw_fd(listener.as_raw_fd()) }; let socket = socket.into_unix_listener(); let socket = ManuallyDrop::new(socket); @@ -324,7 +352,8 @@ impl Processor { // Reregister on block match socket .accept() - .map(|(stream, sockaddr)| (Handle::new(stream).unwrap(), sockaddr)) { + .map(|(stream, sockaddr)| (Handle::new(stream).unwrap(), sockaddr)) + { Ok(res) => Ok(res), Err(err) if (err.raw_os_error().unwrap() & (libc::EAGAIN | libc::EWOULDBLOCK)) != 0 => { let cc = Proactor::get() @@ -347,7 +376,9 @@ impl Processor { ///// UnixStream /////////////////////////////////// - pub(crate) async fn processor_connect_unix>(path: P) -> io::Result> { + pub(crate) async fn processor_connect_unix>( + path: P, + ) -> io::Result> { let sock = socket2::Socket::new(socket2::Domain::unix(), socket2::Type::stream(), None)?; let sockaddr = socket2::SockAddr::unix(path)?; @@ -377,19 +408,29 @@ impl Processor { res.map(|_| stream) } - pub(crate) async fn processor_send_to_unix>(socket: &R, buf: &[u8], path: P) -> io::Result { + pub(crate) async fn processor_send_to_unix>( + socket: &R, + buf: &[u8], + path: P, + ) -> io::Result { Self::send_to_dest(socket, buf, &socket2::SockAddr::unix(path)?).await } - pub(crate) async fn processor_recv_from_unix(socket: &R, buf: &mut [u8]) -> io::Result<(usize, UnixSocketAddr)> { + pub(crate) async fn processor_recv_from_unix( + socket: &R, + buf: &mut [u8], + ) -> io::Result<(usize, UnixSocketAddr)> { Self::recv_from_with_flags(socket, buf, 0) .await .map(|(size, sockaddr)| (size, shim_to_af_unix(&sockaddr).unwrap())) } - pub(crate) async fn processor_peek_from_unix(socket: &R, buf: &mut [u8]) -> io::Result<(usize, UnixSocketAddr)> { + pub(crate) async fn processor_peek_from_unix( + socket: &R, + buf: &mut [u8], + ) -> io::Result<(usize, UnixSocketAddr)> { Self::recv_from_with_flags(socket, buf, libc::MSG_PEEK as _) .await .map(|(size, sockaddr)| (size, shim_to_af_unix(&sockaddr).unwrap())) } -} \ No newline at end of file +} diff --git a/src/syscore/linux/iouring/fs/buffer.rs b/src/syscore/linux/iouring/fs/buffer.rs index 2bb0377..96f8bb7 100644 --- a/src/syscore/linux/iouring/fs/buffer.rs +++ b/src/syscore/linux/iouring/fs/buffer.rs @@ -1,11 +1,11 @@ +use futures::ready; use std::alloc::{alloc, dealloc, handle_alloc_error, Layout}; -use std::io; use std::cmp; +use std::io; use std::mem; use std::ptr::NonNull; use std::slice; use std::task::Poll; -use futures::ready; use super::cancellation::Cancellation; @@ -54,9 +54,10 @@ impl Buffer { } #[inline] - pub fn fill_buf(&mut self, fill: impl FnOnce(&mut [u8]) -> Poll>) - -> Poll> - { + pub fn fill_buf( + &mut self, + fill: impl FnOnce(&mut [u8]) -> Poll>, + ) -> Poll> { match self.storage { Storage::Buffer => { if self.pos >= self.cap { @@ -73,7 +74,7 @@ impl Buffer { self.cap = ready!(fill(self.alloc_buf()))?; Poll::Ready(Ok(self.buffered_from_read())) } - _ => panic!("attempted to fill buf while not holding buffer"), + _ => panic!("attempted to fill buf while not holding buffer"), } } @@ -90,49 +91,43 @@ impl Buffer { pub fn cancellation(&mut self) -> Cancellation { match self.storage { - Storage::Buffer => { + Storage::Buffer => { self.clear(); self.storage = Storage::Nothing; let data = mem::replace(&mut self.data, NonNull::dangling()); unsafe { Cancellation::buffer(data.cast().as_ptr(), self.capacity as usize) } } - Storage::Statx => { + Storage::Statx => { unsafe fn callback(statx: *mut (), _: usize) { dealloc(statx as *mut u8, Layout::new::()) } self.storage = Storage::Nothing; let data = mem::replace(&mut self.data, NonNull::dangling()); - unsafe { - Cancellation::new(data.cast().as_ptr(), 0, callback) - } + unsafe { Cancellation::new(data.cast().as_ptr(), 0, callback) } } - Storage::Nothing => Cancellation::null(), + Storage::Nothing => Cancellation::null(), } } pub(crate) fn as_statx(&mut self) -> *mut libc::statx { match self.storage { - Storage::Statx => self.data.cast().as_ptr(), - Storage::Nothing => self.alloc_statx(), - _ => panic!("accessed buffer as statx when storing something else"), + Storage::Statx => self.data.cast().as_ptr(), + Storage::Nothing => self.alloc_statx(), + _ => panic!("accessed buffer as statx when storing something else"), } } fn alloc_buf(&mut self) -> &mut [u8] { self.storage = Storage::Buffer; self.alloc(); - unsafe { - slice::from_raw_parts_mut(self.data.cast().as_ptr(), self.capacity as usize) - } + unsafe { slice::from_raw_parts_mut(self.data.cast().as_ptr(), self.capacity as usize) } } fn alloc_statx(&mut self) -> &mut libc::statx { self.storage = Storage::Statx; self.alloc(); - unsafe { - &mut *self.data.cast().as_ptr() - } + unsafe { &mut *self.data.cast().as_ptr() } } fn alloc(&mut self) { @@ -150,15 +145,15 @@ impl Buffer { #[inline(always)] fn layout(&self) -> Option { match self.storage { - Storage::Statx => Some(Layout::new::()), - Storage::Buffer => Some(Layout::array::(self.capacity as usize).unwrap()), - Storage::Nothing => None, + Storage::Statx => Some(Layout::new::()), + Storage::Buffer => Some(Layout::array::(self.capacity as usize).unwrap()), + Storage::Nothing => None, } } } -unsafe impl Send for Buffer { } -unsafe impl Sync for Buffer { } +unsafe impl Send for Buffer {} +unsafe impl Sync for Buffer {} impl Drop for Buffer { fn drop(&mut self) { @@ -168,4 +163,4 @@ impl Drop for Buffer { } } } -} \ No newline at end of file +} diff --git a/src/syscore/linux/iouring/fs/cancellation.rs b/src/syscore/linux/iouring/fs/cancellation.rs index 029f22d..36bc9e1 100644 --- a/src/syscore/linux/iouring/fs/cancellation.rs +++ b/src/syscore/linux/iouring/fs/cancellation.rs @@ -25,19 +25,28 @@ impl Cancellation { /// as well. /// /// It must be safe to send the Cancellation type and references to it between threads. - pub unsafe fn new(data: *mut (), metadata: usize, drop: unsafe fn(*mut (), usize)) - -> Cancellation - { - Cancellation { data, metadata, drop } + pub unsafe fn new( + data: *mut (), + metadata: usize, + drop: unsafe fn(*mut (), usize), + ) -> Cancellation { + Cancellation { + data, + metadata, + drop, + } } /// Construct a null cancellation, which does nothing when it is dropped. pub fn null() -> Cancellation { - unsafe fn drop(_: *mut (), _: usize) { } - Cancellation { data: ptr::null_mut(), metadata: 0, drop } + unsafe fn drop(_: *mut (), _: usize) {} + Cancellation { + data: ptr::null_mut(), + metadata: 0, + drop, + } } - pub(crate) unsafe fn buffer(data: *mut u8, len: usize) -> Cancellation { unsafe fn drop(data: *mut (), len: usize) { std::mem::drop(Vec::from_raw_parts(data as *mut u8, len, len)) @@ -47,13 +56,11 @@ impl Cancellation { } } -unsafe impl Send for Cancellation { } -unsafe impl Sync for Cancellation { } +unsafe impl Send for Cancellation {} +unsafe impl Sync for Cancellation {} impl Drop for Cancellation { fn drop(&mut self) { - unsafe { - (self.drop)(self.data, self.metadata) - } + unsafe { (self.drop)(self.data, self.metadata) } } -} \ No newline at end of file +} diff --git a/src/syscore/linux/iouring/fs/store_file.rs b/src/syscore/linux/iouring/fs/store_file.rs index 3775039..901b024 100644 --- a/src/syscore/linux/iouring/fs/store_file.rs +++ b/src/syscore/linux/iouring/fs/store_file.rs @@ -1,17 +1,16 @@ -use std::pin::Pin; -use std::fs::File; use crate::Handle; -use std::io; -use std::sync::Arc; use lever::sync::prelude::TTas; +use std::fs::File; +use std::io; use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; +use std::pin::Pin; +use std::sync::Arc; use super::buffer::Buffer; -use pin_utils::unsafe_pinned; +use crate::syscore::Processor; use lever::sync::atomics::AtomicBox; +use pin_utils::unsafe_pinned; use std::task::{Context, Poll}; -use crate::syscore::Processor; - pub struct StoreFile { fd: RawFd, @@ -105,4 +104,4 @@ impl StoreFile { self.op_state.replace_with(|_| Op::Nothing); self.buf.cancellation(); } -} \ No newline at end of file +} diff --git a/src/syscore/linux/iouring/iouring.rs b/src/syscore/linux/iouring/iouring.rs index 62155b1..3626443 100644 --- a/src/syscore/linux/iouring/iouring.rs +++ b/src/syscore/linux/iouring/iouring.rs @@ -1,16 +1,19 @@ -use std::io; -use iou::{IoUring, SubmissionQueue, CompletionQueue, SubmissionQueueEvent, CompletionQueueEvent, Registrar}; -use lever::sync::prelude::*; -use std::collections::HashMap; +use core::mem::MaybeUninit; use futures::channel::oneshot; +use iou::{ + CompletionQueue, CompletionQueueEvent, IoUring, Registrar, SubmissionQueue, + SubmissionQueueEvent, +}; +use lever::sync::prelude::*; use pin_utils::unsafe_pinned; +use std::collections::HashMap; use std::future::Future; -use core::mem::MaybeUninit; -use std::os::unix::io::{AsRawFd, RawFd, FromRawFd}; +use std::io; +use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; use std::pin::Pin; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::task::{Context, Poll}; use std::time::Duration; -use std::sync::atomic::{AtomicU64, Ordering, AtomicBool}; macro_rules! syscall { ($fn:ident $args:tt) => {{ @@ -26,11 +29,11 @@ macro_rules! syscall { /////////////////// /////////////////// -use socket2::SockAddr; -use std::os::unix::net::{SocketAddr as UnixSocketAddr}; -use std::mem; use crate::Proactor; use once_cell::sync::Lazy; +use socket2::SockAddr; +use std::mem; +use std::os::unix::net::SocketAddr as UnixSocketAddr; fn max_len() -> usize { // The maximum read limit on most posix-like systems is `SSIZE_MAX`, @@ -48,18 +51,22 @@ fn max_len() -> usize { } } -pub(crate) fn shim_recv_from(fd: A, buf: &mut [u8], flags: libc::c_int) -> io::Result<(usize, SockAddr)> { +pub(crate) fn shim_recv_from( + fd: A, + buf: &mut [u8], + flags: libc::c_int, +) -> io::Result<(usize, SockAddr)> { let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() }; let mut addrlen = mem::size_of_val(&storage) as libc::socklen_t; let n = syscall!(recvfrom( - fd.as_raw_fd() as _, - buf.as_mut_ptr() as *mut libc::c_void, - std::cmp::min(buf.len(), max_len()), - flags, - &mut storage as *mut _ as *mut _, - &mut addrlen, - ))?; + fd.as_raw_fd() as _, + buf.as_mut_ptr() as *mut libc::c_void, + std::cmp::min(buf.len(), max_len()), + flags, + &mut storage as *mut _ as *mut _, + &mut addrlen, + ))?; let addr = unsafe { SockAddr::from_raw_parts(&storage as *const _ as *const _, addrlen) }; Ok((n as usize, addr)) } @@ -82,7 +89,7 @@ pub(crate) fn shim_to_af_unix(sockaddr: &SockAddr) -> io::Result let abst_sock_ident: libc::c_char = unsafe { std::slice::from_raw_parts( &addr.sun_path as *const _ as *const u8, - mem::size_of::() + mem::size_of::(), ) }[1] as libc::c_char; @@ -92,7 +99,7 @@ pub(crate) fn shim_to_af_unix(sockaddr: &SockAddr) -> io::Result // https://man7.org/linux/man-pages/man7/unix.7.html (sa, 0) if sa != 0 && sa > mem::size_of::() as libc::socklen_t => { len = mem::size_of::() as libc::socklen_t; - }, + } // If unnamed socket, then addr is always zero, // assign the offset reserved difference as length. (0, _) => { @@ -100,7 +107,7 @@ pub(crate) fn shim_to_af_unix(sockaddr: &SockAddr) -> io::Result let path = &addr.sun_path as *const _ as usize; let sun_path_offset = path - base; len = sun_path_offset as libc::socklen_t; - }, + } // Discard rest, they are not special. (_, _) => {} @@ -113,7 +120,7 @@ pub(crate) fn shim_to_af_unix(sockaddr: &SockAddr) -> io::Result std::ptr::copy_nonoverlapping( sockaddr.as_ptr(), &mut init as *mut _ as *mut _, - len as usize + len as usize, ); // Safety: We've written the init addr above. @@ -141,7 +148,11 @@ pub struct SysProactor { waker: AtomicBool, } -pub type RingTypes = (SubmissionQueue<'static>, CompletionQueue<'static>, Registrar<'static>); +pub type RingTypes = ( + SubmissionQueue<'static>, + CompletionQueue<'static>, + Registrar<'static>, +); static mut IO_URING: Option = None; @@ -156,12 +167,16 @@ impl SysProactor { cq: TTas::new(cq), submitters: TTas::new(HashMap::default()), submitter_id: AtomicU64::default(), - waker: AtomicBool::default() + waker: AtomicBool::default(), }) } } - fn submitter(&self, sq: &mut SubmissionQueue<'_>, mut ring_sub: impl FnMut(&mut SubmissionQueueEvent<'_>) -> T) -> Option { + fn submitter( + &self, + sq: &mut SubmissionQueue<'_>, + mut ring_sub: impl FnMut(&mut SubmissionQueueEvent<'_>) -> T, + ) -> Option { // dbg!("SUBMITTER"); let mut sqe = match sq.next_sqe() { Some(sqe) => sqe, @@ -176,46 +191,50 @@ impl SysProactor { Some(ring_sub(&mut sqe)) } - - pub(crate) fn register_io(&self, mut io_submit: impl FnMut(&mut SubmissionQueueEvent<'_>)) -> io::Result { + pub(crate) fn register_io( + &self, + mut io_submit: impl FnMut(&mut SubmissionQueueEvent<'_>), + ) -> io::Result { // dbg!("REGISTER IO"); let sub_comp = { let mut sq = self.sq.lock(); - let cc = self.submitter(&mut sq, |sqe| { - // dbg!("SUBMITTER"); - let mut id = self.submitter_id.fetch_add(1, Ordering::Relaxed); - if id == MANUAL_TIMEOUT { - id = self.submitter_id.fetch_add(2, Ordering::Relaxed) + 2; - } - let (tx, rx) = oneshot::channel(); - - // dbg!("SUBMITTER", id); - io_submit(sqe); - sqe.set_user_data(id); - - { - let mut subguard = self.submitters.lock(); - subguard.insert(id, tx); - // dbg!("INSERTED", id); - } - - CompletionChan { rx } - }).map(|c| unsafe { - let submitted_io_evcount = sq.submit(); - // dbg!(&submitted_io_evcount); - - // sq.submit() - // .map_or_else(|_| { - // let id = self.submitter_id.load(Ordering::SeqCst); - // let mut subguard = self.submitters.lock(); - // subguard.get(&id).unwrap().send(0); - // }, |submitted_io_evcount| { - // dbg!(submitted_io_evcount); - // }); - - c - }); + let cc = self + .submitter(&mut sq, |sqe| { + // dbg!("SUBMITTER"); + let mut id = self.submitter_id.fetch_add(1, Ordering::Relaxed); + if id == MANUAL_TIMEOUT { + id = self.submitter_id.fetch_add(2, Ordering::Relaxed) + 2; + } + let (tx, rx) = oneshot::channel(); + + // dbg!("SUBMITTER", id); + io_submit(sqe); + sqe.set_user_data(id); + + { + let mut subguard = self.submitters.lock(); + subguard.insert(id, tx); + // dbg!("INSERTED", id); + } + + CompletionChan { rx } + }) + .map(|c| unsafe { + let submitted_io_evcount = sq.submit(); + // dbg!(&submitted_io_evcount); + + // sq.submit() + // .map_or_else(|_| { + // let id = self.submitter_id.load(Ordering::SeqCst); + // let mut subguard = self.submitters.lock(); + // subguard.get(&id).unwrap().send(0); + // }, |submitted_io_evcount| { + // dbg!(submitted_io_evcount); + // }); + + c + }); cc }; @@ -229,7 +248,11 @@ impl SysProactor { Ok(()) } - pub(crate) fn wait(&self, max_event_size: usize, duration: Option) -> io::Result { + pub(crate) fn wait( + &self, + max_event_size: usize, + duration: Option, + ) -> io::Result { // dbg!("WAIT ENTER"); let mut cq = self.cq.lock(); let mut acquired = 0; @@ -237,8 +260,8 @@ impl SysProactor { // dbg!("WAITING FOR CQE"); // let timeout = Duration::from_millis(1); while let Ok(cqe) = cq.wait_for_cqe() { - // while let Some(cqe) = cq.peek_for_cqe() { - // dbg!("GOT"); + // while let Some(cqe) = cq.peek_for_cqe() { + // dbg!("GOT"); let mut ready = cq.ready() as usize + 1; // dbg!(ready, cqe.user_data()); @@ -274,11 +297,7 @@ impl SysProactor { acquired += 1; // dbg!("ACQUIRED", udata); - self.submitters.lock() - .remove(&udata) - .map(|s| { - s.send(res) - }); + self.submitters.lock().remove(&udata).map(|s| s.send(res)); Ok(()) } @@ -300,4 +319,4 @@ impl Future for CompletionChan { .poll(cx) .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "sender has been cancelled")) } -} \ No newline at end of file +} diff --git a/src/syscore/linux/iouring/mod.rs b/src/syscore/linux/iouring/mod.rs index 49c02fa..bc5fbc5 100644 --- a/src/syscore/linux/iouring/mod.rs +++ b/src/syscore/linux/iouring/mod.rs @@ -1,7 +1,7 @@ pub(crate) mod fs; mod iouring; -mod processor; mod nethandle; +mod processor; pub(crate) use fs::*; pub(crate) use iouring::*; diff --git a/src/syscore/linux/iouring/nethandle.rs b/src/syscore/linux/iouring/nethandle.rs index eacd6e2..3313d13 100644 --- a/src/syscore/linux/iouring/nethandle.rs +++ b/src/syscore/linux/iouring/nethandle.rs @@ -1,26 +1,24 @@ use std::future::Future; use std::io; use std::marker::Unpin; -use std::sync::Arc; -use std::path::Path; use std::net::{SocketAddr, ToSocketAddrs}; use std::os::unix::io::AsRawFd; +use std::path::Path; +use std::sync::Arc; use std::net::{TcpListener, TcpStream, UdpSocket}; // Unix specifics -use std::os::unix::net::{UnixListener, UnixStream, UnixDatagram, SocketAddr as UnixSocketAddr}; +use std::os::unix::net::{SocketAddr as UnixSocketAddr, UnixDatagram, UnixListener, UnixStream}; -use lever::sync::prelude::*; use futures::Stream; +use lever::sync::prelude::*; -use crate::{Handle, Proactor}; use super::Processor; -use crate::syscore::CompletionChan; use crate::syscore::linux::iouring::fs::store_file::StoreFile; +use crate::syscore::CompletionChan; +use crate::{Handle, Proactor}; use std::fs::File; - - impl Handle { pub fn new(io: T) -> io::Result> { let fd = io.as_raw_fd(); @@ -52,10 +50,13 @@ impl Handle { pub fn incoming( &self, ) -> impl Stream>> + Send + Unpin + '_ { - Box::pin(futures::stream::unfold(self, |listener: &Handle| async move { - let res = listener.accept().await.map(|(stream, _)| stream); - Some((res, listener)) - })) + Box::pin(futures::stream::unfold( + self, + |listener: &Handle| async move { + let res = listener.accept().await.map(|(stream, _)| stream); + Some((res, listener)) + }, + )) } } @@ -100,8 +101,7 @@ impl Handle { pub async fn send_to(&self, buf: &[u8], addr: A) -> io::Result { match addr.to_socket_addrs()?.next() { - Some(addr) => - Processor::processor_send_to(self.get_ref(), buf, addr).await, + Some(addr) => Processor::processor_send_to(self.get_ref(), buf, addr).await, None => Err(io::Error::new( io::ErrorKind::InvalidData, "given addresses can't be parsed", @@ -130,10 +130,13 @@ impl Handle { pub fn incoming( &self, ) -> impl Stream>> + Send + Unpin + '_ { - Box::pin(futures::stream::unfold(self, |listener: &Handle| async move { - let res = listener.accept().await.map(|(stream, _)| stream); - Some((res, listener)) - })) + Box::pin(futures::stream::unfold( + self, + |listener: &Handle| async move { + let res = listener.accept().await.map(|(stream, _)| stream); + Some((res, listener)) + }, + )) } } @@ -203,4 +206,4 @@ impl Handle { pub async fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, UnixSocketAddr)> { Processor::processor_peek_from_unix(self.get_ref(), buf).await } -} \ No newline at end of file +} diff --git a/src/syscore/linux/iouring/processor.rs b/src/syscore/linux/iouring/processor.rs index 2e96cf0..4702a20 100644 --- a/src/syscore/linux/iouring/processor.rs +++ b/src/syscore/linux/iouring/processor.rs @@ -1,27 +1,28 @@ +use std::future::Future; use std::io; use std::io::{Read, Write}; -use std::{fs::File, os::unix::io::{AsRawFd, FromRawFd}, mem::ManuallyDrop}; -use std::net::{SocketAddr, ToSocketAddrs, TcpListener}; -use std::os::unix::net::{ - SocketAddr as UnixSocketAddr, UnixDatagram, UnixListener, UnixStream, -}; -use std::net::{SocketAddrV6, SocketAddrV4, Ipv4Addr, Ipv6Addr, UdpSocket}; -use std::future::Future; -use std::path::Path; use std::net::TcpStream; +use std::net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6, UdpSocket}; +use std::net::{SocketAddr, TcpListener, ToSocketAddrs}; +use std::os::unix::net::{SocketAddr as UnixSocketAddr, UnixDatagram, UnixListener, UnixStream}; +use std::path::Path; +use std::{ + fs::File, + mem::ManuallyDrop, + os::unix::io::{AsRawFd, FromRawFd}, +}; use crate::proactor::Proactor; -use crate::Handle; use crate::syscore::shim_to_af_unix; -use std::io::{IoSliceMut, IoSlice}; -use iou::{SockFlag, SockAddrStorage}; +use crate::Handle; +use iou::{SockAddrStorage, SockFlag}; +use std::ffi::CString; +use std::io::{IoSlice, IoSliceMut}; use std::mem::MaybeUninit; use std::os::unix::ffi::OsStrExt; -use std::ffi::CString; use std::os::unix::prelude::RawFd; - macro_rules! syscall { ($fn:ident $args:tt) => {{ let res = unsafe { libc::$fn $args }; @@ -33,7 +34,6 @@ macro_rules! syscall { }}; } - pub struct Processor; impl Processor { @@ -58,7 +58,11 @@ impl Processor { Ok(x) } - pub(crate) async fn processor_read_file(io: &RawFd, buf: &mut [u8], offset: usize) -> io::Result { + pub(crate) async fn processor_read_file( + io: &RawFd, + buf: &mut [u8], + offset: usize, + ) -> io::Result { let cc = Proactor::get().inner().register_io(|sqe| unsafe { sqe.prep_read(*io, buf, offset); })?; @@ -66,7 +70,11 @@ impl Processor { Ok(cc.await? as _) } - pub(crate) async fn processor_write_file(io: &RawFd, buf: &[u8], offset: usize) -> io::Result { + pub(crate) async fn processor_write_file( + io: &RawFd, + buf: &[u8], + offset: usize, + ) -> io::Result { let cc = Proactor::get().inner().register_io(|sqe| unsafe { sqe.prep_write(*io, buf, offset); })?; @@ -83,22 +91,29 @@ impl Processor { Ok(cc.await? as _) } - pub(crate) async fn processor_file_size(io: &RawFd, statx: *mut libc::statx) -> io::Result { + pub(crate) async fn processor_file_size( + io: &RawFd, + statx: *mut libc::statx, + ) -> io::Result { static EMPTY: libc::c_char = 0; let flags = libc::AT_EMPTY_PATH; let mask = libc::STATX_SIZE; - Proactor::get().inner().register_io(|sqe| unsafe { - let sqep = sqe.raw_mut(); - uring_sys::io_uring_prep_statx(sqep, *io, &EMPTY, flags, mask, statx); - })?.await?; + Proactor::get() + .inner() + .register_io(|sqe| unsafe { + let sqep = sqe.raw_mut(); + uring_sys::io_uring_prep_statx(sqep, *io, &EMPTY, flags, mask, statx); + })? + .await?; - unsafe { - Ok((*statx).stx_size as usize) - } + unsafe { Ok((*statx).stx_size as usize) } } - pub(crate) async fn processor_read_vectored(io: &RawFd, bufs: &mut [IoSliceMut<'_>]) -> io::Result { + pub(crate) async fn processor_read_vectored( + io: &RawFd, + bufs: &mut [IoSliceMut<'_>], + ) -> io::Result { let cc = Proactor::get().inner().register_io(|sqe| unsafe { sqe.prep_read_vectored(*io, bufs, 0); })?; @@ -106,7 +121,10 @@ impl Processor { Ok(cc.await? as _) } - pub(crate) async fn processor_write_vectored(io: &RawFd, bufs: &[IoSlice<'_>]) -> io::Result { + pub(crate) async fn processor_write_vectored( + io: &RawFd, + bufs: &[IoSlice<'_>], + ) -> io::Result { let cc = Proactor::get().inner().register_io(|sqe| unsafe { sqe.prep_write_vectored(*io, bufs, 0); })?; @@ -122,10 +140,13 @@ impl Processor { pub(crate) async fn processor_send(socket: &R, buf: &[u8]) -> io::Result { let fd = socket.as_raw_fd() as _; - let res = Proactor::get().inner().register_io(|sqe| unsafe { - let sqep = sqe.raw_mut(); - uring_sys::io_uring_prep_send(sqep, fd, buf.as_ptr() as _, buf.len() as _, 0); - })?.await?; + let res = Proactor::get() + .inner() + .register_io(|sqe| unsafe { + let sqep = sqe.raw_mut(); + uring_sys::io_uring_prep_send(sqep, fd, buf.as_ptr() as _, buf.len() as _, 0); + })? + .await?; Ok(res as _) } @@ -145,10 +166,19 @@ impl Processor { ) -> io::Result { let fd = socket.as_raw_fd() as _; - let res = Proactor::get().inner().register_io(|sqe| unsafe { - let sqep = sqe.raw_mut(); - uring_sys::io_uring_prep_recv(sqep as *mut _, fd, buf.as_ptr() as _, buf.len() as _, flags as _); - })?.await?; + let res = Proactor::get() + .inner() + .register_io(|sqe| unsafe { + let sqep = sqe.raw_mut(); + uring_sys::io_uring_prep_recv( + sqep as *mut _, + fd, + buf.as_ptr() as _, + buf.len() as _, + flags as _, + ); + })? + .await?; Ok(res as _) } @@ -158,10 +188,13 @@ impl Processor { ///// Commonality of TcpStream, UdpSocket /////////////////////////////////// - pub(crate) async fn processor_connect(addrs: A, mut f: F) -> io::Result - where - F: FnMut(SocketAddr) -> Fut, - Fut: Future>, + pub(crate) async fn processor_connect( + addrs: A, + mut f: F, + ) -> io::Result + where + F: FnMut(SocketAddr) -> Fut, + Fut: Future>, { // TODO connect_tcp, connect_udp let addrs = match addrs.to_socket_addrs() { @@ -178,10 +211,7 @@ impl Processor { } Err(tail_err.unwrap_or_else(|| { - io::Error::new( - io::ErrorKind::InvalidInput, - "couldn't resolve addresses", - ) + io::Error::new(io::ErrorKind::InvalidInput, "couldn't resolve addresses") })) } @@ -197,23 +227,29 @@ impl Processor { } else { socket2::Domain::ipv4() }; - let sock = socket2::Socket::new(domain, socket2::Type::stream(), Some(socket2::Protocol::tcp()))?; + let sock = socket2::Socket::new( + domain, + socket2::Type::stream(), + Some(socket2::Protocol::tcp()), + )?; sock.set_nonblocking(true)?; // FIXME: (vcq): iou uses nix, i use socket2, conversions happens over libc. // Propose std conversion for nix. - let nixsaddr = - unsafe { - &iou::SockAddr::from_libc_sockaddr(sock.local_addr().unwrap().as_ptr()).unwrap() - }; + let nixsaddr = unsafe { + &iou::SockAddr::from_libc_sockaddr(sock.local_addr().unwrap().as_ptr()).unwrap() + }; let mut stream = sock.into_tcp_stream(); stream.set_nodelay(true)?; let fd = stream.as_raw_fd() as _; - Proactor::get().inner().register_io(|sqe| unsafe { - sqe.prep_connect(fd, nixsaddr); - })?.await?; + Proactor::get() + .inner() + .register_io(|sqe| unsafe { + sqe.prep_connect(fd, nixsaddr); + })? + .await?; Ok(Handle::new(stream)?) } @@ -223,7 +259,11 @@ impl Processor { SocketAddr::V4(_) => socket2::Domain::ipv4(), SocketAddr::V6(_) => socket2::Domain::ipv6(), }; - let sock = socket2::Socket::new(domain, socket2::Type::dgram(), Some(socket2::Protocol::udp()))?; + let sock = socket2::Socket::new( + domain, + socket2::Type::dgram(), + Some(socket2::Protocol::udp()), + )?; let sockaddr = socket2::SockAddr::from(addr); let unspec = match addr { @@ -338,7 +378,9 @@ impl Processor { // Ok((Handle::new(stream)?, addr)) // } - pub(crate) async fn processor_accept_tcp_listener(listener: &R) -> io::Result<(Handle, SocketAddr)> { + pub(crate) async fn processor_accept_tcp_listener( + listener: &R, + ) -> io::Result<(Handle, SocketAddr)> { let socket = unsafe { socket2::Socket::from_raw_fd(listener.as_raw_fd()) }; let socket = socket.into_tcp_listener(); let socket = ManuallyDrop::new(socket); @@ -360,7 +402,11 @@ impl Processor { Self::send_to_dest(socket, buf, &socket2::SockAddr::from(addr)).await } - async fn send_to_dest(socket: &A, buf: &[u8], addr: &socket2::SockAddr) -> io::Result { + async fn send_to_dest( + socket: &A, + buf: &[u8], + addr: &socket2::SockAddr, + ) -> io::Result { // FIXME: (vcq): Wrap into vec? let mut iov = IoSlice::new(buf); @@ -372,21 +418,30 @@ impl Processor { let fd = socket.as_raw_fd() as _; - let res = Proactor::get().inner().register_io(|sqe| unsafe { - let sqep = sqe.raw_mut(); - uring_sys::io_uring_prep_sendmsg(sqep, fd, &sendmsg as *const _ as *const _, 0); - })?.await?; + let res = Proactor::get() + .inner() + .register_io(|sqe| unsafe { + let sqep = sqe.raw_mut(); + uring_sys::io_uring_prep_sendmsg(sqep, fd, &sendmsg as *const _ as *const _, 0); + })? + .await?; Ok(res as _) } - pub(crate) async fn processor_recv_from(sock: &R, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + pub(crate) async fn processor_recv_from( + sock: &R, + buf: &mut [u8], + ) -> io::Result<(usize, SocketAddr)> { Self::recv_from_with_flags(sock, buf, 0) .await .map(|(size, sockaddr)| (size, sockaddr.as_std().unwrap())) } - pub(crate) async fn processor_peek_from(sock: &R, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + pub(crate) async fn processor_peek_from( + sock: &R, + buf: &mut [u8], + ) -> io::Result<(usize, SocketAddr)> { Self::recv_from_with_flags(sock, buf, libc::MSG_PEEK as _) .await .map(|(size, sockaddr)| (size, sockaddr.as_std().unwrap())) @@ -397,7 +452,8 @@ impl Processor { buf: &mut [u8], flags: u32, ) -> io::Result<(usize, socket2::SockAddr)> { - let mut sockaddr_raw = unsafe { MaybeUninit::::zeroed().assume_init() }; + let mut sockaddr_raw = + unsafe { MaybeUninit::::zeroed().assume_init() }; // FIXME: (vcq): Wrap into vec? let mut iov = IoSliceMut::new(buf); @@ -410,15 +466,18 @@ impl Processor { let fd = socket.as_raw_fd() as _; - let res = Proactor::get().inner().register_io(|sqe| unsafe { - let sqep = sqe.raw_mut(); - uring_sys::io_uring_prep_recvmsg( - sqep, - fd, - &mut recvmsg as *mut _ as *mut _, - flags as _, - ); - })?.await?; + let res = Proactor::get() + .inner() + .register_io(|sqe| unsafe { + let sqep = sqe.raw_mut(); + uring_sys::io_uring_prep_recvmsg( + sqep, + fd, + &mut recvmsg as *mut _ as *mut _, + flags as _, + ); + })? + .await?; let sockaddr = unsafe { socket2::SockAddr::from_raw_parts( @@ -434,7 +493,9 @@ impl Processor { ///// UnixListener /////////////////////////////////// - pub(crate) async fn processor_accept_unix_listener(listener: &R) -> io::Result<(Handle, UnixSocketAddr)> { + pub(crate) async fn processor_accept_unix_listener( + listener: &R, + ) -> io::Result<(Handle, UnixSocketAddr)> { let fd = listener.as_raw_fd() as _; let mut saddrstor = SockAddrStorage::uninit(); @@ -457,7 +518,9 @@ impl Processor { ///// UnixStream /////////////////////////////////// - pub(crate) async fn processor_connect_unix>(path: P) -> io::Result> { + pub(crate) async fn processor_connect_unix>( + path: P, + ) -> io::Result> { let sock = socket2::Socket::new(socket2::Domain::unix(), socket2::Type::stream(), None)?; let sockaddr = socket2::SockAddr::unix(path)?; @@ -465,34 +528,44 @@ impl Processor { // FIXME: (vcq): iou uses nix, i use socket2, conversions happens over libc. // Propose std conversion for nix. - let nixsaddr = - unsafe { - &iou::SockAddr::from_libc_sockaddr(sock.local_addr().unwrap().as_ptr()).unwrap() - }; + let nixsaddr = unsafe { + &iou::SockAddr::from_libc_sockaddr(sock.local_addr().unwrap().as_ptr()).unwrap() + }; let stream = sock.into_unix_stream(); let fd = stream.as_raw_fd() as _; - Proactor::get().inner().register_io(|sqe| unsafe { - sqe.prep_connect(fd, nixsaddr) - })?.await?; + Proactor::get() + .inner() + .register_io(|sqe| unsafe { sqe.prep_connect(fd, nixsaddr) })? + .await?; Ok(Handle::new(stream)?) } - pub(crate) async fn processor_send_to_unix>(socket: &R, buf: &[u8], path: P) -> io::Result { + pub(crate) async fn processor_send_to_unix>( + socket: &R, + buf: &[u8], + path: P, + ) -> io::Result { Self::send_to_dest(socket, buf, &socket2::SockAddr::unix(path)?).await } - pub(crate) async fn processor_recv_from_unix(socket: &R, buf: &mut [u8]) -> io::Result<(usize, UnixSocketAddr)> { + pub(crate) async fn processor_recv_from_unix( + socket: &R, + buf: &mut [u8], + ) -> io::Result<(usize, UnixSocketAddr)> { Self::recv_from_with_flags(socket, buf, 0) .await .map(|(size, sockaddr)| (size, shim_to_af_unix(&sockaddr).unwrap())) } - pub(crate) async fn processor_peek_from_unix(socket: &R, buf: &mut [u8]) -> io::Result<(usize, UnixSocketAddr)> { + pub(crate) async fn processor_peek_from_unix( + socket: &R, + buf: &mut [u8], + ) -> io::Result<(usize, UnixSocketAddr)> { Self::recv_from_with_flags(socket, buf, libc::MSG_PEEK as _) .await .map(|(size, sockaddr)| (size, shim_to_af_unix(&sockaddr).unwrap())) } -} \ No newline at end of file +} diff --git a/src/syscore/linux/mod.rs b/src/syscore/linux/mod.rs index 05e0f45..362678a 100644 --- a/src/syscore/linux/mod.rs +++ b/src/syscore/linux/mod.rs @@ -6,4 +6,4 @@ pub(crate) use epoll::*; #[cfg(feature = "iouring")] mod iouring; #[cfg(feature = "iouring")] -pub(crate) use iouring::*; \ No newline at end of file +pub(crate) use iouring::*; diff --git a/src/syscore/windows/iocp.rs b/src/syscore/windows/iocp.rs deleted file mode 100644 index e69de29..0000000 diff --git a/src/syscore/windows/iocp/iocp.rs b/src/syscore/windows/iocp/iocp.rs new file mode 100644 index 0000000..08c5f70 --- /dev/null +++ b/src/syscore/windows/iocp/iocp.rs @@ -0,0 +1,352 @@ +use super::Processor; +use crate::handle::Handle; +use crate::submission_handler::SubmissionHandler; +use futures::channel::oneshot; +use futures::io::{AsyncRead, AsyncWrite}; +use std::collections::HashMap; +use std::fs::File; +use std::io::Read; +use std::mem::ManuallyDrop; +use std::net::TcpStream; +use std::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle}; +use std::ptr; +use std::time::Duration; +use std::{io, task}; +use std::{ + pin::Pin, + sync::atomic::{AtomicUsize, Ordering}, + task::Context, + task::Poll, +}; +use winapi::um::handleapi; +use winapi::um::handleapi::INVALID_HANDLE_VALUE; +use winapi::um::ioapiset; +use winapi::um::minwinbase::OVERLAPPED_ENTRY; +use winapi::um::winbase::INFINITE; +use winapi::um::winnt::HANDLE; + +pub struct SysProactor { + iocp_handle: WinHandle, + completions: HashMap>, + // ULONG_PTR + completion_key: AtomicUsize, +} + +#[repr(C)] +#[derive(Copy, Clone)] +pub struct WinOverlappedEntry(OVERLAPPED_ENTRY); + +impl std::ops::Deref for WinOverlappedEntry { + type Target = OVERLAPPED_ENTRY; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl From for WinOverlappedEntry { + fn from(h: OVERLAPPED_ENTRY) -> Self { + Self(h) + } +} + +#[repr(C)] +#[derive(Copy, Clone)] +pub struct WinHandle(HANDLE); + +// TODO [igni]: SUPER CURSED THIS NEEDS TO GO AWAY +unsafe impl Send for WinHandle {} +unsafe impl Sync for WinHandle {} +// TODO [igni]: SUPER CURSED THIS NEEDS TO GO AWAY +unsafe impl Send for WinRawHandle {} +unsafe impl Sync for WinRawHandle {} + +#[repr(C)] +#[derive(Copy, Clone)] +pub struct WinRawHandle(RawHandle); + +impl std::ops::Deref for WinHandle { + type Target = HANDLE; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl From for WinHandle { + fn from(h: HANDLE) -> Self { + Self(h) + } +} + +impl std::ops::Deref for WinRawHandle { + type Target = RawHandle; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl From for WinRawHandle { + fn from(h: RawHandle) -> Self { + Self(h) + } +} + +impl SysProactor { + /// Returns a reference to the proactor. + pub fn new() -> Result { + let completion_key = AtomicUsize::default(); + let iocp_handle = try_create_iocp( + WinRawHandle(INVALID_HANDLE_VALUE as _), + WinHandle(ptr::null_mut()), + completion_key.fetch_add(1, Ordering::SeqCst), + ) + .unwrap(); + + std::thread::spawn(|| loop {}); + + Ok(Self { + iocp_handle, + completion_key, + completions: Default::default(), + }) + } + + /// Wakes the thread waiting on proactor. + pub fn wake(&self) -> io::Result<()> { + Ok(()) + } + + /// Wait for completion of IO object + pub fn wait(&self, max_event_size: usize, duration: Option) -> io::Result { + // Wait for completion + let mut bytes_transferred = 0; + let mut actual_completion_key = 0; + let mut entries_removed = ptr::null_mut(); + + let mut entries_stuff = &mut self.completions.iter().map(|stuff| stuff.key()); + + let done = unsafe { + ioapiset::GetQueuedCompletionStatusEx( + *handle, + entries_stuff, + &mut stuff.key(), + entries_removed, + 1, // TODO: Play around with that on poll or something. + ) + }; + + if done == 0 { + return Err(io::Error::last_os_error()); + } + + // Process removed entries + + // return nb removed entries + Ok(entries_removed) + } + + /// Get underlying proactor instance. + pub(crate) fn inner(&self) -> &SysProactor { + &self + } + + pub(crate) fn register_io( + &self, + file: RawHandle, + mut on_ready: impl FnMut() + Send, + ) -> io::Result> { + let (sender, receiver) = oneshot::channel(); + // Send stuff + let mut completion_key = self.completion_key(); + let handle = try_create_iocp(WinRawHandle(file) as _, self.iocp_handle, completion_key)?; + + // TODO [igni]: threads + std::thread::spawn(move || { + // Wait for completion + let mut bytes_transferred = 0; + let mut actual_completion_key = 0; + let mut overlapped = ptr::null_mut(); + let done = unsafe { + ioapiset::GetQueuedCompletionStatus( + *handle, + &mut bytes_transferred, + &mut completion_key, + overlapped, + 1, // TODO: Play around with that on poll or something. + ) + }; + + if done == 0 { + println!("something terrible happened"); + } + + on_ready(); + // cast is safe here because bytes_transferred is a u32 + sender.send(bytes_transferred as usize); + }); + Ok(receiver) + } + + fn completion_key(&self) -> usize { + self.completion_key.fetch_add(1, Ordering::SeqCst) + } +} + +fn try_create_iocp( + file_handle: WinRawHandle, + existing_completion_port: WinHandle, + completion_key: usize, +) -> io::Result { + let task_handle = unsafe { + ioapiset::CreateIoCompletionPort( + *file_handle as _, + *existing_completion_port, + completion_key, + 0, + ) + }; + + if task_handle.is_null() { + Err(std::io::Error::new( + std::io::ErrorKind::Other, + format!("Couldn't create iocp - {}", io::Error::last_os_error()), + )) + } else { + Ok(task_handle.into()) + } +} + +impl Drop for SysProactor { + fn drop(&mut self) { + // If the function succeeds, the return value is nonzero. + if unsafe { handleapi::CloseHandle(*self.iocp_handle) } == 0 { + println!( + "warning : couldn't drop iocp handle - {}", + io::Error::last_os_error() + ); + }; + } +} + +/////////////////////////////////// +///// File +/////////////////////////////////// + +#[cfg(windows)] +impl AsyncRead for &Handle { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + buf: &mut [u8], + ) -> Poll> { + let raw_handle = self.as_raw_handle(); + let buf_len = buf.len(); + let buf = buf.as_mut_ptr(); + + // leak shit + let completion_dispatcher = async move { + let mut file = unsafe { File::from_raw_handle(raw_handle) }; + let buf = unsafe { std::slice::from_raw_parts_mut(buf, buf_len) }; + + let size = Processor::processor_read_file(&mut file, buf).await?; + let _ = ManuallyDrop::new(file); + Ok(size) + }; + + SubmissionHandler::::handle_read(self, cx, completion_dispatcher) + } +} + +#[cfg(windows)] +impl AsyncWrite for &Handle { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let raw_handle = self.as_raw_handle(); + let buf_len = buf.len(); + let buf = buf.as_ptr(); + + let completion_dispatcher = async move { + let mut file = unsafe { File::from_raw_handle(raw_handle) }; + let mut buf = unsafe { std::slice::from_raw_parts(buf, buf_len) }; + + let size = Processor::processor_write_file(&mut file, &mut buf).await?; + let _ = ManuallyDrop::new(file); + Ok(size) + }; + + SubmissionHandler::::handle_write(self, cx, completion_dispatcher) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } +} + +/////////////////////////////////// +///// TcpStream +/////////////////////////////////// + +#[cfg(windows)] +impl AsyncRead for &Handle { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + buf: &mut [u8], + ) -> Poll> { + todo!(); + // let raw_fd = self.as_raw_fd(); + // let buf_len = buf.len(); + // let buf = buf.as_mut_ptr(); + + // let completion_dispatcher = async move { + // let sock = unsafe { TcpStream::from_raw_fd(raw_fd) }; + + // let buf = unsafe { std::slice::from_raw_parts_mut(buf, buf_len) }; + // let size = Processor::processor_recv(&sock, buf).await?; + + // let _ = ManuallyDrop::new(sock); + // Ok(size) + // }; + + // SubmissionHandler::::handle_read(self, cx, completion_dispatcher) + } +} + +#[cfg(windows)] +impl AsyncWrite for &Handle { + fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { + todo!(); + // let raw_fd = self.as_raw_fd(); + // let buf_len = buf.len(); + // let buf = buf.as_ptr(); + + // let completion_dispatcher = async move { + // let sock = unsafe { TcpStream::from_raw_fd(raw_fd) }; + + // let buf = unsafe { std::slice::from_raw_parts(buf, buf_len) }; + // let size = Processor::processor_send(&sock, buf).await?; + + // let _ = ManuallyDrop::new(sock); + // Ok(size) + // }; + + // SubmissionHandler::::handle_write(self, cx, completion_dispatcher) + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { + Poll::Ready(Ok(())) + } +} diff --git a/src/syscore/windows/iocp/mod.rs b/src/syscore/windows/iocp/mod.rs new file mode 100644 index 0000000..4afd955 --- /dev/null +++ b/src/syscore/windows/iocp/mod.rs @@ -0,0 +1,5 @@ +mod iocp; +mod processor; + +pub(crate) use iocp::*; +pub(crate) use processor::*; diff --git a/src/syscore/windows/iocp/processor.rs b/src/syscore/windows/iocp/processor.rs new file mode 100644 index 0000000..eed7a23 --- /dev/null +++ b/src/syscore/windows/iocp/processor.rs @@ -0,0 +1,501 @@ +use crate::proactor::Proactor; +use crate::Handle; +use std::future::Future; +use std::io; +use std::io::{IoSlice, IoSliceMut}; +use std::io::{Read, Write}; +use std::mem::MaybeUninit; +use std::net::TcpStream; +use std::net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6, UdpSocket}; +use std::net::{SocketAddr, TcpListener, ToSocketAddrs}; +use std::os::windows::io::{AsRawHandle, FromRawHandle}; +use std::path::Path; +use std::{fs::File, mem::ManuallyDrop}; + +pub struct Processor; + +impl Processor { + /////////////////////////////////// + ///// Read Write + ///// Synchronous File + /////////////////////////////////// + + pub(crate) async fn processor_read_file(io: &mut R, mut buf: &mut [u8]) -> io::Result + where + R: std::io::Read + AsRawHandle + Send, + { + let fd = io.as_raw_handle(); + let mut cc = Proactor::get().inner().register_io(fd, || { + // copy io into bufs + io.read(&mut buf); + })?; + + Ok(cc + .await + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?) + } + + pub(crate) async fn processor_write_file( + io: &mut R, + mut buf: &mut [u8], + ) -> io::Result + where + R: std::io::Write + AsRawHandle + Send, + { + let fd = io.as_raw_handle(); + let bufs = &[IoSlice::new(buf)]; + + let mut cc = Proactor::get().inner().register_io(fd, || { + io.write_all(&mut buf); + })?; + + Ok(cc + .await + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?) + } + + // /////////////////////////////////// + // ///// Send, Recv, Peek + // ///// Commonality of TcpStream, UdpSocket, UnixStream, UnixDatagram + // /////////////////////////////////// + + // pub(crate) async fn processor_send( + // socket: &R, + // buf: &[u8], + // ) -> io::Result { + // let fd = socket.as_raw_fd() as _; + + // let res = Proactor::get() + // .inner() + // .register_io(|sqe| unsafe { + // let sqep = sqe.raw_mut(); + // uring_sys::io_uring_prep_send(sqep, fd, buf.as_ptr() as _, buf.len() as _, 0); + // })? + // .await?; + + // Ok(res as _) + // } + + // pub(crate) async fn processor_recv( + // sock: &R, + // buf: &mut [u8], + // ) -> io::Result { + // Self::recv_with_flags(sock, buf, 0).await + // } + + // pub(crate) async fn processor_peek( + // sock: &R, + // buf: &mut [u8], + // ) -> io::Result { + // Self::recv_with_flags(sock, buf, libc::MSG_PEEK as _).await + // } + + // async fn recv_with_flags( + // socket: &R, + // buf: &mut [u8], + // flags: u32, + // ) -> io::Result { + // let fd = socket.as_raw_fd() as _; + + // let res = Proactor::get() + // .inner() + // .register_io(|sqe| unsafe { + // let sqep = sqe.raw_mut(); + // uring_sys::io_uring_prep_recv( + // sqep as *mut _, + // fd, + // buf.as_ptr() as _, + // buf.len() as _, + // flags as _, + // ); + // })? + // .await?; + + // Ok(res as _) + // } + + // /////////////////////////////////// + // ///// Connect + // ///// Commonality of TcpStream, UdpSocket + // /////////////////////////////////// + + // pub(crate) async fn processor_connect( + // addrs: A, + // mut f: F, + // ) -> io::Result + // where + // F: FnMut(SocketAddr) -> Fut, + // Fut: Future>, + // { + // // TODO connect_tcp, connect_udp + // let addrs = match addrs.to_socket_addrs() { + // Ok(addrs) => addrs, + // Err(e) => return Err(e), + // }; + + // let mut tail_err = None; + // for addr in addrs { + // match f(addr).await { + // Ok(l) => return Ok(l), + // Err(e) => tail_err = Some(e), + // } + // } + + // Err(tail_err.unwrap_or_else(|| { + // io::Error::new(io::ErrorKind::InvalidInput, "couldn't resolve addresses") + // })) + // } + + // pub(crate) async fn processor_connect_tcp(addr: SocketAddr) -> io::Result> { + // let addr = addr.to_string(); + // // FIXME: address resolution is always blocking. + // let addr = addr.to_socket_addrs()?.next().ok_or_else(|| { + // io::Error::new(io::ErrorKind::InvalidInput, "could not resolve the address") + // })?; + + // let domain = if addr.is_ipv6() { + // socket2::Domain::ipv6() + // } else { + // socket2::Domain::ipv4() + // }; + // let sock = socket2::Socket::new( + // domain, + // socket2::Type::stream(), + // Some(socket2::Protocol::tcp()), + // )?; + + // sock.set_nonblocking(true)?; + + // // FIXME: (vcq): iou uses nix, i use socket2, conversions happens over libc. + // // Propose std conversion for nix. + // let nixsaddr = unsafe { + // &iou::SockAddr::from_libc_sockaddr(sock.local_addr().unwrap().as_ptr()).unwrap() + // }; + // let stream = sock.into_tcp_stream(); + // let fd = stream.as_raw_fd() as _; + + // Proactor::get() + // .inner() + // .register_io(|sqe| unsafe { + // sqe.prep_connect(fd, nixsaddr); + // })? + // .await?; + + // Ok(Handle::new(stream)?) + // } + + // pub(crate) async fn processor_connect_udp(addr: SocketAddr) -> io::Result> { + // let domain = match addr { + // SocketAddr::V4(_) => socket2::Domain::ipv4(), + // SocketAddr::V6(_) => socket2::Domain::ipv6(), + // }; + // let sock = socket2::Socket::new( + // domain, + // socket2::Type::dgram(), + // Some(socket2::Protocol::udp()), + // )?; + // let sockaddr = socket2::SockAddr::from(addr); + + // let unspec = match addr { + // SocketAddr::V4(_) => { + // let unspecv4 = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0); + // socket2::SockAddr::from(unspecv4) + // } + // SocketAddr::V6(_) => { + // let unspecv6 = SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, 0, 0, 0); + // socket2::SockAddr::from(unspecv6) + // } + // }; + + // // Try to bind to the datagram socket. + // sock.bind(&unspec)?; + // sock.set_nonblocking(true)?; + + // // Try to connect over the socket + // sock.connect(&sockaddr)?; + + // // Make into udp type and init handler. + // Ok(Handle::new(sock.into_udp_socket())?) + // } + + // /////////////////////////////////// + // ///// TcpListener + // /////////////////////////////////// + + // // TODO: (vcq): need to fix the accept + // // pub(crate) async fn processor_accept_tcp_listener(listener: &R) -> io::Result<(Handle, SocketAddr)> { + // // let fd = listener.as_raw_fd() as _; + // // // let flags = syscall!(fcntl(fd, libc::F_GETFL))?; + // // // syscall!(fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK))?; + // // + // // // let mut sockaddr = MaybeUninit::::uninit(); + // // // let mut sockaddr_len = std::mem::size_of::() as _; + // // // + // // // + // // // let mut sockaddr = unsafe { + // // // let mut saddr = sockaddr.assume_init(); + // // // saddr.ss_family = libc::AF_INET as libc::sa_family_t; + // // // saddr + // // // }; + // // + // // // let mut saddrstor = SockAddrStorage::uninit(); + // // + // // let cc = Proactor::get().inner().register_io(|sqe| unsafe { + // // // let sqep = sqe.raw_mut(); + // // // dbg!(&sqe.user_data()); + // // // dbg!(&sqep.user_data); + // // // sqe.prep_accept(fd, Some(&mut saddrstor), SockFlag::SOCK_NONBLOCK); + // // sqe.prep_accept(fd, None, iou::SockFlag::empty()); + // // // uring_sys::io_uring_prep_accept(sqep as *mut _, + // // // fd, + // // // &mut sockaddr as *mut _ as *mut _, + // // // &mut sockaddr_len, + // // // 0); + // // })?; + // // + // // // let cc = Proactor::get().inner().register_io(|sqe| unsafe { + // // // dbg!("SQE CAME"); + // // // let sqep = sqe.raw_mut(); + // // // // sqe.prep_accept(fd, Some(&mut saddrstor), SockFlag::empty()); + // // // uring_sys::io_uring_prep_accept(sqep, + // // // fd, + // // // sockaddr.as_mut_ptr() as *mut _, + // // // &mut sockaddr_len, + // // // 0); + // // // })?; + // // + // // // let mut saddrstor = SockAddrStorage::uninit(); + // // // + // // // let cc = Proactor::get().inner().register_io(|sqe| unsafe { + // // // sqe.prep_accept(fd, Some(&mut saddrstor), SockFlag::empty()); + // // // })?; + // // dbg!("TCP LISTENER"); + // // + // // let stream = unsafe { TcpStream::from_raw_fd(cc.await?) }; + // // dbg!("TCP LISTENER RECEIVED"); + // // // let addr = unsafe { + // // // let nixsa = saddrstor.as_socket_addr()?; + // // // let (saddr, saddr_len) = nixsa.as_ffi_pair(); + // // // socket2::SockAddr::from_raw_parts(saddr as *const _, saddr_len as _) + // // // .as_std() + // // // .unwrap() + // // // }; + // // + // // let socket = unsafe { socket2::Socket::from_raw_fd(listener.as_raw_fd()) }; + // // let socket = socket.into_tcp_listener(); + // // let addr = socket.local_addr().unwrap(); + // // // let res = socket + // // // .accept() + // // // .map(|(_, sockaddr)| (Handle::new(stream).unwrap(), sockaddr))?; + // // + // // // let addr = unsafe { + // // // socket + // // // .local_addr() + // // // .unwrap() + // // // }; + // // + // // // unsafe { + // // // let mut sockaddr = sockaddr.assume_init(); + // // // sockaddr.ss_family = libc::AF_INET as libc::sa_family_t; + // // // } + // // + // // // let addr = unsafe { + // // // socket2::SockAddr::from_raw_parts(&sockaddr as *const _ as *const _, sockaddr_len as _) + // // // .as_std() + // // // .unwrap() + // // // }; + // // + // // Ok((Handle::new(stream)?, addr)) + // // } + + // pub(crate) async fn processor_accept_tcp_listener( + // listener: &R, + // ) -> io::Result<(Handle, SocketAddr)> { + // let socket = unsafe { socket2::Socket::from_raw_fd(listener.as_raw_fd()) }; + // let socket = socket.into_tcp_listener(); + // let socket = ManuallyDrop::new(socket); + + // socket + // .accept() + // .map(|(stream, sockaddr)| (Handle::new(stream).unwrap(), sockaddr)) + // } + + // /////////////////////////////////// + // ///// UdpSocket + // /////////////////////////////////// + + // pub(crate) async fn processor_send_to( + // socket: &R, + // buf: &[u8], + // addr: SocketAddr, + // ) -> io::Result { + // Self::send_to_dest(socket, buf, &socket2::SockAddr::from(addr)).await + // } + + // async fn send_to_dest( + // socket: &A, + // buf: &[u8], + // addr: &socket2::SockAddr, + // ) -> io::Result { + // // FIXME: (vcq): Wrap into vec? + // let mut iov = IoSlice::new(buf); + + // let mut sendmsg = unsafe { MaybeUninit::::zeroed().assume_init() }; + // sendmsg.msg_name = addr.as_ptr() as *mut _; + // sendmsg.msg_namelen = addr.len(); + // sendmsg.msg_iov = iov.as_ptr() as *mut _; + // sendmsg.msg_iovlen = iov.len(); + + // let fd = socket.as_raw_fd() as _; + + // let res = Proactor::get() + // .inner() + // .register_io(|sqe| unsafe { + // let sqep = sqe.raw_mut(); + // uring_sys::io_uring_prep_sendmsg(sqep, fd, &sendmsg as *const _ as *const _, 0); + // })? + // .await?; + + // Ok(res as _) + // } + + // pub(crate) async fn processor_recv_from( + // sock: &R, + // buf: &mut [u8], + // ) -> io::Result<(usize, SocketAddr)> { + // Self::recv_from_with_flags(sock, buf, 0) + // .await + // .map(|(size, sockaddr)| (size, sockaddr.as_std().unwrap())) + // } + + // pub(crate) async fn processor_peek_from( + // sock: &R, + // buf: &mut [u8], + // ) -> io::Result<(usize, SocketAddr)> { + // Self::recv_from_with_flags(sock, buf, libc::MSG_PEEK as _) + // .await + // .map(|(size, sockaddr)| (size, sockaddr.as_std().unwrap())) + // } + + // async fn recv_from_with_flags( + // socket: &R, + // buf: &mut [u8], + // flags: u32, + // ) -> io::Result<(usize, socket2::SockAddr)> { + // let mut sockaddr_raw = + // unsafe { MaybeUninit::::zeroed().assume_init() }; + + // // FIXME: (vcq): Wrap into vec? + // let mut iov = IoSliceMut::new(buf); + + // let mut recvmsg = unsafe { MaybeUninit::::zeroed().assume_init() }; + // recvmsg.msg_name = &mut sockaddr_raw as *mut _ as _; + // recvmsg.msg_namelen = std::mem::size_of::() as _; + // recvmsg.msg_iov = iov.as_ptr() as *mut _; + // recvmsg.msg_iovlen = iov.len(); + + // let fd = socket.as_raw_fd() as _; + + // let res = Proactor::get() + // .inner() + // .register_io(|sqe| unsafe { + // let sqep = sqe.raw_mut(); + // uring_sys::io_uring_prep_recvmsg( + // sqep, + // fd, + // &mut recvmsg as *mut _ as *mut _, + // flags as _, + // ); + // })? + // .await?; + + // let sockaddr = unsafe { + // socket2::SockAddr::from_raw_parts( + // &sockaddr_raw as *const _ as *const _, + // recvmsg.msg_namelen, + // ) + // }; + + // Ok((res as _, sockaddr)) + // } + + // /////////////////////////////////// + // ///// UnixListener + // /////////////////////////////////// + + // pub(crate) async fn processor_accept_unix_listener( + // listener: &R, + // ) -> io::Result<(Handle, UnixSocketAddr)> { + // let fd = listener.as_raw_fd() as _; + // let mut saddrstor = SockAddrStorage::uninit(); + + // let cc = Proactor::get().inner().register_io(|sqe| unsafe { + // sqe.prep_accept(fd, Some(&mut saddrstor), SockFlag::empty()) + // })?; + + // let stream = unsafe { UnixStream::from_raw_fd(cc.await?) }; + // let addr = unsafe { + // let nixsa = saddrstor.as_socket_addr()?; + // let (saddr, saddr_len) = nixsa.as_ffi_pair(); + // socket2::SockAddr::from_raw_parts(saddr as *const _, saddr_len as _) + // }; + // let addr = shim_to_af_unix(&addr)?; + + // Ok((Handle::new(stream)?, addr)) + // } + + // /////////////////////////////////// + // ///// UnixStream + // /////////////////////////////////// + + // pub(crate) async fn processor_connect_unix>( + // path: P, + // ) -> io::Result> { + // let sock = socket2::Socket::new(socket2::Domain::unix(), socket2::Type::stream(), None)?; + // let sockaddr = socket2::SockAddr::unix(path)?; + + // sock.set_nonblocking(true)?; + + // // FIXME: (vcq): iou uses nix, i use socket2, conversions happens over libc. + // // Propose std conversion for nix. + // let nixsaddr = unsafe { + // &iou::SockAddr::from_libc_sockaddr(sock.local_addr().unwrap().as_ptr()).unwrap() + // }; + + // let stream = sock.into_unix_stream(); + // let fd = stream.as_raw_fd() as _; + + // Proactor::get() + // .inner() + // .register_io(|sqe| unsafe { sqe.prep_connect(fd, nixsaddr) })? + // .await?; + + // Ok(Handle::new(stream)?) + // } + + // pub(crate) async fn processor_send_to_unix>( + // socket: &R, + // buf: &[u8], + // path: P, + // ) -> io::Result { + // Self::send_to_dest(socket, buf, &socket2::SockAddr::unix(path)?).await + // } + + // pub(crate) async fn processor_recv_from_unix( + // socket: &R, + // buf: &mut [u8], + // ) -> io::Result<(usize, UnixSocketAddr)> { + // Self::recv_from_with_flags(socket, buf, 0) + // .await + // .map(|(size, sockaddr)| (size, shim_to_af_unix(&sockaddr).unwrap())) + // } + + // pub(crate) async fn processor_peek_from_unix( + // socket: &R, + // buf: &mut [u8], + // ) -> io::Result<(usize, UnixSocketAddr)> { + // Self::recv_from_with_flags(socket, buf, libc::MSG_PEEK as _) + // .await + // .map(|(size, sockaddr)| (size, shim_to_af_unix(&sockaddr).unwrap())) + // } +} diff --git a/src/syscore/windows/mod.rs b/src/syscore/windows/mod.rs index ec0838b..2b63a28 100644 --- a/src/syscore/windows/mod.rs +++ b/src/syscore/windows/mod.rs @@ -1,2 +1,2 @@ -mod nethandle; -mod iocp; +pub mod iocp; +pub mod nethandle; diff --git a/src/syscore/windows/nethandle.rs b/src/syscore/windows/nethandle.rs index e69de29..b53a645 100644 --- a/src/syscore/windows/nethandle.rs +++ b/src/syscore/windows/nethandle.rs @@ -0,0 +1,3 @@ +struct SysProactor {} + +impl SysProactor {} diff --git a/src/utils.rs b/src/utils.rs index e69de29..8b13789 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -0,0 +1 @@ + diff --git a/src/waker/mod.rs b/src/waker/mod.rs index fb9fd8b..7871fda 100644 --- a/src/waker/mod.rs +++ b/src/waker/mod.rs @@ -1,6 +1,6 @@ -use std::sync::Arc; use core::mem::{self, ManuallyDrop}; use core::task::{RawWaker, RawWakerVTable, Waker}; +use std::sync::Arc; /// Creates a waker from a wake function. ///