Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add flags related methods #272

Merged
merged 9 commits into from
Jul 9, 2024
4 changes: 2 additions & 2 deletions compio-driver/src/iour/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ impl Driver {
let mut op = unsafe { Key::<dyn crate::sys::OpCode>::new_unchecked(user_data) };
let op_pin = op.as_op_pin();
let res = op_pin.call_blocking();
completed.push(Entry::new(user_data, res));
completed.push(Entry::new(user_data, res, todo!("how to get flags?")));
Sherlock-Holo marked this conversation as resolved.
Show resolved Hide resolved
handle.notify().ok();
})
.is_ok();
Expand Down Expand Up @@ -294,7 +294,7 @@ fn create_entry(entry: CEntry) -> Entry {
} else {
Ok(result as _)
};
Entry::new(entry.user_data() as _, result)
Entry::new(entry.user_data() as _, result, entry.flags())
}

fn timespec(duration: std::time::Duration) -> Timespec {
Expand Down
19 changes: 19 additions & 0 deletions compio-driver/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub(crate) struct RawOp<T: ?Sized> {
// The metadata in `*mut RawOp<dyn OpCode>`
metadata: usize,
result: PushEntry<Option<Waker>, io::Result<usize>>,
flags: u32,
op: T,
}

Expand Down Expand Up @@ -84,6 +85,7 @@ impl<T: OpCode + 'static> Key<T> {
cancelled: false,
metadata: opcode_metadata::<T>(),
result: PushEntry::Pending(None),
flags: 0,
op,
});
unsafe { Self::new_unchecked(Box::into_raw(raw_op) as _) }
Expand Down Expand Up @@ -154,6 +156,10 @@ impl<T: ?Sized> Key<T> {
this.cancelled
}

pub(crate) fn set_flags(&mut self, flags: u32) {
self.as_opaque_mut().flags = flags;
}

/// Whether the op is completed.
pub(crate) fn has_result(&self) -> bool {
self.as_opaque().result.is_ready()
Expand Down Expand Up @@ -189,6 +195,19 @@ impl<T> Key<T> {
let op = unsafe { Box::from_raw(self.user_data as *mut RawOp<T>) };
BufResult(op.result.take_ready().unwrap_unchecked(), op.op)
}

/// Get the inner result and flags if it is completed.
///
/// # Safety
///
/// Call it only when the op is completed, otherwise it is UB.
pub(crate) unsafe fn into_inner_flags(self) -> (BufResult<usize, T>, u32) {
Sherlock-Holo marked this conversation as resolved.
Show resolved Hide resolved
let op = unsafe { Box::from_raw(self.user_data as *mut RawOp<T>) };
(
BufResult(op.result.take_ready().unwrap_unchecked(), op.op),
op.flags,
)
}
}

impl<T: OpCode + ?Sized> Key<T> {
Expand Down
29 changes: 27 additions & 2 deletions compio-driver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,21 @@ impl Proactor {
}
}

/// Get the pushed operations from the completion entries.
///
/// # Panics
/// This function will panic if the requested operation has not been
/// completed.
pub fn pop_flags<T>(&mut self, op: Key<T>) -> PushEntry<Key<T>, (BufResult<usize, T>, u32)> {
instrument!(compio_log::Level::DEBUG, "pop_flags", ?op);
Sherlock-Holo marked this conversation as resolved.
Show resolved Hide resolved
if op.has_result() {
// SAFETY: completed.
PushEntry::Ready(unsafe { op.into_inner_flags() })
} else {
PushEntry::Pending(op)
}
}

