Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(driver,iour): remove outer squeue #227

Merged
merged 2 commits into from
Mar 12, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 63 additions & 82 deletions compio-driver/src/iour/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
#[cfg_attr(all(doc, docsrs), doc(cfg(all())))]
#[allow(unused_imports)]
pub use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use std::{
collections::VecDeque, io, os::fd::OwnedFd, pin::Pin, ptr::NonNull, sync::Arc, task::Poll,
time::Duration,
};
use std::{io, os::fd::OwnedFd, pin::Pin, ptr::NonNull, sync::Arc, task::Poll, time::Duration};

use compio_log::{instrument, trace};
use compio_log::{instrument, trace, warn};
use crossbeam_queue::SegQueue;
cfg_if::cfg_if! {
if #[cfg(feature = "io-uring-cqe32")] {
Expand Down Expand Up @@ -74,7 +71,6 @@ pub trait OpCode {
/// Low-level driver of io-uring.
pub(crate) struct Driver {
inner: IoUring<SEntry, CEntry>,
squeue: VecDeque<SEntry>,
notifier: Notifier,
pool: AsyncifyPool,
pool_completed: Arc<SegQueue<Entry>>,
Expand All @@ -87,29 +83,33 @@ impl Driver {
pub fn new(builder: &ProactorBuilder) -> io::Result<Self> {
instrument!(compio_log::Level::TRACE, "new", ?builder);
trace!("new iour driver");
let mut squeue = VecDeque::with_capacity(builder.capacity as usize);
let notifier = Notifier::new()?;
let mut inner = IoUring::builder().build(builder.capacity)?;
#[allow(clippy::useless_conversion)]
squeue.push_back(
PollAdd::new(Fd(notifier.as_raw_fd()), libc::POLLIN as _)
.multi(true)
.build()
.user_data(Self::NOTIFY)
.into(),
);
unsafe {
inner
.submission()
.push(
&PollAdd::new(Fd(notifier.as_raw_fd()), libc::POLLIN as _)
.multi(true)
.build()
.user_data(Self::NOTIFY)
.into(),
)
.expect("the squeue sould not be full");
}
Ok(Self {
inner: IoUring::builder().build(builder.capacity)?,
squeue,
inner,
notifier,
pool: builder.create_or_get_thread_pool(),
pool_completed: Arc::new(SegQueue::new()),
})
}

// Auto means that it choose to wait or not automatically.
fn submit_auto(&mut self, timeout: Option<Duration>, wait: bool) -> io::Result<()> {
instrument!(compio_log::Level::TRACE, "submit_auto", ?timeout, wait);
let res = if wait {
fn submit_auto(&mut self, timeout: Option<Duration>) -> io::Result<()> {
instrument!(compio_log::Level::TRACE, "submit_auto", ?timeout);
let res = {
// Last part of submission queue, wait till timeout.
if let Some(duration) = timeout {
let timespec = timespec(duration);
Expand All @@ -118,70 +118,32 @@ impl Driver {
} else {
self.inner.submit_and_wait(1)
}
} else {
self.inner.submit()
};
trace!("submit result: {res:?}");
match res {
Ok(_) => {
if self.inner.completion().is_empty() {
Err(io::Error::from_raw_os_error(libc::ETIMEDOUT))
Err(io::ErrorKind::TimedOut.into())
} else {
Ok(())
}
}
Err(e) => match e.raw_os_error() {
Some(libc::ETIME) => Err(io::Error::from_raw_os_error(libc::ETIMEDOUT)),
Some(libc::EBUSY) | Some(libc::EAGAIN) => Ok(()),
Some(libc::ETIME) => Err(io::ErrorKind::TimedOut.into()),
Some(libc::EBUSY) | Some(libc::EAGAIN) => Err(io::ErrorKind::Interrupted.into()),
_ => Err(e),
},
}
}

fn flush_submissions(&mut self) -> bool {
instrument!(compio_log::Level::TRACE, "flush_submissions");

let mut ended_ops = false;

let mut inner_squeue = self.inner.submission();

while !inner_squeue.is_full() {
if self.squeue.len() <= inner_squeue.capacity() - inner_squeue.len() {
trace!("inner_squeue have enough space, flush all entries");
let (s1, s2) = self.squeue.as_slices();
unsafe {
inner_squeue
.push_multiple(s1)
.expect("queue has enough space");
inner_squeue
.push_multiple(s2)
.expect("queue has enough space");
}
self.squeue.clear();
ended_ops = true;
break;
} else if let Some(entry) = self.squeue.pop_front() {
trace!("inner_squeue have not enough space, flush an entry");
unsafe { inner_squeue.push(&entry) }.expect("queue has enough space");
} else {
trace!("self.squeue is empty, skip");
ended_ops = true;
break;
}
}

inner_squeue.sync();

ended_ops
}

fn poll_entries(&mut self, entries: &mut impl Extend<Entry>) {
fn poll_entries(&mut self, entries: &mut impl Extend<Entry>) -> bool {
while let Some(entry) = self.pool_completed.pop() {
entries.extend(Some(entry));
}

let mut cqueue = self.inner.completion();
cqueue.sync();
let has_entry = !cqueue.is_empty();
let completed_entries = cqueue.filter_map(|entry| match entry.user_data() {
Self::CANCEL => None,
Self::NOTIFY => {
Expand All @@ -194,6 +156,7 @@ impl Driver {
_ => Some(create_entry(entry)),
});
entries.extend(completed_entries);
has_entry
}

pub fn create_op<T: crate::sys::OpCode + 'static>(&self, user_data: usize, op: T) -> RawOp {
Expand All @@ -207,13 +170,38 @@ impl Driver {
pub fn cancel(&mut self, user_data: usize, _registry: &mut Slab<RawOp>) {
instrument!(compio_log::Level::TRACE, "cancel", user_data);
trace!("cancel RawOp");
#[allow(clippy::useless_conversion)]
self.squeue.push_back(
AsyncCancel::new(user_data as _)
.build()
.user_data(Self::CANCEL)
.into(),
);
unsafe {
#[allow(clippy::useless_conversion)]
if self
.inner
.submission()
.push(
&AsyncCancel::new(user_data as _)
.build()
.user_data(Self::CANCEL)
.into(),
)
.is_err()
{
warn!("could not push AsyncCancel entry");
}
}
}

fn push_raw(&mut self, entry: SEntry) -> io::Result<()> {
loop {
let mut squeue = self.inner.submission();
match unsafe { squeue.push(&entry) } {
Ok(()) => {
squeue.sync();
break Ok(());
}
Err(_) => {
drop(squeue);
self.inner.submit()?;
}
}
}
}

pub fn push(&mut self, user_data: usize, op: &mut RawOp) -> Poll<io::Result<usize>> {
Expand All @@ -223,13 +211,12 @@ impl Driver {
match op_pin.create_entry() {
OpEntry::Submission(entry) => {
#[allow(clippy::useless_conversion)]
self.squeue
.push_back(entry.user_data(user_data as _).into());
self.push_raw(entry.user_data(user_data as _).into())?;
Poll::Pending
}
#[cfg(feature = "io-uring-sqe128")]
OpEntry::Submission128(_entry) => {
self.squeue.push_back(_entry.user_data(user_data as _));
OpEntry::Submission128(entry) => {
self.push_raw(entry.user_data(user_data as _))?;
Poll::Pending
}
OpEntry::Blocking => {
Expand Down Expand Up @@ -273,18 +260,12 @@ impl Driver {
instrument!(compio_log::Level::TRACE, "poll", ?timeout);
// Anyway we need to submit once, no matter there are entries in squeue.
trace!("start polling");
loop {
let ended = self.flush_submissions();

self.submit_auto(timeout, ended)?;

if !self.poll_entries(&mut entries) {
self.submit_auto(timeout)?;
self.poll_entries(&mut entries);

if ended {
trace!("polling ended");
break;
}
}

Ok(())
}

Expand Down