diff --git a/examples/fread.rs b/examples/fread.rs index b8e4e91..6661d40 100644 --- a/examples/fread.rs +++ b/examples/fread.rs @@ -1,13 +1,12 @@ use nuclei::*; -use std::io; -use std::time::Duration; use std::fs::File; +use std::io; use std::path::PathBuf; +use std::time::Duration; use futures::AsyncRead; use futures_util::io::AsyncReadExt; - fn main() -> io::Result<()> { let x = drive(async { let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); @@ -23,4 +22,4 @@ fn main() -> io::Result<()> { println!("{}", x); Ok(()) -} \ No newline at end of file +} diff --git a/examples/h1-server.rs b/examples/h1-server.rs index 0d36dcd..a04152a 100644 --- a/examples/h1-server.rs +++ b/examples/h1-server.rs @@ -2,10 +2,10 @@ use nuclei::*; use std::net::TcpListener; use anyhow::Result; -use futures::prelude::*; -use http_types::{Request, Response, StatusCode}; use async_dup::Arc; use futures::pending; +use futures::prelude::*; +use http_types::{Request, Response, StatusCode}; /// Serves a request and returns a response. async fn serve(req: Request) -> http_types::Result { @@ -30,7 +30,7 @@ async fn listen(listener: Handle) -> Result<()> { // Spawn a background task serving this connection. let stream = Arc::new(stream); spawn(async move { - if let Err(err) = async_h1::accept( stream, serve).await { + if let Err(err) = async_h1::accept(stream, serve).await { println!("Connection error: {:#?}", err); } }); diff --git a/examples/tcp-server.rs b/examples/tcp-server.rs index ce954dd..7e884a5 100644 --- a/examples/tcp-server.rs +++ b/examples/tcp-server.rs @@ -1,6 +1,6 @@ +use futures::io; use nuclei::*; use std::net::{TcpListener, TcpStream}; -use futures::io; async fn echo(stream: Handle) -> io::Result<()> { io::copy(&stream, &mut &stream).await?; @@ -23,4 +23,4 @@ fn main() -> io::Result<()> { spawn_blocking(|| echo(stream)); } }) -} \ No newline at end of file +} diff --git a/src/lib.rs b/src/lib.rs index 33891a0..f6c8192 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,11 +1,11 @@ -mod runtime; +mod async_io; mod handle; +mod proactor; +mod runtime; mod submission_handler; -mod async_io; mod sys; -mod waker; -mod proactor; mod utils; +mod waker; #[cfg(not(any( target_os = "linux", // epoll, iouring @@ -21,7 +21,6 @@ mod utils; )))] compile_error!("Target OS is not supported"); - #[cfg(any( target_os = "macos", target_os = "ios", @@ -48,4 +47,4 @@ mod syscore { } pub use proactor::*; -pub use runtime::runtime::*; \ No newline at end of file +pub use runtime::runtime::*; diff --git a/src/runtime.rs b/src/runtime.rs index 24e3278..f7e460b 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -4,25 +4,25 @@ macro_rules! runtime_methods { use std::future::Future; pub fn spawn(future: F) -> JoinHandle - where - F: Future + Send + 'static, - F::Output: Send + 'static, + where + F: Future + Send + 'static, + F::Output: Send + 'static, { RUNTIME.spawn(future) } pub fn spawn_blocking(task: F) -> JoinHandle - where - F: FnOnce() -> T + Send + 'static, - T: Send + 'static, + where + F: FnOnce() -> T + Send + 'static, + T: Send + 'static, { RUNTIME.spawn_blocking(task) } pub fn block_on(future: F) -> F::Output - where - F: Future + Send + 'static, - F::Output: Send + 'static, + where + F: Future + Send + 'static, + F::Output: Send + 'static, { RUNTIME.block_on(future) } @@ -31,54 +31,50 @@ macro_rules! runtime_methods { #[cfg(feature = "bastion")] pub mod runtime { + use agnostik::executors::BastionExecutor; + use agnostik::{Agnostik, AgnostikExecutor}; use once_cell::sync::Lazy; - use agnostik::{AgnostikExecutor, Agnostik}; - use agnostik::executors::{BastionExecutor}; - static RUNTIME: Lazy = Lazy::new(|| unsafe { - std::mem::transmute(Agnostik::bastion()) - }); + static RUNTIME: Lazy = + Lazy::new(|| unsafe { std::mem::transmute(Agnostik::bastion()) }); runtime_methods!(); } #[cfg(feature = "tokio")] pub mod runtime { - use once_cell::sync::Lazy; - use agnostik::{Agnostik, LocalAgnostikExecutor}; use agnostik::executors::TokioExecutor; + use agnostik::{Agnostik, LocalAgnostikExecutor}; + use once_cell::sync::Lazy; - static RUNTIME: Lazy = Lazy::new(|| unsafe { - std::mem::transmute(Agnostik::tokio()) - }); + static RUNTIME: Lazy = + Lazy::new(|| unsafe { std::mem::transmute(Agnostik::tokio()) }); runtime_methods!(); } #[cfg(feature = "asyncstd")] pub mod runtime { - use once_cell::sync::Lazy; - use agnostik::{Agnostik, LocalAgnostikExecutor}; use agnostik::executors::AsyncStdExecutor; + use agnostik::{Agnostik, LocalAgnostikExecutor}; + use once_cell::sync::Lazy; - static RUNTIME: Lazy = Lazy::new(|| unsafe { - std::mem::transmute(Agnostik::async_std()) - }); + static RUNTIME: Lazy = + Lazy::new(|| unsafe { std::mem::transmute(Agnostik::async_std()) }); runtime_methods!(); } #[cfg(feature = "smol")] pub mod runtime { - use once_cell::sync::Lazy; - use agnostik::{Agnostik, LocalAgnostikExecutor}; use agnostik::executors::SmolExecutor; + use agnostik::{Agnostik, LocalAgnostikExecutor}; + use once_cell::sync::Lazy; - static RUNTIME: Lazy = Lazy::new(|| unsafe { - std::mem::transmute(Agnostik::smol()) - }); + static RUNTIME: Lazy = + Lazy::new(|| unsafe { std::mem::transmute(Agnostik::smol()) }); runtime_methods!(); } -pub use runtime::*; \ No newline at end of file +pub use runtime::*; diff --git a/src/submission_handler.rs b/src/submission_handler.rs index 4153a3a..27f29d1 100644 --- a/src/submission_handler.rs +++ b/src/submission_handler.rs @@ -1,23 +1,26 @@ -use futures::io::{ - AsyncRead, AsyncWrite, AsyncBufRead, AsyncSeek -}; -use std::marker::PhantomData as marker; -use std::{task::{Context, Poll}, io, pin::Pin, future::Future, ops::{DerefMut, Deref}}; use super::handle::{Handle, HandleOpRegisterer}; - +use futures::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite}; +use std::marker::PhantomData as marker; +use std::{ + future::Future, + io, + ops::{Deref, DerefMut}, + pin::Pin, + task::{Context, Poll}, +}; pub struct SubmissionHandler(marker) where - T: Unpin; + T: Unpin; impl SubmissionHandler where - T: Unpin + HandleOpRegisterer + T: Unpin + HandleOpRegisterer, { pub fn handle_read( handle: Pin<&mut T>, cx: &mut Context, - completion_dispatcher: impl Future> + 'static + completion_dispatcher: impl Future> + 'static, ) -> Poll> { let handle = handle.get_mut(); let read_result = handle.read_registerer(); @@ -46,7 +49,7 @@ where pub fn handle_write( handle: Pin<&mut T>, cx: &mut Context, - completion_dispatcher: impl Future> + 'static + completion_dispatcher: impl Future> + 'static, ) -> Poll> { let handle = handle.get_mut(); let write_result = handle.write_registerer(); @@ -71,4 +74,4 @@ where poll } -} \ No newline at end of file +} diff --git a/src/syscore/bsd/kqueue.rs b/src/syscore/bsd/kqueue.rs index b365f21..d65f615 100644 --- a/src/syscore/bsd/kqueue.rs +++ b/src/syscore/bsd/kqueue.rs @@ -1,14 +1,14 @@ -use std::mem::MaybeUninit; -use std::io::{self, Read, Write}; -use std::os::unix::io::{AsRawFd, RawFd}; -use std::{fs::File, os::unix::net::UnixStream, collections::HashMap, time::Duration}; use crate::sys::event::{kevent_ts, kqueue, KEvent}; use futures::channel::oneshot; +use lever::prelude::*; use pin_utils::unsafe_pinned; use std::future::Future; +use std::io::{self, Read, Write}; +use std::mem::MaybeUninit; +use std::os::unix::io::{AsRawFd, RawFd}; use std::pin::Pin; use std::task::{Context, Poll}; -use lever::prelude::*; +use std::{collections::HashMap, fs::File, os::unix::net::UnixStream, time::Duration}; macro_rules! syscall { ($fn:ident $args:tt) => {{ @@ -25,8 +25,8 @@ macro_rules! syscall { /////////////////// use socket2::SockAddr; -use std::os::unix::net::{SocketAddr as UnixSocketAddr}; use std::mem; +use std::os::unix::net::SocketAddr as UnixSocketAddr; fn max_len() -> usize { // The maximum read limit on most posix-like systems is `SSIZE_MAX`, @@ -44,18 +44,22 @@ fn max_len() -> usize { } } -pub(crate) fn shim_recv_from(fd: A, buf: &mut [u8], flags: libc::c_int) -> io::Result<(usize, SockAddr)> { +pub(crate) fn shim_recv_from( + fd: A, + buf: &mut [u8], + flags: libc::c_int, +) -> io::Result<(usize, SockAddr)> { let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() }; let mut addrlen = mem::size_of_val(&storage) as libc::socklen_t; let n = syscall!(recvfrom( - fd.as_raw_fd() as _, - buf.as_mut_ptr() as *mut libc::c_void, - std::cmp::min(buf.len(), max_len()), - flags, - &mut storage as *mut _ as *mut _, - &mut addrlen, - ))?; + fd.as_raw_fd() as _, + buf.as_mut_ptr() as *mut libc::c_void, + std::cmp::min(buf.len(), max_len()), + flags, + &mut storage as *mut _ as *mut _, + &mut addrlen, + ))?; let addr = unsafe { SockAddr::from_raw_parts(&storage as *const _ as *const _, addrlen) }; Ok((n as usize, addr)) } @@ -78,7 +82,7 @@ pub(crate) fn shim_to_af_unix(sockaddr: &SockAddr) -> io::Result let abst_sock_ident: libc::c_char = unsafe { std::slice::from_raw_parts( &addr.sun_path as *const _ as *const u8, - mem::size_of::() + mem::size_of::(), ) }[1] as libc::c_char; @@ -88,7 +92,7 @@ pub(crate) fn shim_to_af_unix(sockaddr: &SockAddr) -> io::Result // https://man7.org/linux/man-pages/man7/unix.7.html (sa, 0) if sa != 0 && sa > mem::size_of::() as libc::socklen_t => { len = mem::size_of::() as libc::socklen_t; - }, + } // If unnamed socket, then addr is always zero, // assign the offset reserved difference as length. (0, _) => { @@ -96,7 +100,7 @@ pub(crate) fn shim_to_af_unix(sockaddr: &SockAddr) -> io::Result let path = &addr.sun_path as *const _ as usize; let sun_path_offset = path - base; len = sun_path_offset as libc::socklen_t; - }, + } // Discard rest, they are not special. (_, _) => {} @@ -109,7 +113,7 @@ pub(crate) fn shim_to_af_unix(sockaddr: &SockAddr) -> io::Result std::ptr::copy_nonoverlapping( sockaddr.as_ptr(), &mut init as *mut _ as *mut _, - len as usize + len as usize, ); // Safety: We've written the init addr above. @@ -142,7 +146,7 @@ pub struct SysProactor { registered: TTas>, /// Hashmap for holding interested concrete completion callbacks - completions: TTas> + completions: TTas>, } impl SysProactor { @@ -157,7 +161,7 @@ impl SysProactor { read_stream: TTas::new(read_stream), write_stream, registered: TTas::new(HashMap::new()), - completions: TTas::new(HashMap::new()) + completions: TTas::new(HashMap::new()), }; let mut rs = proactor.read_stream.lock(); @@ -232,12 +236,13 @@ impl SysProactor { }); let mut events: Vec = Vec::with_capacity(max_event_size); - events.resize(max_event_size, unsafe { MaybeUninit::zeroed().assume_init() }); + events.resize(max_event_size, unsafe { + MaybeUninit::zeroed().assume_init() + }); let mut events: Box<[KEvent]> = events.into_boxed_slice(); // dbg!("SENDING EVENT"); - let res = - kevent_ts(self.kqueue_fd, &[], &mut events, timeout)? as isize; + let res = kevent_ts(self.kqueue_fd, &[], &mut events, timeout)? as isize; // dbg!(res); // dbg!("EVENT FINISH"); @@ -304,9 +309,7 @@ impl SysProactor { } let (tx, rx) = oneshot::channel(); - let comp = completions - .entry(fd) - .or_insert(Vec::new()); + let comp = completions.entry(fd).or_insert(Vec::new()); comp.push((evts, tx)); @@ -356,11 +359,9 @@ impl SysProactor { if ack_removal { completions.remove(&fd); } - } } - ////////////////////////////// ////////////////////////////// diff --git a/src/syscore/bsd/mod.rs b/src/syscore/bsd/mod.rs index 0518ce4..d3722e5 100644 --- a/src/syscore/bsd/mod.rs +++ b/src/syscore/bsd/mod.rs @@ -1,6 +1,6 @@ mod kqueue; -mod processor; mod nethandle; +mod processor; pub(crate) use kqueue::*; pub(crate) use nethandle::*; diff --git a/src/syscore/bsd/nethandle.rs b/src/syscore/bsd/nethandle.rs index 1c15163..69429a7 100644 --- a/src/syscore/bsd/nethandle.rs +++ b/src/syscore/bsd/nethandle.rs @@ -1,22 +1,21 @@ use std::future::Future; use std::io; use std::marker::Unpin; -use std::sync::Arc; -use std::path::Path; use std::net::{SocketAddr, ToSocketAddrs}; use std::os::unix::io::AsRawFd; +use std::path::Path; +use std::sync::Arc; use std::net::{TcpListener, TcpStream, UdpSocket}; // Unix specifics -use std::os::unix::net::{UnixListener, UnixStream, UnixDatagram, SocketAddr as UnixSocketAddr}; +use std::os::unix::net::{SocketAddr as UnixSocketAddr, UnixDatagram, UnixListener, UnixStream}; -use lever::sync::prelude::*; use futures::Stream; +use lever::sync::prelude::*; -use crate::{Handle, Proactor}; use super::Processor; use crate::syscore::CompletionChan; - +use crate::{Handle, Proactor}; impl Handle { pub fn new(io: T) -> io::Result> { @@ -31,9 +30,7 @@ impl Handle { pub(crate) fn new_with_callback(io: T, evflags: usize) -> io::Result> { let fd = io.as_raw_fd(); let mut handle = Handle::new(io)?; - let register = Proactor::get() - .inner() - .register_io(fd, evflags)?; + let register = Proactor::get().inner().register_io(fd, evflags)?; handle.chan = Some(register); Ok(handle) } @@ -53,10 +50,13 @@ impl Handle { pub fn incoming( &self, ) -> impl Stream>> + Send + Unpin + '_ { - Box::pin(futures::stream::unfold(self, |listener: &Handle| async move { - let res = listener.accept().await.map(|(stream, _)| stream); - Some((res, listener)) - })) + Box::pin(futures::stream::unfold( + self, + |listener: &Handle| async move { + let res = listener.accept().await.map(|(stream, _)| stream); + Some((res, listener)) + }, + )) } } @@ -101,8 +101,7 @@ impl Handle { pub async fn send_to(&self, buf: &[u8], addr: A) -> io::Result { match addr.to_socket_addrs()?.next() { - Some(addr) => - Processor::processor_send_to(self.get_ref(), buf, addr).await, + Some(addr) => Processor::processor_send_to(self.get_ref(), buf, addr).await, None => Err(io::Error::new( io::ErrorKind::InvalidData, "given addresses can't be parsed", @@ -131,10 +130,13 @@ impl Handle { pub fn incoming( &self, ) -> impl Stream>> + Send + Unpin + '_ { - Box::pin(futures::stream::unfold(self, |listener: &Handle| async move { - let res = listener.accept().await.map(|(stream, _)| stream); - Some((res, listener)) - })) + Box::pin(futures::stream::unfold( + self, + |listener: &Handle| async move { + let res = listener.accept().await.map(|(stream, _)| stream); + Some((res, listener)) + }, + )) } } @@ -204,4 +206,4 @@ impl Handle { pub async fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, UnixSocketAddr)> { Processor::processor_peek_from_unix(self.get_ref(), buf).await } -} \ No newline at end of file +} diff --git a/src/syscore/bsd/processor.rs b/src/syscore/bsd/processor.rs index fd1d8ac..ff68c49 100644 --- a/src/syscore/bsd/processor.rs +++ b/src/syscore/bsd/processor.rs @@ -1,18 +1,20 @@ +use std::future::Future; use std::io; use std::io::{Read, Write}; -use std::{fs::File, os::unix::io::{AsRawFd, FromRawFd}, mem::ManuallyDrop}; -use std::net::{SocketAddr, ToSocketAddrs, TcpListener}; -use std::os::unix::net::{ - SocketAddr as UnixSocketAddr, UnixDatagram, UnixListener, UnixStream, -}; -use std::net::{SocketAddrV6, SocketAddrV4, Ipv4Addr, Ipv6Addr, UdpSocket}; -use std::future::Future; -use std::path::Path; use std::net::TcpStream; +use std::net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6, UdpSocket}; +use std::net::{SocketAddr, TcpListener, ToSocketAddrs}; +use std::os::unix::net::{SocketAddr as UnixSocketAddr, UnixDatagram, UnixListener, UnixStream}; +use std::path::Path; +use std::{ + fs::File, + mem::ManuallyDrop, + os::unix::io::{AsRawFd, FromRawFd}, +}; use crate::proactor::Proactor; -use crate::Handle; use crate::syscore::shim_to_af_unix; +use crate::Handle; pub struct Processor; @@ -22,7 +24,10 @@ impl Processor { ///// Synchronous File /////////////////////////////////// - pub(crate) async fn processor_read_file(io: &R, buf: &mut [u8]) -> io::Result { + pub(crate) async fn processor_read_file( + io: &R, + buf: &mut [u8], + ) -> io::Result { // TODO: (vertexclique): Use blocking here. let mut file = unsafe { File::from_raw_fd(io.as_raw_fd()) }; let res = file.read(buf); @@ -107,10 +112,13 @@ impl Processor { ///// Commonality of TcpStream, UdpSocket /////////////////////////////////// - pub(crate) async fn processor_connect(addrs: A, mut f: F) -> io::Result - where - F: FnMut(SocketAddr) -> Fut, - Fut: Future>, + pub(crate) async fn processor_connect( + addrs: A, + mut f: F, + ) -> io::Result + where + F: FnMut(SocketAddr) -> Fut, + Fut: Future>, { // TODO connect_tcp, connect_udp let addrs = match addrs.to_socket_addrs() { @@ -127,10 +135,7 @@ impl Processor { } Err(tail_err.unwrap_or_else(|| { - io::Error::new( - io::ErrorKind::InvalidInput, - "couldn't resolve addresses", - ) + io::Error::new(io::ErrorKind::InvalidInput, "couldn't resolve addresses") })) } @@ -147,7 +152,11 @@ impl Processor { } else { socket2::Domain::ipv4() }; - let sock = socket2::Socket::new(domain, socket2::Type::stream(), Some(socket2::Protocol::tcp()))?; + let sock = socket2::Socket::new( + domain, + socket2::Type::stream(), + Some(socket2::Protocol::tcp()), + )?; // Begin async connect and ignore the inevitable "in progress" error. sock.set_nonblocking(true)?; @@ -177,7 +186,11 @@ impl Processor { SocketAddr::V4(_) => socket2::Domain::ipv4(), SocketAddr::V6(_) => socket2::Domain::ipv6(), }; - let sock = socket2::Socket::new(domain, socket2::Type::dgram(), Some(socket2::Protocol::udp()))?; + let sock = socket2::Socket::new( + domain, + socket2::Type::dgram(), + Some(socket2::Protocol::udp()), + )?; let sockaddr = socket2::SockAddr::from(addr); let unspec = match addr { @@ -206,15 +219,18 @@ impl Processor { ///// TcpListener /////////////////////////////////// - pub(crate) async fn processor_accept_tcp_listener(listener: &R) -> io::Result<(Handle, SocketAddr)> { + pub(crate) async fn processor_accept_tcp_listener( + listener: &R, + ) -> io::Result<(Handle, SocketAddr)> { let socket = unsafe { socket2::Socket::from_raw_fd(listener.as_raw_fd()) }; let socket = socket.into_tcp_listener(); let socket = ManuallyDrop::new(socket); // Reregister on block match socket - .accept() - .map(|(stream, sockaddr)| (Handle::new(stream).unwrap(), sockaddr)) { + .accept() + .map(|(stream, sockaddr)| (Handle::new(stream).unwrap(), sockaddr)) + { Ok(res) => Ok(res), Err(err) if err.kind() == io::ErrorKind::WouldBlock => { let cc = Proactor::get() @@ -246,7 +262,11 @@ impl Processor { Self::send_to_dest(socket, buf, &socket2::SockAddr::from(addr)).await } - async fn send_to_dest(socket: &A, buf: &[u8], addr: &socket2::SockAddr) -> io::Result { + async fn send_to_dest( + socket: &A, + buf: &[u8], + addr: &socket2::SockAddr, + ) -> io::Result { let sock = unsafe { socket2::Socket::from_raw_fd(socket.as_raw_fd()) }; let sock = ManuallyDrop::new(sock); @@ -269,13 +289,19 @@ impl Processor { } } - pub(crate) async fn processor_recv_from(sock: &R, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + pub(crate) async fn processor_recv_from( + sock: &R, + buf: &mut [u8], + ) -> io::Result<(usize, SocketAddr)> { Self::recv_from_with_flags(sock, buf, 0) .await .map(|(size, sockaddr)| (size, sockaddr.as_std().unwrap())) } - pub(crate) async fn processor_peek_from(sock: &R, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + pub(crate) async fn processor_peek_from( + sock: &R, + buf: &mut [u8], + ) -> io::Result<(usize, SocketAddr)> { Self::recv_from_with_flags(sock, buf, libc::MSG_PEEK as _) .await .map(|(size, sockaddr)| (size, sockaddr.as_std().unwrap())) @@ -291,8 +317,8 @@ impl Processor { // let sock = ManuallyDrop::new(sock); // Reregister on block - match super::shim_recv_from(sock, buf, flags as _) - .map(|(size, sockaddr)| (size, sockaddr)) { + match super::shim_recv_from(sock, buf, flags as _).map(|(size, sockaddr)| (size, sockaddr)) + { Ok(res) => Ok(res), Err(err) if err.kind() == io::ErrorKind::WouldBlock => { let cc = Proactor::get() @@ -318,7 +344,9 @@ impl Processor { ///// UnixListener /////////////////////////////////// - pub(crate) async fn processor_accept_unix_listener(listener: &R) -> io::Result<(Handle, UnixSocketAddr)> { + pub(crate) async fn processor_accept_unix_listener( + listener: &R, + ) -> io::Result<(Handle, UnixSocketAddr)> { let socket = unsafe { socket2::Socket::from_raw_fd(listener.as_raw_fd()) }; let socket = socket.into_unix_listener(); let socket = ManuallyDrop::new(socket); @@ -326,7 +354,8 @@ impl Processor { // Reregister on block match socket .accept() - .map(|(stream, sockaddr)| (Handle::new(stream).unwrap(), sockaddr)) { + .map(|(stream, sockaddr)| (Handle::new(stream).unwrap(), sockaddr)) + { Ok(res) => Ok(res), Err(err) if err.kind() == io::ErrorKind::WouldBlock => { let cc = Proactor::get() @@ -350,7 +379,9 @@ impl Processor { ///// UnixStream /////////////////////////////////// - pub(crate) async fn processor_connect_unix>(path: P) -> io::Result> { + pub(crate) async fn processor_connect_unix>( + path: P, + ) -> io::Result> { let sock = socket2::Socket::new(socket2::Domain::unix(), socket2::Type::stream(), None)?; let sockaddr = socket2::SockAddr::unix(path)?; @@ -381,19 +412,29 @@ impl Processor { res.map(|_| stream) } - pub(crate) async fn processor_send_to_unix>(socket: &R, buf: &[u8], path: P) -> io::Result { + pub(crate) async fn processor_send_to_unix>( + socket: &R, + buf: &[u8], + path: P, + ) -> io::Result { Self::send_to_dest(socket, buf, &socket2::SockAddr::unix(path)?).await } - pub(crate) async fn processor_recv_from_unix(socket: &R, buf: &mut [u8]) -> io::Result<(usize, UnixSocketAddr)> { + pub(crate) async fn processor_recv_from_unix( + socket: &R, + buf: &mut [u8], + ) -> io::Result<(usize, UnixSocketAddr)> { Self::recv_from_with_flags(socket, buf, 0) .await .map(|(size, sockaddr)| (size, shim_to_af_unix(&sockaddr).unwrap())) } - pub(crate) async fn processor_peek_from_unix(socket: &R, buf: &mut [u8]) -> io::Result<(usize, UnixSocketAddr)> { + pub(crate) async fn processor_peek_from_unix( + socket: &R, + buf: &mut [u8], + ) -> io::Result<(usize, UnixSocketAddr)> { Self::recv_from_with_flags(socket, buf, libc::MSG_PEEK as _) .await .map(|(size, sockaddr)| (size, shim_to_af_unix(&sockaddr).unwrap())) } -} \ No newline at end of file +} diff --git a/src/syscore/linux/epoll/epoll.rs b/src/syscore/linux/epoll/epoll.rs index f41afde..4943f03 100644 --- a/src/syscore/linux/epoll/epoll.rs +++ b/src/syscore/linux/epoll/epoll.rs @@ -1,14 +1,14 @@ -use std::mem::MaybeUninit; -use std::io::{self, Read, Write}; -use std::os::unix::io::{AsRawFd, RawFd, FromRawFd}; -use std::{fs::File, os::unix::net::UnixStream, collections::HashMap, time::Duration}; +use crate::sys::epoll::*; use futures::channel::oneshot; +use lever::prelude::*; use pin_utils::unsafe_pinned; use std::future::Future; +use std::io::{self, Read, Write}; +use std::mem::MaybeUninit; +use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; use std::pin::Pin; use std::task::{Context, Poll}; -use crate::sys::epoll::*; -use lever::prelude::*; +use std::{collections::HashMap, fs::File, os::unix::net::UnixStream, time::Duration}; macro_rules! syscall { ($fn:ident $args:tt) => {{ @@ -25,8 +25,8 @@ macro_rules! syscall { /////////////////// use socket2::SockAddr; -use std::os::unix::net::{SocketAddr as UnixSocketAddr}; use std::mem; +use std::os::unix::net::SocketAddr as UnixSocketAddr; fn max_len() -> usize { // The maximum read limit on most posix-like systems is `SSIZE_MAX`, @@ -44,18 +44,22 @@ fn max_len() -> usize { } } -pub(crate) fn shim_recv_from(fd: A, buf: &mut [u8], flags: libc::c_int) -> io::Result<(usize, SockAddr)> { +pub(crate) fn shim_recv_from( + fd: A, + buf: &mut [u8], + flags: libc::c_int, +) -> io::Result<(usize, SockAddr)> { let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() }; let mut addrlen = mem::size_of_val(&storage) as libc::socklen_t; let n = syscall!(recvfrom( - fd.as_raw_fd() as _, - buf.as_mut_ptr() as *mut libc::c_void, - std::cmp::min(buf.len(), max_len()), - flags, - &mut storage as *mut _ as *mut _, - &mut addrlen, - ))?; + fd.as_raw_fd() as _, + buf.as_mut_ptr() as *mut libc::c_void, + std::cmp::min(buf.len(), max_len()), + flags, + &mut storage as *mut _ as *mut _, + &mut addrlen, + ))?; let addr = unsafe { SockAddr::from_raw_parts(&storage as *const _ as *const _, addrlen) }; Ok((n as usize, addr)) } @@ -78,7 +82,7 @@ pub(crate) fn shim_to_af_unix(sockaddr: &SockAddr) -> io::Result let abst_sock_ident: libc::c_char = unsafe { std::slice::from_raw_parts( &addr.sun_path as *const _ as *const u8, - mem::size_of::() + mem::size_of::(), ) }[1] as libc::c_char; @@ -88,7 +92,7 @@ pub(crate) fn shim_to_af_unix(sockaddr: &SockAddr) -> io::Result // https://man7.org/linux/man-pages/man7/unix.7.html (sa, 0) if sa != 0 && sa > mem::size_of::() as libc::socklen_t => { len = mem::size_of::() as libc::socklen_t; - }, + } // If unnamed socket, then addr is always zero, // assign the offset reserved difference as length. (0, _) => { @@ -96,7 +100,7 @@ pub(crate) fn shim_to_af_unix(sockaddr: &SockAddr) -> io::Result let path = &addr.sun_path as *const _ as usize; let sun_path_offset = path - base; len = sun_path_offset as libc::socklen_t; - }, + } // Discard rest, they are not special. (_, _) => {} @@ -109,7 +113,7 @@ pub(crate) fn shim_to_af_unix(sockaddr: &SockAddr) -> io::Result std::ptr::copy_nonoverlapping( sockaddr.as_ptr(), &mut init as *mut _ as *mut _, - len as usize + len as usize, ); // Safety: We've written the init addr above. @@ -139,7 +143,7 @@ pub struct SysProactor { registered: TTas>, /// Hashmap for holding interested concrete completion callbacks - completions: TTas> + completions: TTas>, } impl SysProactor { @@ -152,11 +156,16 @@ impl SysProactor { epoll_fd, event_fd: TTas::new(event_fd), registered: TTas::new(HashMap::new()), - completions: TTas::new(HashMap::new()) + completions: TTas::new(HashMap::new()), }; let ev = &mut EpollEvent::new(libc::EPOLLIN as _, 0 as u64); - epoll_ctl(proactor.epoll_fd, EpollOp::EpollCtlAdd, event_fd_raw, Some(ev))?; + epoll_ctl( + proactor.epoll_fd, + EpollOp::EpollCtlAdd, + event_fd_raw, + Some(ev), + )?; Ok(proactor) } @@ -186,7 +195,9 @@ impl SysProactor { pub fn wait(&self, max_event_size: usize, timeout: Option) -> io::Result { // dbg!("WAIT"); let mut events: Vec = Vec::with_capacity(max_event_size); - events.resize(max_event_size, unsafe { MaybeUninit::zeroed().assume_init() }); + events.resize(max_event_size, unsafe { + MaybeUninit::zeroed().assume_init() + }); let timeout: isize = timeout.map_or(!0, |d| d.as_millis() as isize); let mut res = epoll_wait(self.epoll_fd, &mut events, timeout)? as usize; @@ -230,9 +241,7 @@ impl SysProactor { } let (tx, rx) = oneshot::channel(); - let comp = completions - .entry(fd) - .or_insert(Vec::new()); + let comp = completions.entry(fd).or_insert(Vec::new()); comp.push((events, tx)); diff --git a/src/syscore/linux/epoll/mod.rs b/src/syscore/linux/epoll/mod.rs index dcf1f32..ed8af30 100644 --- a/src/syscore/linux/epoll/mod.rs +++ b/src/syscore/linux/epoll/mod.rs @@ -1,6 +1,6 @@ mod epoll; -mod processor; mod nethandle; +mod processor; pub(crate) use epoll::*; pub(crate) use nethandle::*; diff --git a/src/syscore/linux/epoll/nethandle.rs b/src/syscore/linux/epoll/nethandle.rs index 465b838..f9df0da 100644 --- a/src/syscore/linux/epoll/nethandle.rs +++ b/src/syscore/linux/epoll/nethandle.rs @@ -1,22 +1,21 @@ use std::future::Future; use std::io; use std::marker::Unpin; -use std::sync::Arc; -use std::path::Path; use std::net::{SocketAddr, ToSocketAddrs}; use std::os::unix::io::AsRawFd; +use std::path::Path; +use std::sync::Arc; use std::net::{TcpListener, TcpStream, UdpSocket}; // Unix specifics -use std::os::unix::net::{UnixListener, UnixStream, UnixDatagram, SocketAddr as UnixSocketAddr}; +use std::os::unix::net::{SocketAddr as UnixSocketAddr, UnixDatagram, UnixListener, UnixStream}; -use lever::sync::prelude::*; use futures::Stream; +use lever::sync::prelude::*; -use crate::{Handle, Proactor}; use super::Processor; use crate::syscore::CompletionChan; - +use crate::{Handle, Proactor}; impl Handle { pub fn new(io: T) -> io::Result> { @@ -31,9 +30,7 @@ impl Handle { pub(crate) fn new_with_callback(io: T, evflags: i32) -> io::Result> { let fd = io.as_raw_fd(); let mut handle = Handle::new(io)?; - let register = Proactor::get() - .inner() - .register_io(fd, evflags)?; + let register = Proactor::get().inner().register_io(fd, evflags)?; handle.chan = Some(register); Ok(handle) } @@ -53,10 +50,13 @@ impl Handle { pub fn incoming( &self, ) -> impl Stream>> + Send + Unpin + '_ { - Box::pin(futures::stream::unfold(self, |listener: &Handle| async move { - let res = listener.accept().await.map(|(stream, _)| stream); - Some((res, listener)) - })) + Box::pin(futures::stream::unfold( + self, + |listener: &Handle| async move { + let res = listener.accept().await.map(|(stream, _)| stream); + Some((res, listener)) + }, + )) } } @@ -101,8 +101,7 @@ impl Handle { pub async fn send_to(&self, buf: &[u8], addr: A) -> io::Result { match addr.to_socket_addrs()?.next() { - Some(addr) => - Processor::processor_send_to(self.get_ref(), buf, addr).await, + Some(addr) => Processor::processor_send_to(self.get_ref(), buf, addr).await, None => Err(io::Error::new( io::ErrorKind::InvalidData, "given addresses can't be parsed", @@ -131,10 +130,13 @@ impl Handle { pub fn incoming( &self, ) -> impl Stream>> + Send + Unpin + '_ { - Box::pin(futures::stream::unfold(self, |listener: &Handle| async move { - let res = listener.accept().await.map(|(stream, _)| stream); - Some((res, listener)) - })) + Box::pin(futures::stream::unfold( + self, + |listener: &Handle| async move { + let res = listener.accept().await.map(|(stream, _)| stream); + Some((res, listener)) + }, + )) } } @@ -204,4 +206,4 @@ impl Handle { pub async fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, UnixSocketAddr)> { Processor::processor_peek_from_unix(self.get_ref(), buf).await } -} \ No newline at end of file +} diff --git a/src/syscore/linux/epoll/processor.rs b/src/syscore/linux/epoll/processor.rs index fb44b95..dd8d4be 100644 --- a/src/syscore/linux/epoll/processor.rs +++ b/src/syscore/linux/epoll/processor.rs @@ -1,18 +1,20 @@ +use std::future::Future; use std::io; use std::io::{Read, Write}; -use std::{fs::File, os::unix::io::{AsRawFd, FromRawFd}, mem::ManuallyDrop}; -use std::net::{SocketAddr, ToSocketAddrs, TcpListener}; -use std::os::unix::net::{ - SocketAddr as UnixSocketAddr, UnixDatagram, UnixListener, UnixStream, -}; -use std::net::{SocketAddrV6, SocketAddrV4, Ipv4Addr, Ipv6Addr, UdpSocket}; -use std::future::Future; -use std::path::Path; use std::net::TcpStream; +use std::net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6, UdpSocket}; +use std::net::{SocketAddr, TcpListener, ToSocketAddrs}; +use std::os::unix::net::{SocketAddr as UnixSocketAddr, UnixDatagram, UnixListener, UnixStream}; +use std::path::Path; +use std::{ + fs::File, + mem::ManuallyDrop, + os::unix::io::{AsRawFd, FromRawFd}, +}; use crate::proactor::Proactor; -use crate::Handle; use crate::syscore::shim_to_af_unix; +use crate::Handle; pub struct Processor; @@ -22,7 +24,10 @@ impl Processor { ///// Synchronous File /////////////////////////////////// - pub(crate) async fn processor_read_file(io: &R, buf: &mut [u8]) -> io::Result { + pub(crate) async fn processor_read_file( + io: &R, + buf: &mut [u8], + ) -> io::Result { // TODO: (vertexclique): Use blocking here. let mut file = unsafe { File::from_raw_fd(io.as_raw_fd()) }; let res = file.read(buf); @@ -105,10 +110,13 @@ impl Processor { ///// Commonality of TcpStream, UdpSocket /////////////////////////////////// - pub(crate) async fn processor_connect(addrs: A, mut f: F) -> io::Result - where - F: FnMut(SocketAddr) -> Fut, - Fut: Future>, + pub(crate) async fn processor_connect( + addrs: A, + mut f: F, + ) -> io::Result + where + F: FnMut(SocketAddr) -> Fut, + Fut: Future>, { // TODO connect_tcp, connect_udp let addrs = match addrs.to_socket_addrs() { @@ -125,10 +133,7 @@ impl Processor { } Err(tail_err.unwrap_or_else(|| { - io::Error::new( - io::ErrorKind::InvalidInput, - "couldn't resolve addresses", - ) + io::Error::new(io::ErrorKind::InvalidInput, "couldn't resolve addresses") })) } @@ -145,7 +150,11 @@ impl Processor { } else { socket2::Domain::ipv4() }; - let sock = socket2::Socket::new(domain, socket2::Type::stream(), Some(socket2::Protocol::tcp()))?; + let sock = socket2::Socket::new( + domain, + socket2::Type::stream(), + Some(socket2::Protocol::tcp()), + )?; // Begin async connect and ignore the inevitable "in progress" error. sock.set_nonblocking(true)?; @@ -175,7 +184,11 @@ impl Processor { SocketAddr::V4(_) => socket2::Domain::ipv4(), SocketAddr::V6(_) => socket2::Domain::ipv6(), }; - let sock = socket2::Socket::new(domain, socket2::Type::dgram(), Some(socket2::Protocol::udp()))?; + let sock = socket2::Socket::new( + domain, + socket2::Type::dgram(), + Some(socket2::Protocol::udp()), + )?; let sockaddr = socket2::SockAddr::from(addr); let unspec = match addr { @@ -204,7 +217,9 @@ impl Processor { ///// TcpListener /////////////////////////////////// - pub(crate) async fn processor_accept_tcp_listener(listener: &R) -> io::Result<(Handle, SocketAddr)> { + pub(crate) async fn processor_accept_tcp_listener( + listener: &R, + ) -> io::Result<(Handle, SocketAddr)> { let socket = unsafe { socket2::Socket::from_raw_fd(listener.as_raw_fd()) }; let socket = socket.into_tcp_listener(); let socket = ManuallyDrop::new(socket); @@ -212,7 +227,8 @@ impl Processor { // Reregister on block match socket .accept() - .map(|(stream, sockaddr)| (Handle::new(stream).unwrap(), sockaddr)) { + .map(|(stream, sockaddr)| (Handle::new(stream).unwrap(), sockaddr)) + { Ok(res) => Ok(res), Err(err) if (err.raw_os_error().unwrap() & (libc::EAGAIN | libc::EWOULDBLOCK)) != 0 => { let cc = Proactor::get() @@ -243,7 +259,11 @@ impl Processor { Self::send_to_dest(socket, buf, &socket2::SockAddr::from(addr)).await } - async fn send_to_dest(socket: &A, buf: &[u8], addr: &socket2::SockAddr) -> io::Result { + async fn send_to_dest( + socket: &A, + buf: &[u8], + addr: &socket2::SockAddr, + ) -> io::Result { let sock = unsafe { socket2::Socket::from_raw_fd(socket.as_raw_fd()) }; let sock = ManuallyDrop::new(sock); @@ -265,13 +285,19 @@ impl Processor { } } - pub(crate) async fn processor_recv_from(sock: &R, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + pub(crate) async fn processor_recv_from( + sock: &R, + buf: &mut [u8], + ) -> io::Result<(usize, SocketAddr)> { Self::recv_from_with_flags(sock, buf, 0) .await .map(|(size, sockaddr)| (size, sockaddr.as_std().unwrap())) } - pub(crate) async fn processor_peek_from(sock: &R, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + pub(crate) async fn processor_peek_from( + sock: &R, + buf: &mut [u8], + ) -> io::Result<(usize, SocketAddr)> { Self::recv_from_with_flags(sock, buf, libc::MSG_PEEK as _) .await .map(|(size, sockaddr)| (size, sockaddr.as_std().unwrap())) @@ -287,8 +313,8 @@ impl Processor { // let sock = ManuallyDrop::new(sock); // Reregister on block - match super::shim_recv_from(sock, buf, flags as _) - .map(|(size, sockaddr)| (size, sockaddr)) { + match super::shim_recv_from(sock, buf, flags as _).map(|(size, sockaddr)| (size, sockaddr)) + { Ok(res) => Ok(res), Err(err) if (err.raw_os_error().unwrap() & (libc::EAGAIN | libc::EWOULDBLOCK)) != 0 => { let cc = Proactor::get() @@ -313,7 +339,9 @@ impl Processor { ///// UnixListener /////////////////////////////////// - pub(crate) async fn processor_accept_unix_listener(listener: &R) -> io::Result<(Handle, UnixSocketAddr)> { + pub(crate) async fn processor_accept_unix_listener( + listener: &R, + ) -> io::Result<(Handle, UnixSocketAddr)> { let socket = unsafe { socket2::Socket::from_raw_fd(listener.as_raw_fd()) }; let socket = socket.into_unix_listener(); let socket = ManuallyDrop::new(socket); @@ -321,7 +349,8 @@ impl Processor { // Reregister on block match socket .accept() - .map(|(stream, sockaddr)| (Handle::new(stream).unwrap(), sockaddr)) { + .map(|(stream, sockaddr)| (Handle::new(stream).unwrap(), sockaddr)) + { Ok(res) => Ok(res), Err(err) if (err.raw_os_error().unwrap() & (libc::EAGAIN | libc::EWOULDBLOCK)) != 0 => { let cc = Proactor::get() @@ -344,7 +373,9 @@ impl Processor { ///// UnixStream /////////////////////////////////// - pub(crate) async fn processor_connect_unix>(path: P) -> io::Result> { + pub(crate) async fn processor_connect_unix>( + path: P, + ) -> io::Result> { let sock = socket2::Socket::new(socket2::Domain::unix(), socket2::Type::stream(), None)?; let sockaddr = socket2::SockAddr::unix(path)?; @@ -374,19 +405,29 @@ impl Processor { res.map(|_| stream) } - pub(crate) async fn processor_send_to_unix>(socket: &R, buf: &[u8], path: P) -> io::Result { + pub(crate) async fn processor_send_to_unix>( + socket: &R, + buf: &[u8], + path: P, + ) -> io::Result { Self::send_to_dest(socket, buf, &socket2::SockAddr::unix(path)?).await } - pub(crate) async fn processor_recv_from_unix(socket: &R, buf: &mut [u8]) -> io::Result<(usize, UnixSocketAddr)> { + pub(crate) async fn processor_recv_from_unix( + socket: &R, + buf: &mut [u8], + ) -> io::Result<(usize, UnixSocketAddr)> { Self::recv_from_with_flags(socket, buf, 0) .await .map(|(size, sockaddr)| (size, shim_to_af_unix(&sockaddr).unwrap())) } - pub(crate) async fn processor_peek_from_unix(socket: &R, buf: &mut [u8]) -> io::Result<(usize, UnixSocketAddr)> { + pub(crate) async fn processor_peek_from_unix( + socket: &R, + buf: &mut [u8], + ) -> io::Result<(usize, UnixSocketAddr)> { Self::recv_from_with_flags(socket, buf, libc::MSG_PEEK as _) .await .map(|(size, sockaddr)| (size, shim_to_af_unix(&sockaddr).unwrap())) } -} \ No newline at end of file +} diff --git a/src/syscore/linux/iouring/iouring.rs b/src/syscore/linux/iouring/iouring.rs index 8b7c274..5ed7da0 100644 --- a/src/syscore/linux/iouring/iouring.rs +++ b/src/syscore/linux/iouring/iouring.rs @@ -1,16 +1,19 @@ -use std::io; -use iou::{IoUring, SubmissionQueue, CompletionQueue, SubmissionQueueEvent, CompletionQueueEvent, Registrar}; -use lever::sync::prelude::*; -use std::collections::HashMap; +use core::mem::MaybeUninit; use futures::channel::oneshot; +use iou::{ + CompletionQueue, CompletionQueueEvent, IoUring, Registrar, SubmissionQueue, + SubmissionQueueEvent, +}; +use lever::sync::prelude::*; use pin_utils::unsafe_pinned; +use std::collections::HashMap; use std::future::Future; -use core::mem::MaybeUninit; -use std::os::unix::io::{AsRawFd, RawFd, FromRawFd}; +use std::io; +use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; use std::pin::Pin; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::task::{Context, Poll}; use std::time::Duration; -use std::sync::atomic::{AtomicU64, Ordering, AtomicBool}; macro_rules! syscall { ($fn:ident $args:tt) => {{ @@ -26,11 +29,11 @@ macro_rules! syscall { /////////////////// /////////////////// -use socket2::SockAddr; -use std::os::unix::net::{SocketAddr as UnixSocketAddr}; -use std::mem; use crate::Proactor; use once_cell::sync::Lazy; +use socket2::SockAddr; +use std::mem; +use std::os::unix::net::SocketAddr as UnixSocketAddr; fn max_len() -> usize { // The maximum read limit on most posix-like systems is `SSIZE_MAX`, @@ -48,18 +51,22 @@ fn max_len() -> usize { } } -pub(crate) fn shim_recv_from(fd: A, buf: &mut [u8], flags: libc::c_int) -> io::Result<(usize, SockAddr)> { +pub(crate) fn shim_recv_from( + fd: A, + buf: &mut [u8], + flags: libc::c_int, +) -> io::Result<(usize, SockAddr)> { let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() }; let mut addrlen = mem::size_of_val(&storage) as libc::socklen_t; let n = syscall!(recvfrom( - fd.as_raw_fd() as _, - buf.as_mut_ptr() as *mut libc::c_void, - std::cmp::min(buf.len(), max_len()), - flags, - &mut storage as *mut _ as *mut _, - &mut addrlen, - ))?; + fd.as_raw_fd() as _, + buf.as_mut_ptr() as *mut libc::c_void, + std::cmp::min(buf.len(), max_len()), + flags, + &mut storage as *mut _ as *mut _, + &mut addrlen, + ))?; let addr = unsafe { SockAddr::from_raw_parts(&storage as *const _ as *const _, addrlen) }; Ok((n as usize, addr)) } @@ -82,7 +89,7 @@ pub(crate) fn shim_to_af_unix(sockaddr: &SockAddr) -> io::Result let abst_sock_ident: libc::c_char = unsafe { std::slice::from_raw_parts( &addr.sun_path as *const _ as *const u8, - mem::size_of::() + mem::size_of::(), ) }[1] as libc::c_char; @@ -92,7 +99,7 @@ pub(crate) fn shim_to_af_unix(sockaddr: &SockAddr) -> io::Result // https://man7.org/linux/man-pages/man7/unix.7.html (sa, 0) if sa != 0 && sa > mem::size_of::() as libc::socklen_t => { len = mem::size_of::() as libc::socklen_t; - }, + } // If unnamed socket, then addr is always zero, // assign the offset reserved difference as length. (0, _) => { @@ -100,7 +107,7 @@ pub(crate) fn shim_to_af_unix(sockaddr: &SockAddr) -> io::Result let path = &addr.sun_path as *const _ as usize; let sun_path_offset = path - base; len = sun_path_offset as libc::socklen_t; - }, + } // Discard rest, they are not special. (_, _) => {} @@ -113,7 +120,7 @@ pub(crate) fn shim_to_af_unix(sockaddr: &SockAddr) -> io::Result std::ptr::copy_nonoverlapping( sockaddr.as_ptr(), &mut init as *mut _ as *mut _, - len as usize + len as usize, ); // Safety: We've written the init addr above. @@ -141,7 +148,11 @@ pub struct SysProactor { waker: AtomicBool, } -pub type RingTypes = (SubmissionQueue<'static>, CompletionQueue<'static>, Registrar<'static>); +pub type RingTypes = ( + SubmissionQueue<'static>, + CompletionQueue<'static>, + Registrar<'static>, +); static mut IO_URING: Option = None; @@ -156,12 +167,16 @@ impl SysProactor { cq: TTas::new(cq), submitters: TTas::new(HashMap::default()), submitter_id: AtomicU64::default(), - waker: AtomicBool::default() + waker: AtomicBool::default(), }) } } - fn submitter(&self, sq: &mut SubmissionQueue<'_>, mut ring_sub: impl FnMut(&mut SubmissionQueueEvent<'_>) -> T) -> Option { + fn submitter( + &self, + sq: &mut SubmissionQueue<'_>, + mut ring_sub: impl FnMut(&mut SubmissionQueueEvent<'_>) -> T, + ) -> Option { // dbg!("SUBMITTER"); let mut sqe = match sq.next_sqe() { Some(sqe) => sqe, @@ -176,37 +191,41 @@ impl SysProactor { Some(ring_sub(&mut sqe)) } - - pub(crate) fn register_io(&self, mut io_submit: impl FnMut(&mut SubmissionQueueEvent<'_>)) -> io::Result { + pub(crate) fn register_io( + &self, + mut io_submit: impl FnMut(&mut SubmissionQueueEvent<'_>), + ) -> io::Result { // dbg!("REGISTER IO"); let sub_comp = { let mut sq = self.sq.lock(); - let cc = self.submitter(&mut sq, |sqe| { - // dbg!("SUBMITTER"); - let mut id = self.submitter_id.fetch_add(1, Ordering::Relaxed); - if id == MANUAL_TIMEOUT { - id = self.submitter_id.fetch_add(2, Ordering::Relaxed) + 2; - } - let (tx, rx) = oneshot::channel(); - - // dbg!("SUBMITTER", id); - io_submit(sqe); - sqe.set_user_data(id); - - { - let mut subguard = self.submitters.lock(); - subguard.insert(id, tx); - // dbg!("INSERTED", id); - } - - CompletionChan { rx } - }).map(|c| unsafe { - let submitted_io_evcount = sq.submit().unwrap(); - // dbg!(submitted_io_evcount); - - c - }); + let cc = self + .submitter(&mut sq, |sqe| { + // dbg!("SUBMITTER"); + let mut id = self.submitter_id.fetch_add(1, Ordering::Relaxed); + if id == MANUAL_TIMEOUT { + id = self.submitter_id.fetch_add(2, Ordering::Relaxed) + 2; + } + let (tx, rx) = oneshot::channel(); + + // dbg!("SUBMITTER", id); + io_submit(sqe); + sqe.set_user_data(id); + + { + let mut subguard = self.submitters.lock(); + subguard.insert(id, tx); + // dbg!("INSERTED", id); + } + + CompletionChan { rx } + }) + .map(|c| unsafe { + let submitted_io_evcount = sq.submit().unwrap(); + // dbg!(submitted_io_evcount); + + c + }); cc }; @@ -220,7 +239,11 @@ impl SysProactor { Ok(()) } - pub(crate) fn wait(&self, max_event_size: usize, duration: Option) -> io::Result { + pub(crate) fn wait( + &self, + max_event_size: usize, + duration: Option, + ) -> io::Result { // dbg!("WAIT ENTER"); let mut cq = self.cq.lock(); let mut acquired = 0; @@ -259,11 +282,7 @@ impl SysProactor { acquired += 1; // dbg!("ACQUIRED", udata); - self.submitters.lock() - .remove(&udata) - .map(|s| { - s.send(res) - }); + self.submitters.lock().remove(&udata).map(|s| s.send(res)); Ok(()) } @@ -285,4 +304,4 @@ impl Future for CompletionChan { .poll(cx) .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "sender has been cancelled")) } -} \ No newline at end of file +} diff --git a/src/syscore/linux/iouring/mod.rs b/src/syscore/linux/iouring/mod.rs index 9189c98..03d94dc 100644 --- a/src/syscore/linux/iouring/mod.rs +++ b/src/syscore/linux/iouring/mod.rs @@ -1,6 +1,6 @@ mod iouring; -mod processor; mod nethandle; +mod processor; pub(crate) use iouring::*; pub(crate) use nethandle::*; diff --git a/src/syscore/linux/iouring/nethandle.rs b/src/syscore/linux/iouring/nethandle.rs index d308784..c41eba1 100644 --- a/src/syscore/linux/iouring/nethandle.rs +++ b/src/syscore/linux/iouring/nethandle.rs @@ -1,22 +1,21 @@ use std::future::Future; use std::io; use std::marker::Unpin; -use std::sync::Arc; -use std::path::Path; use std::net::{SocketAddr, ToSocketAddrs}; use std::os::unix::io::AsRawFd; +use std::path::Path; +use std::sync::Arc; use std::net::{TcpListener, TcpStream, UdpSocket}; // Unix specifics -use std::os::unix::net::{UnixListener, UnixStream, UnixDatagram, SocketAddr as UnixSocketAddr}; +use std::os::unix::net::{SocketAddr as UnixSocketAddr, UnixDatagram, UnixListener, UnixStream}; -use lever::sync::prelude::*; use futures::Stream; +use lever::sync::prelude::*; -use crate::{Handle, Proactor}; use super::Processor; use crate::syscore::CompletionChan; - +use crate::{Handle, Proactor}; impl Handle { pub fn new(io: T) -> io::Result> { @@ -46,10 +45,13 @@ impl Handle { pub fn incoming( &self, ) -> impl Stream>> + Send + Unpin + '_ { - Box::pin(futures::stream::unfold(self, |listener: &Handle| async move { - let res = listener.accept().await.map(|(stream, _)| stream); - Some((res, listener)) - })) + Box::pin(futures::stream::unfold( + self, + |listener: &Handle| async move { + let res = listener.accept().await.map(|(stream, _)| stream); + Some((res, listener)) + }, + )) } } @@ -94,8 +96,7 @@ impl Handle { pub async fn send_to(&self, buf: &[u8], addr: A) -> io::Result { match addr.to_socket_addrs()?.next() { - Some(addr) => - Processor::processor_send_to(self.get_ref(), buf, addr).await, + Some(addr) => Processor::processor_send_to(self.get_ref(), buf, addr).await, None => Err(io::Error::new( io::ErrorKind::InvalidData, "given addresses can't be parsed", @@ -124,10 +125,13 @@ impl Handle { pub fn incoming( &self, ) -> impl Stream>> + Send + Unpin + '_ { - Box::pin(futures::stream::unfold(self, |listener: &Handle| async move { - let res = listener.accept().await.map(|(stream, _)| stream); - Some((res, listener)) - })) + Box::pin(futures::stream::unfold( + self, + |listener: &Handle| async move { + let res = listener.accept().await.map(|(stream, _)| stream); + Some((res, listener)) + }, + )) } } @@ -197,4 +201,4 @@ impl Handle { pub async fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, UnixSocketAddr)> { Processor::processor_peek_from_unix(self.get_ref(), buf).await } -} \ No newline at end of file +} diff --git a/src/syscore/linux/iouring/processor.rs b/src/syscore/linux/iouring/processor.rs index 146a9f3..4421c06 100644 --- a/src/syscore/linux/iouring/processor.rs +++ b/src/syscore/linux/iouring/processor.rs @@ -1,24 +1,25 @@ +use std::future::Future; use std::io; use std::io::{Read, Write}; -use std::{fs::File, os::unix::io::{AsRawFd, FromRawFd}, mem::ManuallyDrop}; -use std::net::{SocketAddr, ToSocketAddrs, TcpListener}; -use std::os::unix::net::{ - SocketAddr as UnixSocketAddr, UnixDatagram, UnixListener, UnixStream, -}; -use std::net::{SocketAddrV6, SocketAddrV4, Ipv4Addr, Ipv6Addr, UdpSocket}; -use std::future::Future; -use std::path::Path; use std::net::TcpStream; +use std::net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6, UdpSocket}; +use std::net::{SocketAddr, TcpListener, ToSocketAddrs}; +use std::os::unix::net::{SocketAddr as UnixSocketAddr, UnixDatagram, UnixListener, UnixStream}; +use std::path::Path; +use std::{ + fs::File, + mem::ManuallyDrop, + os::unix::io::{AsRawFd, FromRawFd}, +}; use crate::proactor::Proactor; -use crate::Handle; use crate::syscore::shim_to_af_unix; -use std::io::{IoSliceMut, IoSlice}; -use iou::{SockFlag, SockAddrStorage}; +use crate::Handle; +use iou::{SockAddrStorage, SockFlag}; +use std::io::{IoSlice, IoSliceMut}; use std::mem::MaybeUninit; - macro_rules! syscall { ($fn:ident $args:tt) => {{ let res = unsafe { libc::$fn $args }; @@ -30,7 +31,6 @@ macro_rules! syscall { }}; } - pub struct Processor; impl Processor { @@ -39,7 +39,10 @@ impl Processor { ///// Synchronous File /////////////////////////////////// - pub(crate) async fn processor_read_file(io: &R, buf: &mut [u8]) -> io::Result { + pub(crate) async fn processor_read_file( + io: &R, + buf: &mut [u8], + ) -> io::Result { let fd = io.as_raw_fd() as _; let mut bufs = [IoSliceMut::new(buf)]; @@ -69,10 +72,13 @@ impl Processor { pub(crate) async fn processor_send(socket: &R, buf: &[u8]) -> io::Result { let fd = socket.as_raw_fd() as _; - let res = Proactor::get().inner().register_io(|sqe| unsafe { - let sqep = sqe.raw_mut(); - uring_sys::io_uring_prep_send(sqep, fd, buf.as_ptr() as _, buf.len() as _, 0); - })?.await?; + let res = Proactor::get() + .inner() + .register_io(|sqe| unsafe { + let sqep = sqe.raw_mut(); + uring_sys::io_uring_prep_send(sqep, fd, buf.as_ptr() as _, buf.len() as _, 0); + })? + .await?; Ok(res as _) } @@ -92,10 +98,19 @@ impl Processor { ) -> io::Result { let fd = socket.as_raw_fd() as _; - let res = Proactor::get().inner().register_io(|sqe| unsafe { - let sqep = sqe.raw_mut(); - uring_sys::io_uring_prep_recv(sqep as *mut _, fd, buf.as_ptr() as _, buf.len() as _, flags as _); - })?.await?; + let res = Proactor::get() + .inner() + .register_io(|sqe| unsafe { + let sqep = sqe.raw_mut(); + uring_sys::io_uring_prep_recv( + sqep as *mut _, + fd, + buf.as_ptr() as _, + buf.len() as _, + flags as _, + ); + })? + .await?; Ok(res as _) } @@ -105,10 +120,13 @@ impl Processor { ///// Commonality of TcpStream, UdpSocket /////////////////////////////////// - pub(crate) async fn processor_connect(addrs: A, mut f: F) -> io::Result - where - F: FnMut(SocketAddr) -> Fut, - Fut: Future>, + pub(crate) async fn processor_connect( + addrs: A, + mut f: F, + ) -> io::Result + where + F: FnMut(SocketAddr) -> Fut, + Fut: Future>, { // TODO connect_tcp, connect_udp let addrs = match addrs.to_socket_addrs() { @@ -125,10 +143,7 @@ impl Processor { } Err(tail_err.unwrap_or_else(|| { - io::Error::new( - io::ErrorKind::InvalidInput, - "couldn't resolve addresses", - ) + io::Error::new(io::ErrorKind::InvalidInput, "couldn't resolve addresses") })) } @@ -144,22 +159,28 @@ impl Processor { } else { socket2::Domain::ipv4() }; - let sock = socket2::Socket::new(domain, socket2::Type::stream(), Some(socket2::Protocol::tcp()))?; + let sock = socket2::Socket::new( + domain, + socket2::Type::stream(), + Some(socket2::Protocol::tcp()), + )?; sock.set_nonblocking(true)?; // FIXME: (vcq): iou uses nix, i use socket2, conversions happens over libc. // Propose std conversion for nix. - let nixsaddr = - unsafe { - &iou::SockAddr::from_libc_sockaddr(sock.local_addr().unwrap().as_ptr()).unwrap() - }; + let nixsaddr = unsafe { + &iou::SockAddr::from_libc_sockaddr(sock.local_addr().unwrap().as_ptr()).unwrap() + }; let stream = sock.into_tcp_stream(); let fd = stream.as_raw_fd() as _; - Proactor::get().inner().register_io(|sqe| unsafe { - sqe.prep_connect(fd, nixsaddr); - })?.await?; + Proactor::get() + .inner() + .register_io(|sqe| unsafe { + sqe.prep_connect(fd, nixsaddr); + })? + .await?; Ok(Handle::new(stream)?) } @@ -169,7 +190,11 @@ impl Processor { SocketAddr::V4(_) => socket2::Domain::ipv4(), SocketAddr::V6(_) => socket2::Domain::ipv6(), }; - let sock = socket2::Socket::new(domain, socket2::Type::dgram(), Some(socket2::Protocol::udp()))?; + let sock = socket2::Socket::new( + domain, + socket2::Type::dgram(), + Some(socket2::Protocol::udp()), + )?; let sockaddr = socket2::SockAddr::from(addr); let unspec = match addr { @@ -284,7 +309,9 @@ impl Processor { // Ok((Handle::new(stream)?, addr)) // } - pub(crate) async fn processor_accept_tcp_listener(listener: &R) -> io::Result<(Handle, SocketAddr)> { + pub(crate) async fn processor_accept_tcp_listener( + listener: &R, + ) -> io::Result<(Handle, SocketAddr)> { let socket = unsafe { socket2::Socket::from_raw_fd(listener.as_raw_fd()) }; let socket = socket.into_tcp_listener(); let socket = ManuallyDrop::new(socket); @@ -306,7 +333,11 @@ impl Processor { Self::send_to_dest(socket, buf, &socket2::SockAddr::from(addr)).await } - async fn send_to_dest(socket: &A, buf: &[u8], addr: &socket2::SockAddr) -> io::Result { + async fn send_to_dest( + socket: &A, + buf: &[u8], + addr: &socket2::SockAddr, + ) -> io::Result { // FIXME: (vcq): Wrap into vec? let mut iov = IoSlice::new(buf); @@ -318,21 +349,30 @@ impl Processor { let fd = socket.as_raw_fd() as _; - let res = Proactor::get().inner().register_io(|sqe| unsafe { - let sqep = sqe.raw_mut(); - uring_sys::io_uring_prep_sendmsg(sqep, fd, &sendmsg as *const _ as *const _, 0); - })?.await?; + let res = Proactor::get() + .inner() + .register_io(|sqe| unsafe { + let sqep = sqe.raw_mut(); + uring_sys::io_uring_prep_sendmsg(sqep, fd, &sendmsg as *const _ as *const _, 0); + })? + .await?; Ok(res as _) } - pub(crate) async fn processor_recv_from(sock: &R, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + pub(crate) async fn processor_recv_from( + sock: &R, + buf: &mut [u8], + ) -> io::Result<(usize, SocketAddr)> { Self::recv_from_with_flags(sock, buf, 0) .await .map(|(size, sockaddr)| (size, sockaddr.as_std().unwrap())) } - pub(crate) async fn processor_peek_from(sock: &R, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + pub(crate) async fn processor_peek_from( + sock: &R, + buf: &mut [u8], + ) -> io::Result<(usize, SocketAddr)> { Self::recv_from_with_flags(sock, buf, libc::MSG_PEEK as _) .await .map(|(size, sockaddr)| (size, sockaddr.as_std().unwrap())) @@ -343,7 +383,8 @@ impl Processor { buf: &mut [u8], flags: u32, ) -> io::Result<(usize, socket2::SockAddr)> { - let mut sockaddr_raw = unsafe { MaybeUninit::::zeroed().assume_init() }; + let mut sockaddr_raw = + unsafe { MaybeUninit::::zeroed().assume_init() }; // FIXME: (vcq): Wrap into vec? let mut iov = IoSliceMut::new(buf); @@ -356,15 +397,18 @@ impl Processor { let fd = socket.as_raw_fd() as _; - let res = Proactor::get().inner().register_io(|sqe| unsafe { - let sqep = sqe.raw_mut(); - uring_sys::io_uring_prep_recvmsg( - sqep, - fd, - &mut recvmsg as *mut _ as *mut _, - flags as _, - ); - })?.await?; + let res = Proactor::get() + .inner() + .register_io(|sqe| unsafe { + let sqep = sqe.raw_mut(); + uring_sys::io_uring_prep_recvmsg( + sqep, + fd, + &mut recvmsg as *mut _ as *mut _, + flags as _, + ); + })? + .await?; let sockaddr = unsafe { socket2::SockAddr::from_raw_parts( @@ -380,7 +424,9 @@ impl Processor { ///// UnixListener /////////////////////////////////// - pub(crate) async fn processor_accept_unix_listener(listener: &R) -> io::Result<(Handle, UnixSocketAddr)> { + pub(crate) async fn processor_accept_unix_listener( + listener: &R, + ) -> io::Result<(Handle, UnixSocketAddr)> { let fd = listener.as_raw_fd() as _; let mut saddrstor = SockAddrStorage::uninit(); @@ -403,7 +449,9 @@ impl Processor { ///// UnixStream /////////////////////////////////// - pub(crate) async fn processor_connect_unix>(path: P) -> io::Result> { + pub(crate) async fn processor_connect_unix>( + path: P, + ) -> io::Result> { let sock = socket2::Socket::new(socket2::Domain::unix(), socket2::Type::stream(), None)?; let sockaddr = socket2::SockAddr::unix(path)?; @@ -411,34 +459,44 @@ impl Processor { // FIXME: (vcq): iou uses nix, i use socket2, conversions happens over libc. // Propose std conversion for nix. - let nixsaddr = - unsafe { - &iou::SockAddr::from_libc_sockaddr(sock.local_addr().unwrap().as_ptr()).unwrap() - }; + let nixsaddr = unsafe { + &iou::SockAddr::from_libc_sockaddr(sock.local_addr().unwrap().as_ptr()).unwrap() + }; let stream = sock.into_unix_stream(); let fd = stream.as_raw_fd() as _; - Proactor::get().inner().register_io(|sqe| unsafe { - sqe.prep_connect(fd, nixsaddr) - })?.await?; + Proactor::get() + .inner() + .register_io(|sqe| unsafe { sqe.prep_connect(fd, nixsaddr) })? + .await?; Ok(Handle::new(stream)?) } - pub(crate) async fn processor_send_to_unix>(socket: &R, buf: &[u8], path: P) -> io::Result { + pub(crate) async fn processor_send_to_unix>( + socket: &R, + buf: &[u8], + path: P, + ) -> io::Result { Self::send_to_dest(socket, buf, &socket2::SockAddr::unix(path)?).await } - pub(crate) async fn processor_recv_from_unix(socket: &R, buf: &mut [u8]) -> io::Result<(usize, UnixSocketAddr)> { + pub(crate) async fn processor_recv_from_unix( + socket: &R, + buf: &mut [u8], + ) -> io::Result<(usize, UnixSocketAddr)> { Self::recv_from_with_flags(socket, buf, 0) .await .map(|(size, sockaddr)| (size, shim_to_af_unix(&sockaddr).unwrap())) } - pub(crate) async fn processor_peek_from_unix(socket: &R, buf: &mut [u8]) -> io::Result<(usize, UnixSocketAddr)> { + pub(crate) async fn processor_peek_from_unix( + socket: &R, + buf: &mut [u8], + ) -> io::Result<(usize, UnixSocketAddr)> { Self::recv_from_with_flags(socket, buf, libc::MSG_PEEK as _) .await .map(|(size, sockaddr)| (size, shim_to_af_unix(&sockaddr).unwrap())) } -} \ No newline at end of file +} diff --git a/src/syscore/linux/mod.rs b/src/syscore/linux/mod.rs index 05e0f45..362678a 100644 --- a/src/syscore/linux/mod.rs +++ b/src/syscore/linux/mod.rs @@ -6,4 +6,4 @@ pub(crate) use epoll::*; #[cfg(feature = "iouring")] mod iouring; #[cfg(feature = "iouring")] -pub(crate) use iouring::*; \ No newline at end of file +pub(crate) use iouring::*; diff --git a/src/utils.rs b/src/utils.rs index e69de29..8b13789 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -0,0 +1 @@ + diff --git a/src/waker/mod.rs b/src/waker/mod.rs index fb9fd8b..7871fda 100644 --- a/src/waker/mod.rs +++ b/src/waker/mod.rs @@ -1,6 +1,6 @@ -use std::sync::Arc; use core::mem::{self, ManuallyDrop}; use core::task::{RawWaker, RawWakerVTable, Waker}; +use std::sync::Arc; /// Creates a waker from a wake function. ///