From d9e856481d1ae608014040c646009bb5a055c482 Mon Sep 17 00:00:00 2001 From: Lars Eggert Date: Mon, 14 Oct 2024 11:29:12 +0200 Subject: [PATCH] feat: Faster UDP/IO on Apple platforms This uses Apple's private sendmsg_x and recvmsg_x system calls for multi-packet UDP I/O. --- quinn-udp/Cargo.toml | 5 + quinn-udp/benches/throughput.rs | 2 +- quinn-udp/build.rs | 26 +++ quinn-udp/src/cmsg/unix.rs | 23 +++ quinn-udp/src/unix.rs | 285 +++++++++++++++++++------------- 5 files changed, 222 insertions(+), 119 deletions(-) create mode 100644 quinn-udp/build.rs diff --git a/quinn-udp/Cargo.toml b/quinn-udp/Cargo.toml index b7790001a..919137938 100644 --- a/quinn-udp/Cargo.toml +++ b/quinn-udp/Cargo.toml @@ -18,6 +18,8 @@ default = ["tracing", "log"] # Configure `tracing` to log events via `log` if no `tracing` subscriber exists. log = ["tracing/log"] direct-log = ["dep:log"] +# Use private Apple APIs to send multiple packets in a single syscall. +fast-apple-datapath = [] [dependencies] libc = "0.2.158" @@ -33,6 +35,9 @@ windows-sys = { workspace = true } criterion = { version = "0.5", default-features = false, features = ["async_tokio"] } tokio = { workspace = true, features = ["rt", "rt-multi-thread", "net"] } +[build-dependencies] +cfg_aliases = "0.2" + [lib] # See https://github.com/bheisler/criterion.rs/blob/master/book/src/faq.md#cargo-bench-gives-unrecognized-option-errors-for-valid-command-line-options bench = false diff --git a/quinn-udp/benches/throughput.rs b/quinn-udp/benches/throughput.rs index 73f1fea15..446ccc12d 100644 --- a/quinn-udp/benches/throughput.rs +++ b/quinn-udp/benches/throughput.rs @@ -23,7 +23,7 @@ pub fn criterion_benchmark(c: &mut Criterion) { let mut permutations = vec![]; for gso_enabled in [ false, - #[cfg(any(target_os = "linux", target_os = "windows"))] + #[cfg(any(target_os = "linux", target_os = "windows", apple))] true, ] { for gro_enabled in [false, true] { diff --git a/quinn-udp/build.rs b/quinn-udp/build.rs new file mode 100644 index 
000000000..ef89ff70f --- /dev/null +++ b/quinn-udp/build.rs @@ -0,0 +1,26 @@ +use cfg_aliases::cfg_aliases; + +fn main() { + // Setup cfg aliases + cfg_aliases! { + // Platforms + apple: { + any( + target_os = "macos", + target_os = "ios", + target_os = "tvos", + target_os = "visionos" + ) + }, + bsd: { + any( + target_os = "freebsd", + target_os = "openbsd", + target_os = "netbsd" + ) + }, + // Convenience aliases + apple_fast: { all(apple, feature = "fast-apple-datapath") }, + apple_slow: { all(apple, not(feature = "fast-apple-datapath")) }, + } +} diff --git a/quinn-udp/src/cmsg/unix.rs b/quinn-udp/src/cmsg/unix.rs index 112bd5ebe..93ac76ba8 100644 --- a/quinn-udp/src/cmsg/unix.rs +++ b/quinn-udp/src/cmsg/unix.rs @@ -32,6 +32,29 @@ impl MsgHdr for libc::msghdr { } } +#[cfg(apple_fast)] +impl MsgHdr for crate::imp::msghdr_x { + type ControlMessage = libc::cmsghdr; + + fn cmsg_first_hdr(&self) -> *mut Self::ControlMessage { + let selfp = self as *const _ as *mut libc::msghdr; + unsafe { libc::CMSG_FIRSTHDR(selfp) } + } + + fn cmsg_nxt_hdr(&self, cmsg: &Self::ControlMessage) -> *mut Self::ControlMessage { + let selfp = self as *const _ as *mut libc::msghdr; + unsafe { libc::CMSG_NXTHDR(selfp, cmsg) } + } + + fn set_control_len(&mut self, len: usize) { + self.msg_controllen = len as _; + } + + fn control_len(&self) -> usize { + self.msg_controllen as _ + } +} + /// Helpers for [`libc::cmsghdr`] impl CMsgHdr for libc::cmsghdr { fn cmsg_len(length: usize) -> usize { diff --git a/quinn-udp/src/unix.rs b/quinn-udp/src/unix.rs index 5517053f0..9bd6e49f6 100644 --- a/quinn-udp/src/unix.rs +++ b/quinn-udp/src/unix.rs @@ -1,11 +1,4 @@ -#[cfg(not(any( - target_os = "macos", - target_os = "ios", - target_os = "tvos", - target_os = "visionos", - target_os = "openbsd", - target_os = "solaris", -)))] +#[cfg(not(any(apple, target_os = "openbsd", target_os = "solaris")))] use std::ptr; use std::{ io::{self, IoSliceMut}, @@ -26,6 +19,38 @@ use super::{ IO_ERROR_LOG_INTERVAL, }; +// 
Adapted from https://github.com/apple-oss-distributions/xnu/blob/8d741a5de7ff4191bf97d57b9f54c2f6d4a15585/bsd/sys/socket_private.h +#[cfg(apple_fast)] +#[repr(C)] +#[allow(non_camel_case_types)] +pub(crate) struct msghdr_x { + pub msg_name: *mut libc::c_void, + pub msg_namelen: libc::socklen_t, + pub msg_iov: *mut libc::iovec, + pub msg_iovlen: libc::c_int, + pub msg_control: *mut libc::c_void, + pub msg_controllen: libc::socklen_t, + pub msg_flags: libc::c_int, + pub msg_datalen: usize, +} + +#[cfg(apple_fast)] +extern "C" { + fn recvmsg_x( + s: libc::c_int, + msgp: *const msghdr_x, + cnt: libc::c_uint, + flags: libc::c_int, + ) -> isize; + + fn sendmsg_x( + s: libc::c_int, + msgp: *const msghdr_x, + cnt: libc::c_uint, + flags: libc::c_int, + ) -> isize; +} + // Defined in netinet6/in6.h on OpenBSD, this is not yet exported by the libc crate // directly. See https://github.com/rust-lang/libc/issues/3704 for when we might be able to // rely on this from the libc crate. @@ -63,13 +88,8 @@ impl UdpSocketState { let io = sock.0; let mut cmsg_platform_space = 0; if cfg!(target_os = "linux") - || cfg!(target_os = "freebsd") - || cfg!(target_os = "openbsd") - || cfg!(target_os = "netbsd") - || cfg!(target_os = "macos") - || cfg!(target_os = "ios") - || cfg!(target_os = "tvos") - || cfg!(target_os = "visionos") + || cfg!(bsd) + || cfg!(apple) || cfg!(target_os = "android") || cfg!(target_os = "solaris") { @@ -131,13 +151,7 @@ impl UdpSocketState { )?; } } - #[cfg(any( - target_os = "freebsd", - target_os = "macos", - target_os = "ios", - target_os = "tvos", - target_os = "visionos" - ))] + #[cfg(any(target_os = "freebsd", apple))] { if is_ipv4 { // Set `may_fragment` to `true` if this option is not supported on the platform. 
@@ -149,16 +163,7 @@ impl UdpSocketState { )?; } } - #[cfg(any( - target_os = "freebsd", - target_os = "openbsd", - target_os = "netbsd", - target_os = "macos", - target_os = "ios", - target_os = "tvos", - target_os = "visionos", - target_os = "solaris", - ))] + #[cfg(any(bsd, apple, target_os = "solaris"))] // IP_RECVDSTADDR == IP_SENDSRCADDR on FreeBSD // macOS uses only IP_RECVDSTADDR, no IP_SENDSRCADDR on macOS (the same on Solaris) // macOS also supports IP_PKTINFO @@ -236,27 +241,13 @@ impl UdpSocketState { } /// Sets the flag indicating we got EINVAL error from `sendmsg` syscall. - #[cfg(not(any( - target_os = "macos", - target_os = "ios", - target_os = "tvos", - target_os = "visionos", - target_os = "openbsd", - target_os = "netbsd" - )))] + #[cfg(not(any(apple, target_os = "openbsd", target_os = "netbsd")))] fn set_sendmsg_einval(&self) { self.sendmsg_einval.store(true, Ordering::Relaxed) } } -#[cfg(not(any( - target_os = "macos", - target_os = "ios", - target_os = "tvos", - target_os = "visionos", - target_os = "openbsd", - target_os = "netbsd" -)))] +#[cfg(not(any(apple, target_os = "openbsd", target_os = "netbsd")))] fn send( #[allow(unused_variables)] // only used on Linux state: &UdpSocketState, @@ -341,14 +332,67 @@ fn send( } } -#[cfg(any( - target_os = "macos", - target_os = "ios", - target_os = "tvos", - target_os = "visionos", - target_os = "openbsd", - target_os = "netbsd" -))] +#[cfg(apple_fast)] +fn send(state: &UdpSocketState, io: SockRef<'_>, transmit: &Transmit<'_>) -> io::Result<()> { + let mut hdrs = unsafe { mem::zeroed::<[msghdr_x; BATCH_SIZE]>() }; + let mut iovs = unsafe { mem::zeroed::<[libc::iovec; BATCH_SIZE]>() }; + let mut ctrls = [cmsg::Aligned([0u8; CMSG_LEN]); BATCH_SIZE]; + let addr = socket2::SockAddr::from(transmit.destination); + let segment_size = transmit.segment_size.unwrap_or(transmit.contents.len()); + let mut cnt = 0; + debug_assert!(transmit.contents.len().div_ceil(segment_size) <= BATCH_SIZE); + for (i, chunk) in 
transmit + .contents + .chunks(segment_size) + .enumerate() + .take(BATCH_SIZE) + { + prepare_msg( + &Transmit { + destination: transmit.destination, + ecn: transmit.ecn, + contents: chunk, + segment_size: Some(chunk.len()), + src_ip: transmit.src_ip, + }, + &addr, + &mut hdrs[i], + &mut iovs[i], + &mut ctrls[i], + true, + state.sendmsg_einval(), + ); + hdrs[i].msg_datalen = chunk.len(); + cnt += 1; + } + let n = unsafe { sendmsg_x(io.as_raw_fd(), hdrs.as_ptr(), cnt as u32, 0) }; + if n >= 0 { + return Ok(()); + } + let e = io::Error::last_os_error(); + match e.kind() { + io::ErrorKind::Interrupted => { + // Retry the transmission + } + io::ErrorKind::WouldBlock => return Err(e), + _ => { + // Other errors are ignored, since they will usually be handled + // by higher level retransmits and timeouts. + // - PermissionDenied errors have been observed due to iptable rules. + // Those are not fatal errors, since the + // configuration can be dynamically changed. + // - Destination unreachable errors have been observed for other + // - EMSGSIZE is expected for MTU probes. Future work might be able to avoid + // these by automatically clamping the MTUD upper bound to the interface MTU. 
+ if e.raw_os_error() != Some(libc::EMSGSIZE) { + log_sendmsg_error(&state.last_send_error, e, transmit); + } + } + } + Ok(()) +} + +#[cfg(any(target_os = "openbsd", target_os = "netbsd", apple_slow))] fn send(state: &UdpSocketState, io: SockRef<'_>, transmit: &Transmit<'_>) -> io::Result<()> { let mut hdr: libc::msghdr = unsafe { mem::zeroed() }; let mut iov: libc::iovec = unsafe { mem::zeroed() }; @@ -360,12 +404,7 @@ fn send(state: &UdpSocketState, io: SockRef<'_>, transmit: &Transmit<'_>) -> io: &mut hdr, &mut iov, &mut ctrl, - cfg!(target_os = "macos") - || cfg!(target_os = "ios") - || cfg!(target_os = "tvos") - || cfg!(target_os = "visionos") - || cfg!(target_os = "openbsd") - || cfg!(target_os = "netbsd"), + cfg!(apple) || cfg!(target_os = "openbsd") || cfg!(target_os = "netbsd"), state.sendmsg_einval(), ); let n = unsafe { libc::sendmsg(io.as_raw_fd(), &hdr, 0) }; @@ -394,14 +433,7 @@ fn send(state: &UdpSocketState, io: SockRef<'_>, transmit: &Transmit<'_>) -> io: Ok(()) } -#[cfg(not(any( - target_os = "macos", - target_os = "ios", - target_os = "tvos", - target_os = "visionos", - target_os = "openbsd", - target_os = "solaris", -)))] +#[cfg(not(any(apple, target_os = "openbsd", target_os = "solaris")))] fn recv(io: SockRef<'_>, bufs: &mut [IoSliceMut<'_>], meta: &mut [RecvMeta]) -> io::Result { let mut names = [MaybeUninit::::uninit(); BATCH_SIZE]; let mut ctrls = [cmsg::Aligned(MaybeUninit::<[u8; CMSG_LEN]>::uninit()); BATCH_SIZE]; @@ -440,14 +472,35 @@ fn recv(io: SockRef<'_>, bufs: &mut [IoSliceMut<'_>], meta: &mut [RecvMeta]) -> Ok(msg_count as usize) } -#[cfg(any( - target_os = "macos", - target_os = "ios", - target_os = "tvos", - target_os = "visionos", - target_os = "openbsd", - target_os = "solaris", -))] +#[cfg(apple_fast)] +fn recv(io: SockRef<'_>, bufs: &mut [IoSliceMut<'_>], meta: &mut [RecvMeta]) -> io::Result { + let mut names = [MaybeUninit::::uninit(); BATCH_SIZE]; + let mut ctrls = [cmsg::Aligned(MaybeUninit::<[u8; CMSG_LEN]>::uninit()); 
BATCH_SIZE]; + let mut hdrs = unsafe { mem::zeroed::<[msghdr_x; BATCH_SIZE]>() }; + let max_msg_count = bufs.len().min(BATCH_SIZE); + for i in 0..max_msg_count { + prepare_recv(&mut bufs[i], &mut names[i], &mut ctrls[i], &mut hdrs[i]); + } + let msg_count = loop { + let n = unsafe { recvmsg_x(io.as_raw_fd(), hdrs.as_mut_ptr(), max_msg_count as _, 0) }; + match n { + -1 => { + let e = io::Error::last_os_error(); + if e.kind() == io::ErrorKind::Interrupted { + continue; + } + return Err(e); + } + n => break n, + } + }; + for i in 0..(msg_count as usize) { + meta[i] = decode_recv(&names[i], &hdrs[i], hdrs[i].msg_datalen as usize); + } + Ok(msg_count as usize) +} + +#[cfg(any(target_os = "openbsd", target_os = "netbsd", apple_slow))] fn recv(io: SockRef<'_>, bufs: &mut [IoSliceMut<'_>], meta: &mut [RecvMeta]) -> io::Result { let mut name = MaybeUninit::::uninit(); let mut ctrl = cmsg::Aligned(MaybeUninit::<[u8; CMSG_LEN]>::uninit()); @@ -476,7 +529,8 @@ const CMSG_LEN: usize = 88; fn prepare_msg( transmit: &Transmit<'_>, dst_addr: &socket2::SockAddr, - hdr: &mut libc::msghdr, + #[cfg(not(apple_fast))] hdr: &mut libc::msghdr, + #[cfg(apple_fast)] hdr: &mut msghdr_x, iov: &mut libc::iovec, ctrl: &mut cmsg::Aligned<[u8; CMSG_LEN]>, #[allow(unused_variables)] // only used on FreeBSD & macOS @@ -534,16 +588,7 @@ fn prepare_msg( }; encoder.push(libc::IPPROTO_IP, libc::IP_PKTINFO, pktinfo); } - #[cfg(any( - target_os = "freebsd", - target_os = "openbsd", - target_os = "netbsd", - target_os = "macos", - target_os = "ios", - target_os = "tvos", - target_os = "visionos", - target_os = "solaris", - ))] + #[cfg(any(bsd, apple, target_os = "solaris"))] { if encode_src_ip { let addr = libc::in_addr { @@ -568,6 +613,7 @@ fn prepare_msg( encoder.finish(); } +#[cfg(not(apple_fast))] fn prepare_recv( buf: &mut IoSliceMut, name: &mut MaybeUninit, @@ -583,9 +629,27 @@ fn prepare_recv( hdr.msg_flags = 0; } +#[cfg(apple_fast)] +fn prepare_recv( + buf: &mut IoSliceMut, + name: &mut 
MaybeUninit, + ctrl: &mut cmsg::Aligned>, + hdr: &mut msghdr_x, +) { + hdr.msg_name = name.as_mut_ptr() as _; + hdr.msg_namelen = mem::size_of::() as _; + hdr.msg_iov = buf as *mut IoSliceMut as *mut libc::iovec; + hdr.msg_iovlen = 1; + hdr.msg_control = ctrl.0.as_mut_ptr() as _; + hdr.msg_controllen = CMSG_LEN as _; + hdr.msg_flags = 0; + hdr.msg_datalen = buf.len(); +} + fn decode_recv( name: &MaybeUninit, - hdr: &libc::msghdr, + #[cfg(not(apple_fast))] hdr: &libc::msghdr, + #[cfg(apple_fast)] hdr: &msghdr_x, len: usize, ) -> RecvMeta { let name = unsafe { name.assume_init() }; @@ -609,10 +673,7 @@ fn decode_recv( // Temporary hack around broken macos ABI. Remove once upstream fixes it. // https://bugreport.apple.com/web/?problemID=48761855 #[allow(clippy::unnecessary_cast)] // cmsg.cmsg_len defined as size_t - if (cfg!(target_os = "macos") - || cfg!(target_os = "ios") - || cfg!(target_os = "tvos") - || cfg!(target_os = "visionos")) + if cfg!(apple) && cmsg.cmsg_len as usize == libc::CMSG_LEN(mem::size_of::() as _) as usize { ecn_bits = cmsg::decode::(cmsg); @@ -627,15 +688,7 @@ fn decode_recv( pktinfo.ipi_addr.s_addr.to_ne_bytes(), ))); } - #[cfg(any( - target_os = "freebsd", - target_os = "openbsd", - target_os = "netbsd", - target_os = "macos", - target_os = "ios", - target_os = "tvos", - target_os = "visionos", - ))] + #[cfg(any(bsd, apple))] (libc::IPPROTO_IP, libc::IP_RECVDSTADDR) => { let in_addr = unsafe { cmsg::decode::(cmsg) }; dst_ip = Some(IpAddr::V4(Ipv4Addr::from(in_addr.s_addr.to_ne_bytes()))); @@ -685,21 +738,11 @@ fn decode_recv( } } -#[cfg(not(any( - target_os = "macos", - target_os = "ios", - target_os = "tvos", - target_os = "visionos" -)))] +#[cfg(not(apple_slow))] // Chosen somewhat arbitrarily; might benefit from additional tuning. 
pub(crate) const BATCH_SIZE: usize = 32; -#[cfg(any( - target_os = "macos", - target_os = "ios", - target_os = "tvos", - target_os = "visionos" -))] +#[cfg(apple_slow)] pub(crate) const BATCH_SIZE: usize = 1; #[cfg(target_os = "linux")] @@ -731,16 +774,22 @@ mod gso { } } +// On Apple platforms using the `sendmsg_x` call, UDP datagram segmentation is not +// offloaded to the NIC or even the kernel, but instead done here in user space in +// [`send`] and then passed to the OS as individual `iovec`s (up to `BATCH_SIZE`). #[cfg(not(target_os = "linux"))] mod gso { use super::*; pub(super) fn max_gso_segments() -> usize { - 1 + BATCH_SIZE } - pub(super) fn set_segment_size(_encoder: &mut cmsg::Encoder, _segment_size: u16) { - panic!("Setting a segment size is not supported on current platform"); + pub(super) fn set_segment_size( + #[cfg(not(apple_fast))] _encoder: &mut cmsg::Encoder, + #[cfg(apple_fast)] _encoder: &mut cmsg::Encoder, + _segment_size: u16, + ) { } }