From bcf930153781588cfd75d19886b9ee65b29910ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=AE=87=E9=80=B8?= Date: Tue, 12 Mar 2024 21:16:47 +0800 Subject: [PATCH 1/2] fix(driver,iour): remove outer squeue --- compio-driver/src/iour/mod.rs | 137 ++++++++++++++-------------------- 1 file changed, 57 insertions(+), 80 deletions(-) diff --git a/compio-driver/src/iour/mod.rs b/compio-driver/src/iour/mod.rs index 13df621f..43ad0446 100644 --- a/compio-driver/src/iour/mod.rs +++ b/compio-driver/src/iour/mod.rs @@ -1,10 +1,7 @@ #[cfg_attr(all(doc, docsrs), doc(cfg(all())))] #[allow(unused_imports)] pub use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; -use std::{ - collections::VecDeque, io, os::fd::OwnedFd, pin::Pin, ptr::NonNull, sync::Arc, task::Poll, - time::Duration, -}; +use std::{io, os::fd::OwnedFd, pin::Pin, ptr::NonNull, sync::Arc, task::Poll, time::Duration}; use compio_log::{instrument, trace}; use crossbeam_queue::SegQueue; @@ -74,7 +71,6 @@ pub trait OpCode { /// Low-level driver of io-uring. pub(crate) struct Driver { inner: IoUring, - squeue: VecDeque, notifier: Notifier, pool: AsyncifyPool, pool_completed: Arc>, @@ -87,19 +83,23 @@ impl Driver { pub fn new(builder: &ProactorBuilder) -> io::Result { instrument!(compio_log::Level::TRACE, "new", ?builder); trace!("new iour driver"); - let mut squeue = VecDeque::with_capacity(builder.capacity as usize); let notifier = Notifier::new()?; + let mut inner = IoUring::builder().build(builder.capacity)?; #[allow(clippy::useless_conversion)] - squeue.push_back( - PollAdd::new(Fd(notifier.as_raw_fd()), libc::POLLIN as _) - .multi(true) - .build() - .user_data(Self::NOTIFY) - .into(), - ); + unsafe { + inner + .submission() + .push( + &PollAdd::new(Fd(notifier.as_raw_fd()), libc::POLLIN as _) + .multi(true) + .build() + .user_data(Self::NOTIFY) + .into(), + ) + .expect("the squeue sould not be empty"); + } Ok(Self { - inner: IoUring::builder().build(builder.capacity)?, - squeue, + inner, notifier, pool: builder.create_or_get_thread_pool(), pool_completed: Arc::new(SegQueue::new()), @@ -107,9 +107,9 @@ impl Driver { } // Auto means that it choose to wait or not automatically. - fn submit_auto(&mut self, timeout: Option, wait: bool) -> io::Result<()> { + fn submit_auto(&mut self, timeout: Option) -> io::Result<()> { instrument!(compio_log::Level::TRACE, "submit_auto", ?timeout, wait); - let res = if wait { + let res = { // Last part of submission queue, wait till timeout. if let Some(duration) = timeout { let timespec = timespec(duration); @@ -118,70 +118,32 @@ impl Driver { } else { self.inner.submit_and_wait(1) } - } else { - self.inner.submit() }; trace!("submit result: {res:?}"); match res { Ok(_) => { if self.inner.completion().is_empty() { - Err(io::Error::from_raw_os_error(libc::ETIMEDOUT)) + Err(io::ErrorKind::TimedOut.into()) } else { Ok(()) } } Err(e) => match e.raw_os_error() { - Some(libc::ETIME) => Err(io::Error::from_raw_os_error(libc::ETIMEDOUT)), - Some(libc::EBUSY) | Some(libc::EAGAIN) => Ok(()), + Some(libc::ETIME) => Err(io::ErrorKind::TimedOut.into()), + Some(libc::EBUSY) | Some(libc::EAGAIN) => Err(io::ErrorKind::Interrupted.into()), _ => Err(e), }, } } - fn flush_submissions(&mut self) -> bool { - instrument!(compio_log::Level::TRACE, "flush_submissions"); - - let mut ended_ops = false; - - let mut inner_squeue = self.inner.submission(); - - while !inner_squeue.is_full() { - if self.squeue.len() <= inner_squeue.capacity() - inner_squeue.len() { - trace!("inner_squeue have enough space, flush all entries"); - let (s1, s2) = self.squeue.as_slices(); - unsafe { - inner_squeue - .push_multiple(s1) - .expect("queue has enough space"); - inner_squeue - .push_multiple(s2) - .expect("queue has enough space"); - } - self.squeue.clear(); - ended_ops = true; - break; - } else if let Some(entry) = self.squeue.pop_front() { - trace!("inner_squeue have not enough space, flush an entry"); - unsafe { inner_squeue.push(&entry) }.expect("queue has enough space"); - } else { - trace!("self.squeue is empty, skip"); - ended_ops = true; - break; - } - } - - inner_squeue.sync(); - - ended_ops - } - - fn poll_entries(&mut self, entries: &mut impl Extend) { + fn poll_entries(&mut self, entries: &mut impl Extend) -> bool { while let Some(entry) = self.pool_completed.pop() { entries.extend(Some(entry)); } let mut cqueue = self.inner.completion(); cqueue.sync(); + let has_entry = !cqueue.is_empty(); let completed_entries = cqueue.filter_map(|entry| match entry.user_data() { Self::CANCEL => None, Self::NOTIFY => { @@ -194,6 +156,7 @@ impl Driver { _ => Some(create_entry(entry)), }); entries.extend(completed_entries); + has_entry } pub fn create_op(&self, user_data: usize, op: T) -> RawOp { @@ -207,13 +170,34 @@ impl Driver { pub fn cancel(&mut self, user_data: usize, _registry: &mut Slab) { instrument!(compio_log::Level::TRACE, "cancel", user_data); trace!("cancel RawOp"); - #[allow(clippy::useless_conversion)] - self.squeue.push_back( - AsyncCancel::new(user_data as _) - .build() - .user_data(Self::CANCEL) - .into(), - ); + unsafe { + #[allow(clippy::useless_conversion)] + self.inner + .submission() + .push( + &AsyncCancel::new(user_data as _) + .build() + .user_data(Self::CANCEL) + .into(), + ) + .ok(); + } + } + + fn push_raw(&mut self, entry: SEntry) -> io::Result<()> { + loop { + let mut squeue = self.inner.submission(); + match unsafe { squeue.push(&entry) } { + Ok(()) => { + squeue.sync(); + break Ok(()); + } + Err(_) => { + drop(squeue); + self.inner.submit()?; + } + } + } } pub fn push(&mut self, user_data: usize, op: &mut RawOp) -> Poll> { @@ -223,13 +207,12 @@ impl Driver { match op_pin.create_entry() { OpEntry::Submission(entry) => { #[allow(clippy::useless_conversion)] - self.squeue - .push_back(entry.user_data(user_data as _).into()); + self.push_raw(entry.user_data(user_data as _).into())?; Poll::Pending } #[cfg(feature = "io-uring-sqe128")] - OpEntry::Submission128(_entry) => { - self.squeue.push_back(_entry.user_data(user_data as _)); + OpEntry::Submission128(entry) => { + self.push_raw(entry.user_data(user_data as _))?; Poll::Pending } OpEntry::Blocking => { @@ -273,18 +256,12 @@ impl Driver { instrument!(compio_log::Level::TRACE, "poll", ?timeout); // Anyway we need to submit once, no matter there are entries in squeue. trace!("start polling"); - loop { - let ended = self.flush_submissions(); - - self.submit_auto(timeout, ended)?; + if !self.poll_entries(&mut entries) { + self.submit_auto(timeout)?; self.poll_entries(&mut entries); - - if ended { - trace!("polling ended"); - break; - } } + Ok(()) } From f5284ea43ac3182447386714ca4bd03cb8a417ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=AE=87=E9=80=B8?= Date: Tue, 12 Mar 2024 21:51:09 +0800 Subject: [PATCH 2/2] fix(driver,iour): log cancel fail and fix comments --- compio-driver/src/iour/mod.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/compio-driver/src/iour/mod.rs b/compio-driver/src/iour/mod.rs index 43ad0446..366169e7 100644 --- a/compio-driver/src/iour/mod.rs +++ b/compio-driver/src/iour/mod.rs @@ -3,7 +3,7 @@ pub use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use std::{io, os::fd::OwnedFd, pin::Pin, ptr::NonNull, sync::Arc, task::Poll, time::Duration}; -use compio_log::{instrument, trace}; +use compio_log::{instrument, trace, warn}; use crossbeam_queue::SegQueue; cfg_if::cfg_if! { if #[cfg(feature = "io-uring-cqe32")] { @@ -96,7 +96,7 @@ impl Driver { .user_data(Self::NOTIFY) .into(), ) - .expect("the squeue sould not be empty"); + .expect("the squeue sould not be full"); } Ok(Self { inner, @@ -108,7 +108,7 @@ impl Driver { // Auto means that it choose to wait or not automatically. fn submit_auto(&mut self, timeout: Option) -> io::Result<()> { - instrument!(compio_log::Level::TRACE, "submit_auto", ?timeout, wait); + instrument!(compio_log::Level::TRACE, "submit_auto", ?timeout); let res = { // Last part of submission queue, wait till timeout. if let Some(duration) = timeout { @@ -172,7 +172,8 @@ impl Driver { trace!("cancel RawOp"); unsafe { #[allow(clippy::useless_conversion)] - self.inner + if self + .inner .submission() .push( &AsyncCancel::new(user_data as _) @@ -180,7 +181,10 @@ impl Driver { .user_data(Self::CANCEL) .into(), ) - .ok(); + .is_err() + { + warn!("could not push AsyncCancel entry"); + } } }