/// Update the waker of the specified op.
pub fn update_waker<T>(&mut self, op: &mut Key<T>, waker: Waker) {
op.set_waker(waker);
Expand All @@ -322,18 +337,27 @@ impl AsRawFd for Proactor {
pub(crate) struct Entry {
user_data: usize,
result: io::Result<usize>,
flags: u32,
}

impl Entry {
pub(crate) fn new(user_data: usize, result: io::Result<usize>) -> Self {
Self { user_data, result }
pub(crate) fn new(user_data: usize, result: io::Result<usize>, flags: u32) -> Self {
Sherlock-Holo marked this conversation as resolved.
Show resolved Hide resolved
Self {
user_data,
result,
flags,
}
}

/// The user-defined data returned by [`Proactor::push`].
pub fn user_data(&self) -> usize {
self.user_data
}

pub fn flags(&self) -> u32 {
self.flags
}

/// The result of the operation.
pub fn into_result(self) -> io::Result<usize> {
self.result
Expand All @@ -357,6 +381,7 @@ impl<E: Extend<usize>> Extend<Entry> for OutEntries<'_, E> {
self.entries.extend(iter.into_iter().filter_map(|e| {
let user_data = e.user_data();
let mut op = unsafe { Key::<()>::new_unchecked(user_data) };
op.set_flags(e.flags());
if op.set_result(e.into_result()) {
// SAFETY: completed and cancelled.
let _ = unsafe { op.into_box() };
Expand Down
38 changes: 37 additions & 1 deletion compio-runtime/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ use send_wrapper::SendWrapper;

#[cfg(feature = "time")]
use crate::runtime::time::{TimerFuture, TimerRuntime};
use crate::{runtime::op::OpFuture, BufResult};
use crate::{
runtime::op::{OpFlagsFuture, OpFuture},
BufResult,
};

scoped_tls::scoped_thread_local!(static CURRENT_RUNTIME: Runtime);

Expand Down Expand Up @@ -241,6 +244,26 @@ impl Runtime {
}
}

/// Submit an operation to the runtime.
///
/// The difference between [`Runtime::submit`] is this method will return
/// the flags
///
/// You only need this when authoring your own [`OpCode`].
pub fn submit_flags<T: OpCode + 'static>(
&self,
op: T,
) -> impl Future<Output = (BufResult<usize, T>, u32)> {
match self.submit_raw(op) {
PushEntry::Pending(user_data) => Either::Left(OpFlagsFuture::new(user_data)),
PushEntry::Ready(res) => {
// submit_flags won't be ready immediately, if ready, it must be error without
// flags
Either::Right(ready((res, 0)))
}
}
}

#[cfg(feature = "time")]
pub(crate) fn create_timer(&self, delay: std::time::Duration) -> impl Future<Output = ()> {
let mut timer_runtime = self.timer_runtime.borrow_mut();
Expand Down Expand Up @@ -273,6 +296,19 @@ impl Runtime {
})
}

pub(crate) fn poll_task_flags<T: OpCode>(
&self,
cx: &mut Context,
op: Key<T>,
) -> PushEntry<Key<T>, (BufResult<usize, T>, u32)> {
instrument!(compio_log::Level::DEBUG, "poll_task_flags", ?op);
let mut driver = self.driver.borrow_mut();
driver.pop_flags(op).map_pending(|mut k| {
driver.update_waker(&mut k, cx.waker().clone());
k
})
}

#[cfg(feature = "time")]
pub(crate) fn poll_timer(&self, cx: &mut Context, key: usize) -> Poll<()> {
instrument!(compio_log::Level::DEBUG, "poll_timer", ?cx, ?key);
Expand Down
34 changes: 34 additions & 0 deletions compio-runtime/src/runtime/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,37 @@ impl<T: OpCode> Drop for OpFuture<T> {
}
}
}

#[derive(Debug)]
pub struct OpFlagsFuture<T: OpCode> {
Sherlock-Holo marked this conversation as resolved.
Show resolved Hide resolved
key: Option<Key<T>>,
}

impl<T: OpCode> OpFlagsFuture<T> {
pub fn new(key: Key<T>) -> Self {
Self { key: Some(key) }
}
}

impl<T: OpCode> Future for OpFlagsFuture<T> {
type Output = (BufResult<usize, T>, u32);

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let res = Runtime::with_current(|r| r.poll_task_flags(cx, self.key.take().unwrap()));
match res {
PushEntry::Pending(key) => {
self.key = Some(key);
Poll::Pending
}
PushEntry::Ready(res) => Poll::Ready(res),
}
}
}

impl<T: OpCode> Drop for OpFlagsFuture<T> {
fn drop(&mut self) {
if let Some(key) = self.key.take() {
Runtime::with_current(|r| r.cancel_op(key))
}
}
}
Loading