Skip to content

Commit

Permalink
fix(driver,iour): remove outer squeue
Browse files Browse the repository at this point in the history
  • Loading branch information
Berrysoft committed Mar 12, 2024
1 parent c8902b8 commit bcf9301
Showing 1 changed file with 57 additions and 80 deletions.
137 changes: 57 additions & 80 deletions compio-driver/src/iour/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
#[cfg_attr(all(doc, docsrs), doc(cfg(all())))]
#[allow(unused_imports)]
pub use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use std::{
collections::VecDeque, io, os::fd::OwnedFd, pin::Pin, ptr::NonNull, sync::Arc, task::Poll,
time::Duration,
};
use std::{io, os::fd::OwnedFd, pin::Pin, ptr::NonNull, sync::Arc, task::Poll, time::Duration};

use compio_log::{instrument, trace};
use crossbeam_queue::SegQueue;
Expand Down Expand Up @@ -74,7 +71,6 @@ pub trait OpCode {
/// Low-level driver of io-uring.
pub(crate) struct Driver {
inner: IoUring<SEntry, CEntry>,
squeue: VecDeque<SEntry>,
notifier: Notifier,
pool: AsyncifyPool,
pool_completed: Arc<SegQueue<Entry>>,
Expand All @@ -87,29 +83,33 @@ impl Driver {
pub fn new(builder: &ProactorBuilder) -> io::Result<Self> {
instrument!(compio_log::Level::TRACE, "new", ?builder);
trace!("new iour driver");
let mut squeue = VecDeque::with_capacity(builder.capacity as usize);
let notifier = Notifier::new()?;
let mut inner = IoUring::builder().build(builder.capacity)?;
#[allow(clippy::useless_conversion)]
squeue.push_back(
PollAdd::new(Fd(notifier.as_raw_fd()), libc::POLLIN as _)
.multi(true)
.build()
.user_data(Self::NOTIFY)
.into(),
);
unsafe {
inner
.submission()
.push(
&PollAdd::new(Fd(notifier.as_raw_fd()), libc::POLLIN as _)
.multi(true)
.build()
.user_data(Self::NOTIFY)
.into(),
)
.expect("the squeue sould not be empty");
}
Ok(Self {
inner: IoUring::builder().build(builder.capacity)?,
squeue,
inner,
notifier,
pool: builder.create_or_get_thread_pool(),
pool_completed: Arc::new(SegQueue::new()),
})
}

// Auto means that it choose to wait or not automatically.
fn submit_auto(&mut self, timeout: Option<Duration>, wait: bool) -> io::Result<()> {
fn submit_auto(&mut self, timeout: Option<Duration>) -> io::Result<()> {
instrument!(compio_log::Level::TRACE, "submit_auto", ?timeout, wait);
let res = if wait {
let res = {
// Last part of submission queue, wait till timeout.
if let Some(duration) = timeout {
let timespec = timespec(duration);
Expand All @@ -118,70 +118,32 @@ impl Driver {
} else {
self.inner.submit_and_wait(1)
}
} else {
self.inner.submit()
};
trace!("submit result: {res:?}");
match res {
Ok(_) => {
if self.inner.completion().is_empty() {
Err(io::Error::from_raw_os_error(libc::ETIMEDOUT))
Err(io::ErrorKind::TimedOut.into())
} else {
Ok(())
}
}
Err(e) => match e.raw_os_error() {
Some(libc::ETIME) => Err(io::Error::from_raw_os_error(libc::ETIMEDOUT)),
Some(libc::EBUSY) | Some(libc::EAGAIN) => Ok(()),
Some(libc::ETIME) => Err(io::ErrorKind::TimedOut.into()),
Some(libc::EBUSY) | Some(libc::EAGAIN) => Err(io::ErrorKind::Interrupted.into()),
_ => Err(e),
},
}
}

fn flush_submissions(&mut self) -> bool {
instrument!(compio_log::Level::TRACE, "flush_submissions");

let mut ended_ops = false;

let mut inner_squeue = self.inner.submission();

while !inner_squeue.is_full() {
if self.squeue.len() <= inner_squeue.capacity() - inner_squeue.len() {
trace!("inner_squeue have enough space, flush all entries");
let (s1, s2) = self.squeue.as_slices();
unsafe {
inner_squeue
.push_multiple(s1)
.expect("queue has enough space");
inner_squeue
.push_multiple(s2)
.expect("queue has enough space");
}
self.squeue.clear();
ended_ops = true;
break;
} else if let Some(entry) = self.squeue.pop_front() {
trace!("inner_squeue have not enough space, flush an entry");
unsafe { inner_squeue.push(&entry) }.expect("queue has enough space");
} else {
trace!("self.squeue is empty, skip");
ended_ops = true;
break;
}
}

inner_squeue.sync();

ended_ops
}

fn poll_entries(&mut self, entries: &mut impl Extend<Entry>) {
fn poll_entries(&mut self, entries: &mut impl Extend<Entry>) -> bool {
while let Some(entry) = self.pool_completed.pop() {
entries.extend(Some(entry));
}

let mut cqueue = self.inner.completion();
cqueue.sync();
let has_entry = !cqueue.is_empty();
let completed_entries = cqueue.filter_map(|entry| match entry.user_data() {
Self::CANCEL => None,
Self::NOTIFY => {
Expand All @@ -194,6 +156,7 @@ impl Driver {
_ => Some(create_entry(entry)),
});
entries.extend(completed_entries);
has_entry
}

pub fn create_op<T: crate::sys::OpCode + 'static>(&self, user_data: usize, op: T) -> RawOp {
Expand All @@ -207,13 +170,34 @@ impl Driver {
pub fn cancel(&mut self, user_data: usize, _registry: &mut Slab<RawOp>) {
instrument!(compio_log::Level::TRACE, "cancel", user_data);
trace!("cancel RawOp");
#[allow(clippy::useless_conversion)]
self.squeue.push_back(
AsyncCancel::new(user_data as _)
.build()
.user_data(Self::CANCEL)
.into(),
);
unsafe {
#[allow(clippy::useless_conversion)]
self.inner
.submission()
.push(
&AsyncCancel::new(user_data as _)
.build()
.user_data(Self::CANCEL)
.into(),
)
.ok();
}
}

fn push_raw(&mut self, entry: SEntry) -> io::Result<()> {
loop {
let mut squeue = self.inner.submission();
match unsafe { squeue.push(&entry) } {
Ok(()) => {
squeue.sync();
break Ok(());
}
Err(_) => {
drop(squeue);
self.inner.submit()?;
}
}
}
}

pub fn push(&mut self, user_data: usize, op: &mut RawOp) -> Poll<io::Result<usize>> {
Expand All @@ -223,13 +207,12 @@ impl Driver {
match op_pin.create_entry() {
OpEntry::Submission(entry) => {
#[allow(clippy::useless_conversion)]
self.squeue
.push_back(entry.user_data(user_data as _).into());
self.push_raw(entry.user_data(user_data as _).into())?;
Poll::Pending
}
#[cfg(feature = "io-uring-sqe128")]
OpEntry::Submission128(_entry) => {
self.squeue.push_back(_entry.user_data(user_data as _));
OpEntry::Submission128(entry) => {
self.push_raw(entry.user_data(user_data as _))?;
Poll::Pending
}
OpEntry::Blocking => {
Expand Down Expand Up @@ -273,18 +256,12 @@ impl Driver {
instrument!(compio_log::Level::TRACE, "poll", ?timeout);
// Anyway we need to submit once, no matter there are entries in squeue.
trace!("start polling");
loop {
let ended = self.flush_submissions();

self.submit_auto(timeout, ended)?;

if !self.poll_entries(&mut entries) {
self.submit_auto(timeout)?;
self.poll_entries(&mut entries);

if ended {
trace!("polling ended");
break;
}
}

Ok(())
}

Expand Down

0 comments on commit bcf9301

Please sign in to comment.