Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(driver): move future state into RawOp #251

Merged
merged 17 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion compio-driver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ compio-log = { workspace = true }
cfg-if = { workspace = true }
crossbeam-channel = { workspace = true }
futures-util = { workspace = true }
slab = { workspace = true }
socket2 = { workspace = true }

# Windows specific dependencies
Expand Down
2 changes: 1 addition & 1 deletion compio-driver/src/fd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl<T> SharedFd<T> {
impl<T> Drop for SharedFd<T> {
fn drop(&mut self) {
// It's OK to wake multiple times.
if Arc::strong_count(&self.0) == 2 {
if Arc::strong_count(&self.0) == 2 && self.0.waits.load(Ordering::Acquire) {
self.0.waker.wake()
}
}
Expand Down
22 changes: 10 additions & 12 deletions compio-driver/src/fusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@ pub use driver_type::DriverType;
pub(crate) use iour::{sockaddr_storage, socklen_t};
pub use iour::{OpCode as IourOpCode, OpEntry};
pub use poll::{Decision, OpCode as PollOpCode};
use slab::Slab;

pub(crate) use crate::unix::RawOp;
use crate::{OutEntries, ProactorBuilder};
use crate::{Key, OutEntries, ProactorBuilder};

mod driver_type {
use std::sync::atomic::{AtomicU8, Ordering};
Expand Down Expand Up @@ -136,10 +134,10 @@ impl Driver {
}
}

pub fn create_op<T: OpCode + 'static>(&self, user_data: usize, op: T) -> RawOp {
pub fn create_op<T: OpCode + 'static>(&self, op: T) -> Key<T> {
match &self.fuse {
FuseDriver::Poll(driver) => driver.create_op(user_data, op),
FuseDriver::IoUring(driver) => driver.create_op(user_data, op),
FuseDriver::Poll(driver) => driver.create_op(op),
FuseDriver::IoUring(driver) => driver.create_op(op),
}
}

Expand All @@ -150,17 +148,17 @@ impl Driver {
}
}

pub fn cancel(&mut self, user_data: usize, registry: &mut Slab<RawOp>) {
pub fn cancel<T>(&mut self, op: Key<T>) {
match &mut self.fuse {
FuseDriver::Poll(driver) => driver.cancel(user_data, registry),
FuseDriver::IoUring(driver) => driver.cancel(user_data, registry),
FuseDriver::Poll(driver) => driver.cancel(op),
FuseDriver::IoUring(driver) => driver.cancel(op),
}
}

pub fn push(&mut self, user_data: usize, op: &mut RawOp) -> Poll<io::Result<usize>> {
pub fn push<T: OpCode + 'static>(&mut self, op: &mut Key<T>) -> Poll<io::Result<usize>> {
match &mut self.fuse {
FuseDriver::Poll(driver) => driver.push(user_data, op),
FuseDriver::IoUring(driver) => driver.push(user_data, op),
FuseDriver::Poll(driver) => driver.push(op),
FuseDriver::IoUring(driver) => driver.push(op),
}
}

Expand Down
18 changes: 5 additions & 13 deletions compio-driver/src/iocp/cp/global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,11 @@ impl GlobalPort {
self.port.attach(fd)
}

pub fn post<T: ?Sized>(
&self,
res: io::Result<usize>,
optr: *mut Overlapped<T>,
) -> io::Result<()> {
pub fn post(&self, res: io::Result<usize>, optr: *mut Overlapped) -> io::Result<()> {
self.port.post(res, optr)
}

pub fn post_raw<T: ?Sized>(&self, optr: *const Overlapped<T>) -> io::Result<()> {
pub fn post_raw(&self, optr: *const Overlapped) -> io::Result<()> {
self.port.post_raw(optr)
}
}
Expand All @@ -62,7 +58,7 @@ fn iocp_start() -> io::Result<()> {
loop {
for entry in port.port.poll_raw(None)? {
// Any thin pointer is OK because we don't use the type of opcode.
let overlapped_ptr: *mut Overlapped<()> = entry.lpOverlapped.cast();
let overlapped_ptr: *mut Overlapped = entry.lpOverlapped.cast();
let overlapped = unsafe { &*overlapped_ptr };
if let Err(_e) = syscall!(
BOOL,
Expand Down Expand Up @@ -135,15 +131,11 @@ impl PortHandle {
Self { port }
}

pub fn post<T: ?Sized>(
&self,
res: io::Result<usize>,
optr: *mut Overlapped<T>,
) -> io::Result<()> {
pub fn post(&self, res: io::Result<usize>, optr: *mut Overlapped) -> io::Result<()> {
self.port.post(res, optr)
}

pub fn post_raw<T: ?Sized>(&self, optr: *const Overlapped<T>) -> io::Result<()> {
pub fn post_raw(&self, optr: *const Overlapped) -> io::Result<()> {
self.port.post_raw(optr)
}
}
12 changes: 4 additions & 8 deletions compio-driver/src/iocp/cp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,7 @@ impl CompletionPort {
Ok(())
}

pub fn post<T: ?Sized>(
&self,
res: io::Result<usize>,
optr: *mut Overlapped<T>,
) -> io::Result<()> {
pub fn post(&self, res: io::Result<usize>, optr: *mut Overlapped) -> io::Result<()> {
if let Some(overlapped) = unsafe { optr.as_mut() } {
match &res {
Ok(transferred) => {
Expand All @@ -97,7 +93,7 @@ impl CompletionPort {
self.post_raw(optr)
}

pub fn post_raw<T: ?Sized>(&self, optr: *const Overlapped<T>) -> io::Result<()> {
pub fn post_raw(&self, optr: *const Overlapped) -> io::Result<()> {
syscall!(
BOOL,
PostQueuedCompletionStatus(self.port.as_raw_handle() as _, 0, 0, optr.cast())
Expand Down Expand Up @@ -143,7 +139,7 @@ impl CompletionPort {
) -> io::Result<impl Iterator<Item = Entry>> {
Ok(self.poll_raw(timeout)?.filter_map(move |entry| {
// Any thin pointer is OK because we don't use the type of opcode.
let overlapped_ptr: *mut Overlapped<()> = entry.lpOverlapped.cast();
let overlapped_ptr: *mut Overlapped = entry.lpOverlapped.cast();
let overlapped = unsafe { &*overlapped_ptr };
if let Some(current_driver) = current_driver {
if overlapped.driver != current_driver {
Expand Down Expand Up @@ -181,7 +177,7 @@ impl CompletionPort {
_ => Err(io::Error::from_raw_os_error(error as _)),
}
};
Some(Entry::new(overlapped.user_data, res))
Some(Entry::new(overlapped_ptr as usize, res))
}))
}
}
Expand Down
8 changes: 2 additions & 6 deletions compio-driver/src/iocp/cp/multi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,11 @@ impl PortHandle {
Self { port }
}

pub fn post<T: ?Sized>(
&self,
res: io::Result<usize>,
optr: *mut Overlapped<T>,
) -> io::Result<()> {
pub fn post(&self, res: io::Result<usize>, optr: *mut Overlapped) -> io::Result<()> {
self.port.post(res, optr)
}

pub fn post_raw<T: ?Sized>(&self, optr: *const Overlapped<T>) -> io::Result<()> {
pub fn post_raw(&self, optr: *const Overlapped) -> io::Result<()> {
self.port.post_raw(optr)
}
}
Loading