Skip to content

Commit

Permalink
Merge pull request #255 from George-Miao/refactor/dispatcher
Browse files Browse the repository at this point in the history
refactor(dispatcher): reduces alloc and API change
  • Loading branch information
Berrysoft authored May 26, 2024
2 parents 8429dc7 + f1822c0 commit 4d6d3f1
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 106 deletions.
4 changes: 3 additions & 1 deletion compio-dispatcher/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ compio-driver = { workspace = true }
compio-runtime = { workspace = true, features = ["event", "time"] }

flume = { workspace = true }
futures-util = { workspace = true }
futures-channel = { workspace = true }

[dev-dependencies]
compio-buf = { workspace = true }
compio-io = { workspace = true }
compio-net = { workspace = true }
compio-macros = { workspace = true }

futures-util = { workspace = true }
153 changes: 53 additions & 100 deletions compio-dispatcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,49 @@
#![warn(missing_docs)]

use std::{
any::Any,
future::Future,
io,
num::NonZeroUsize,
panic::resume_unwind,
pin::Pin,
sync::{Arc, Mutex},
thread::{available_parallelism, JoinHandle},
};

use compio_driver::{AsyncifyPool, ProactorBuilder};
use compio_runtime::{event::Event, Runtime};
use compio_runtime::{event::Event, JoinHandle as CompioJoinHandle, Runtime};
use flume::{unbounded, SendError, Sender};
use futures_channel::oneshot;

type Dispatching = Box<dyn Dispatchable + Send>;

trait Dispatchable {
fn spawn(self: Box<Self>, handle: &Runtime) -> CompioJoinHandle<()>;
}

/// Concrete type for the closure we're sending to worker threads
struct Concrete<F, R> {
callback: oneshot::Sender<R>,
func: F,
}

impl<F, Fut, R> Dispatchable for Concrete<F, R>
where
F: FnOnce() -> Fut + Send + 'static,
Fut: Future<Output = R>,
R: Send + 'static,
{
fn spawn(self: Box<Self>, handle: &Runtime) -> CompioJoinHandle<()> {
let Concrete { callback, func } = *self;
handle.spawn(async move {
let res = func().await;
callback.send(res).ok();
})
}
}

/// The dispatcher. It manages the threads and dispatches the tasks.
pub struct Dispatcher {
sender: Sender<Box<Closure>>,
sender: Sender<Dispatching>,
threads: Vec<JoinHandle<()>>,
pool: AsyncifyPool,
}
Expand All @@ -30,7 +56,7 @@ impl Dispatcher {
let mut proactor_builder = builder.proactor_builder;
proactor_builder.force_reuse_thread_pool();
let pool = proactor_builder.create_or_get_thread_pool();
let (sender, receiver) = unbounded::<Box<Closure>>();
let (sender, receiver) = unbounded::<Dispatching>();

let threads = (0..builder.nthreads)
.map({
Expand All @@ -57,14 +83,14 @@ impl Dispatcher {
.expect("cannot create compio runtime")
.block_on(async move {
while let Ok(f) = receiver.recv_async().await {
let fut = (f)();
let task = Runtime::with_current(|rt| f.spawn(rt));
if builder.concurrent {
compio_runtime::spawn(fut).detach()
task.detach()
} else {
fut.await
task.await.ok();
}
}
})
});
})
}
})
Expand All @@ -86,75 +112,36 @@ impl Dispatcher {
DispatcherBuilder::default()
}

fn prepare<Fut, Fn, R>(&self, f: Fn) -> (Executing<R>, Box<Closure>)
where
Fn: (FnOnce() -> Fut) + Send + 'static,
Fut: Future<Output = R> + 'static,
R: Any + Send + 'static,
{
let event = Event::new();
let handle = event.handle();
let res = Arc::new(Mutex::new(None));
let dispatched = Executing {
event,
result: res.clone(),
};
let closure = Box::new(|| {
Box::pin(async move {
*res.lock().unwrap() = Some(f().await);
handle.notify();
}) as BoxFuture<()>
});
(dispatched, closure)
}

/// Spawn a boxed closure to the threads.
///
/// If all threads have panicked, this method will return an error with the
/// sent closure.
pub fn spawn(&self, closure: Box<Closure>) -> Result<(), SendError<Box<Closure>>> {
self.sender.send(closure)
}

/// Dispatch a task to the threads
///
/// The provided `f` should be [`Send`] because it will be send to another
/// thread before calling. The return [`Future`] need not to be [`Send`]
/// thread before calling. The returned [`Future`] need not to be [`Send`]
/// because it will be executed on only one thread.
///
/// # Error
///
/// If all threads have panicked, this method will return an error with the
/// sent closure. Notice that the returned closure is not the same as the
/// argument and cannot be simply transmuted back to `Fn`.
pub fn dispatch<Fut, Fn>(&self, f: Fn) -> Result<(), SendError<Box<Closure>>>
where
Fn: (FnOnce() -> Fut) + Send + 'static,
Fut: Future<Output = ()> + 'static,
{
self.spawn(Box::new(|| Box::pin(f()) as BoxFuture<()>))
}

