From fcb630136d07721d269b3dabdf5b6cf97fce77b3 Mon Sep 17 00:00:00 2001 From: Scott Hutton Date: Tue, 9 Jul 2024 16:39:41 -0700 Subject: [PATCH] Track last error atomically Eliminate the need for UdpSocket to be passed mutably into send_mmsg(). --- src/lib.rs | 10 +++---- src/unix.rs | 79 +++++++++++++++++++++++++++++++++++++++-------------- 2 files changed, 63 insertions(+), 26 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 29e034e..4db566a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,10 +2,10 @@ use std::{ net::{IpAddr, Ipv6Addr, SocketAddr}, sync::atomic::{AtomicUsize, Ordering}, - time::{Duration, Instant}, }; pub use crate::cmsg::{AsPtr, EcnCodepoint, Source, Transmit}; +use imp::LastSendError; use tracing::warn; mod cmsg; @@ -94,20 +94,18 @@ impl Default for RecvMeta { } /// Log at most 1 IO error per minute -const IO_ERROR_LOG_INTERVAL: Duration = std::time::Duration::from_secs(60); +const IO_ERROR_LOG_INTERVAL: u64 = 60; /// Logs a warning message when sendmsg fails /// /// Logging will only be performed if at least [`IO_ERROR_LOG_INTERVAL`] /// has elapsed since the last error was logged. fn log_sendmsg_error>( - last_send_error: &mut Instant, + last_send_error: LastSendError, err: impl core::fmt::Debug, transmit: &Transmit, ) { - let now = Instant::now(); - if now.saturating_duration_since(*last_send_error) > IO_ERROR_LOG_INTERVAL { - *last_send_error = now; + if last_send_error.should_log() { warn!( "sendmsg error: {:?}, Transmit: {{ destination: {:?}, src_ip: {:?}, enc: {:?}, len: {:?}, segment_size: {:?} }}", err, transmit.dst, transmit.src, transmit.ecn, transmit.contents.len(), transmit.segment_size); diff --git a/src/unix.rs b/src/unix.rs index d7024d4..89be2cd 100644 --- a/src/unix.rs +++ b/src/unix.rs @@ -4,9 +4,12 @@ use std::{ mem::{self, MaybeUninit}, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}, os::{fd::AsFd, unix::io::AsRawFd}, - sync::atomic::AtomicUsize, + sync::{ + atomic::{AtomicU64, AtomicUsize, Ordering}, + Arc, + }, task::{Context, Poll}, - time::Instant, + time::SystemTime, }; use crate::cmsg::{AsPtr, EcnCodepoint, Source, Transmit}; @@ -31,7 +34,7 @@ type IpTosTy = libc::c_int; #[derive(Debug)] pub struct UdpSocket { io: tokio::net::UdpSocket, - last_send_error: Instant, + last_send_error: LastSendError, } impl AsRawFd for UdpSocket { @@ -46,16 +49,51 @@ impl AsFd for UdpSocket { } } +#[derive(Clone, Debug)] +pub(crate) struct LastSendError(Arc); + +impl Default for LastSendError { + fn default() -> Self { + let now = Self::now(); + Self(Arc::new(AtomicU64::new( + now.checked_sub(2 * IO_ERROR_LOG_INTERVAL).unwrap_or(now), + ))) + } +} + +impl LastSendError { + fn now() -> u64 { + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs() + } + + /// Determine whether the last error was more than `IO_ERROR_LOG_INTERVAL` + /// seconds ago. If so, update the last error time and return true. + /// + /// Note: if the system clock regresses more tha `IO_ERROR_LOG_INTERVAL`, + /// this function may impose an additional delay on log message emission. + /// Similarly, if it advances, messages may be emitted prematurely. + pub(crate) fn should_log(&self) -> bool { + let now = Self::now(); + self.0 + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |cur| { + (now.saturating_sub(cur) > IO_ERROR_LOG_INTERVAL).then_some(now) + }) + .is_ok() + } +} + impl UdpSocket { /// Creates a new UDP socket from a previously created `std::net::UdpSocket` pub fn from_std(socket: std::net::UdpSocket) -> io::Result { socket.set_nonblocking(true)?; init(SockRef::from(&socket))?; - let now = Instant::now(); Ok(UdpSocket { io: tokio::net::UdpSocket::from_std(socket)?, - last_send_error: now.checked_sub(2 * IO_ERROR_LOG_INTERVAL).unwrap_or(now), + last_send_error: LastSendError::default(), }) } @@ -67,10 +105,9 @@ impl UdpSocket { pub async fn bind(addr: A) -> io::Result { let io = tokio::net::UdpSocket::bind(addr).await?; init(SockRef::from(&io))?; - let now = Instant::now(); Ok(UdpSocket { io, - last_send_error: now.checked_sub(2 * IO_ERROR_LOG_INTERVAL).unwrap_or(now), + last_send_error: LastSendError::default(), }) } @@ -195,13 +232,13 @@ impl UdpSocket { /// /// [`sendmmsg`]: https://linux.die.net/man/2/sendmmsg pub async fn send_mmsg>( - &mut self, + &self, state: &UdpState, transmits: &[Transmit], ) -> Result { let n = loop { self.io.writable().await?; - let last_send_error = &mut self.last_send_error; + let last_send_error = self.last_send_error.clone(); let io = &self.io; match io.try_io(Interest::WRITABLE, || { send(state, SockRef::from(io), last_send_error, transmits) @@ -278,11 +315,15 @@ impl UdpSocket { transmits: &[Transmit], ) -> Poll> { loop { - let last_send_error = &mut self.last_send_error; ready!(self.io.poll_send_ready(cx))?; let io = &self.io; if let Ok(res) = io.try_io(Interest::WRITABLE, || { - send(state, SockRef::from(io), last_send_error, transmits) + send( + state, + SockRef::from(io), + self.last_send_error.clone(), + transmits, + ) }) { return Poll::Ready(Ok(res)); } @@ -353,7 +394,7 @@ pub mod sync { #[derive(Debug)] pub struct UdpSocket { io: std::net::UdpSocket, - last_send_error: Instant, + last_send_error: LastSendError, } impl AsRawFd for UdpSocket { @@ -372,10 +413,9 @@ pub mod sync { pub fn from_std(socket: std::net::UdpSocket) -> io::Result { init(SockRef::from(&socket))?; socket.set_nonblocking(false)?; - let now = Instant::now(); Ok(Self { io: socket, - last_send_error: now.checked_sub(2 * IO_ERROR_LOG_INTERVAL).unwrap_or(now), + last_send_error: LastSendError::default(), }) } /// create a new UDP socket and attempt to bind to `addr` @@ -383,10 +423,9 @@ pub mod sync { let io = std::net::UdpSocket::bind(addr)?; init(SockRef::from(&io))?; io.set_nonblocking(false)?; - let now = Instant::now(); Ok(Self { io, - last_send_error: now.checked_sub(2 * IO_ERROR_LOG_INTERVAL).unwrap_or(now), + last_send_error: LastSendError::default(), }) } /// sets nonblocking mode @@ -474,7 +513,7 @@ pub mod sync { send( state, SockRef::from(&self.io), - &mut self.last_send_error, + self.last_send_error.clone(), transmits, ) } @@ -681,7 +720,7 @@ fn send_msg>( fn send>( state: &UdpState, io: SockRef<'_>, - last_send_error: &mut Instant, + last_send_error: LastSendError, transmits: &[Transmit], ) -> io::Result { use std::ptr; @@ -802,7 +841,7 @@ fn send_msg>( fn send>( _state: &UdpState, io: SockRef<'_>, - last_send_error: &mut Instant, + last_send_error: LastSendError, transmits: &[Transmit], ) -> io::Result { let mut hdr: libc::msghdr = unsafe { mem::zeroed() }; @@ -828,7 +867,7 @@ fn send>( // Those are not fatal errors, since the // configuration can be dynamically changed. // - Destination unreachable errors have been observed for other - log_sendmsg_error(last_send_error, e, &transmits[sent]); + log_sendmsg_error(last_send_error.clone(), e, &transmits[sent]); sent += 1; } }