Skip to content

Commit

Permalink
feat: allow user attach thread pool with config
Browse files Browse the repository at this point in the history
  • Loading branch information
ihciah committed Sep 13, 2023
1 parent 64e78a4 commit 2d2e14d
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 13 deletions.
2 changes: 2 additions & 0 deletions monolake-core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub struct RuntimeConfig {
pub runtime_type: RuntimeType,
#[serde(default = "default_cpu_affinity")]
pub cpu_affinity: bool,
pub thread_pool: Option<usize>,
}

impl Default for RuntimeConfig {
Expand All @@ -35,6 +36,7 @@ impl Default for RuntimeConfig {
sqpoll_idle: None,
runtime_type: Default::default(),
cpu_affinity: default_cpu_affinity(),
thread_pool: None,
}
}
}
Expand Down
13 changes: 11 additions & 2 deletions monolake-core/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use futures_channel::{
oneshot::{channel as ochannel, Receiver as OReceiver, Sender as OSender},
};
use futures_util::{sink::SinkExt, stream::StreamExt};
use monoio::{io::stream::Stream, utils::bind_to_cpu_set};
use monoio::{blocking::DefaultThreadPool, io::stream::Stream, utils::bind_to_cpu_set};
use service_async::{MakeService, Service};
use tracing::{error, info, warn};

Expand All @@ -18,6 +18,7 @@ mod runtime;
/// Manager is holden by the main thread, and is used to start and control workers.
pub struct Manager<F, LF> {
runtime_config: RuntimeConfig,
thread_pool: Option<Box<DefaultThreadPool>>,
workers: Vec<Sender<Update<F, LF>>>,
}

Expand Down Expand Up @@ -48,6 +49,7 @@ where
let runtime_config = Arc::new(self.runtime_config.clone());
(0..self.runtime_config.worker_threads)
.map(|worker_id| {
let thread_pool = self.thread_pool.clone();
let (tx, rx) = channel(128);
let runtime_config = runtime_config.clone();
let (finish_tx, mut finish_rx) = futures_channel::oneshot::channel::<()>();
Expand All @@ -61,7 +63,10 @@ where
warn!("bind thread {worker_id} to core {core} failed: {e}");
}
}
let mut runtime = RuntimeWrapper::from(runtime_config.as_ref());
let mut runtime = RuntimeWrapper::new(
runtime_config.as_ref(),
thread_pool.map(|p| p as Box<_>),
);
runtime.block_on(async move {
worker_controller.run_controller(rx).await;
finish_rx.close();
Expand Down Expand Up @@ -98,8 +103,12 @@ where

impl<F, LF> Manager<F, LF> {
pub fn new(runtime_config: RuntimeConfig) -> Self {
let thread_pool = runtime_config
.thread_pool
.map(|tn| Box::new(DefaultThreadPool::new(tn)));
Self {
runtime_config,
thread_pool,
workers: Vec::new(),
}
}
Expand Down
26 changes: 15 additions & 11 deletions monolake-core/src/server/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ pub enum RuntimeWrapper {
Legacy(Runtime<TimeDriver<LegacyDriver>>),
}

impl From<&RuntimeConfig> for RuntimeWrapper {
fn from(_config: &RuntimeConfig) -> Self {
impl RuntimeWrapper {
pub fn new(
_config: &RuntimeConfig,
thread_pool: Option<Box<dyn monoio::blocking::ThreadPool + Send + 'static>>,
) -> Self {
#[cfg(target_os = "linux")]
let runtime_type =
if _config.runtime_type == RuntimeType::IoUring && monoio::utils::detect_uring() {
Expand All @@ -41,18 +44,19 @@ impl From<&RuntimeConfig> for RuntimeWrapper {
}
None => RuntimeBuilder::<monoio::IoUringDriver>::new(),
};
let runtime = builder
.enable_timer()
.with_entries(_config.entries)
.build()
.unwrap();
let mut builder = builder.enable_timer().with_entries(_config.entries);
if let Some(tp) = thread_pool {
builder = builder.attach_thread_pool(tp);
}
let runtime = builder.build().unwrap();
RuntimeWrapper::IoUring(runtime)
}
RuntimeType::Legacy => {
let runtime = RuntimeBuilder::<monoio::LegacyDriver>::new()
.enable_timer()
.build()
.unwrap();
let mut builder = RuntimeBuilder::<monoio::LegacyDriver>::new().enable_timer();
if let Some(tp) = thread_pool {
builder = builder.attach_thread_pool(tp);
}
let runtime = builder.build().unwrap();
RuntimeWrapper::Legacy(runtime)
}
}
Expand Down

0 comments on commit 2d2e14d

Please sign in to comment.