/// Execute a task on the threads and retrieve its returned value.
///
/// The provided `f` should be [`Send`] because it will be send to another
/// thread before calling. The return [`Future`] need not to be [`Send`]
/// because it will be executed on only one thread.
///
/// # Error
///
/// If all threads have panicked, this method will return an error with the
/// sent closure. Notice that the returned closure is not the same as the
/// argument and cannot be simply transmuted back to `Fn`.
pub fn execute<Fut, Fn, R>(&self, f: Fn) -> Result<Executing<R>, SendError<Box<Closure>>>
/// sent closure.
pub fn dispatch<Fut, Fn, R>(&self, f: Fn) -> Result<oneshot::Receiver<R>, SendError<Fn>>
where
Fn: (FnOnce() -> Fut) + Send + 'static,
Fut: Future<Output = R> + 'static,
R: Any + Send + 'static,
R: Send + 'static,
{
let (dispatched, closure) = self.prepare(f);
self.spawn(closure)?;
Ok(dispatched)
let (tx, rx) = oneshot::channel();
let concrete: Concrete<Fn, R> = Concrete {
callback: tx,
func: f,
};
match self.sender.send(Box::new(concrete)) {
Ok(_) => Ok(rx),
Err(err) => {
// SAFETY: We know the dispatchable we sent has type `Concrete<Fn, R>`
let recovered =
unsafe { Box::from_raw(Box::into_raw(err.0) as *mut Concrete<Fn, R>) };
Err(SendError(recovered.func))
}
}
}

/// Stop the dispatcher and wait for the threads to complete. If there is a
Expand Down Expand Up @@ -253,37 +240,3 @@ impl Default for DispatcherBuilder {
Self::new()
}
}

type BoxFuture<T> = Pin<Box<dyn Future<Output = T>>>;
type Closure = dyn (FnOnce() -> BoxFuture<()>) + Send;

/// The join handle for an executing task. It can be used to wait for the
/// task's returned value.
pub struct Executing<R> {
event: Event,
result: Arc<Mutex<Option<R>>>,
}

impl<R: 'static> Executing<R> {
fn take(val: &Mutex<Option<R>>) -> R {
val.lock()
.unwrap()
.take()
.expect("the result should be set")
}

/// Try to wait for the task to complete without blocking.
pub fn try_join(self) -> Result<R, Self> {
if self.event.notified() {
Ok(Self::take(&self.result))
} else {
Err(self)
}
}

/// Wait for the task to complete.
pub async fn join(self) -> R {
self.event.wait().await;
Self::take(&self.result)
}
}
4 changes: 2 additions & 2 deletions compio-dispatcher/tests/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ async fn listener_dispatch() {
for _i in 0..CLIENT_NUM {
let (mut srv, _) = listener.accept().await.unwrap();
let handle = dispatcher
.execute(move || async move {
.dispatch(move || async move {
let (_, buf) = srv.read_exact(ArrayVec::<u8, 12>::new()).await.unwrap();
assert_eq!(buf.as_slice(), b"Hello world!");
})
.unwrap();
handles.push(handle.join());
handles.push(handle);
}
while handles.next().await.is_some() {}
let (_, results) = futures_util::join!(task, dispatcher.join());
Expand Down
2 changes: 1 addition & 1 deletion compio-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ pub mod time;
pub use async_task::Task;
pub use attacher::*;
use compio_buf::BufResult;
pub use runtime::{spawn, spawn_blocking, submit, Runtime, RuntimeBuilder};
pub use runtime::{spawn, spawn_blocking, submit, JoinHandle, Runtime, RuntimeBuilder};
2 changes: 2 additions & 0 deletions compio-runtime/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ use crate::{runtime::op::OpFuture, BufResult};

scoped_tls::scoped_thread_local!(static CURRENT_RUNTIME: Runtime);

/// Type alias for `Task<Result<T, Box<dyn Any + Send>>>`, which resolves to an
/// `Err` when the spawned future panicked.
pub type JoinHandle<T> = Task<Result<T, Box<dyn Any + Send>>>;

/// The async runtime of compio. It is a thread local runtime, and cannot be
Expand Down
4 changes: 2 additions & 2 deletions compio/examples/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ async fn main() {
for _i in 0..CLIENT_NUM {
let (mut srv, _) = listener.accept().await.unwrap();
let handle = dispatcher
.execute(move || async move {
.dispatch(move || async move {
let BufResult(res, buf) = srv.read(Vec::with_capacity(20)).await;
res.unwrap();
println!("{}", std::str::from_utf8(&buf).unwrap());
})
.unwrap();
handles.push(handle.join());
handles.push(handle);
}
while handles.next().await.is_some() {}
dispatcher.join().await.unwrap();
Expand Down

0 comments on commit 4d6d3f1

Please sign in to comment.