diff --git a/monolake-core/src/config/mod.rs b/monolake-core/src/config/mod.rs index 46480a4..b03e9ab 100644 --- a/monolake-core/src/config/mod.rs +++ b/monolake-core/src/config/mod.rs @@ -25,6 +25,7 @@ pub struct RuntimeConfig { pub runtime_type: RuntimeType, #[serde(default = "default_cpu_affinity")] pub cpu_affinity: bool, + pub thread_pool: Option, } impl Default for RuntimeConfig { @@ -35,6 +36,7 @@ impl Default for RuntimeConfig { sqpoll_idle: None, runtime_type: Default::default(), cpu_affinity: default_cpu_affinity(), + thread_pool: None, } } } diff --git a/monolake-core/src/server/mod.rs b/monolake-core/src/server/mod.rs index bd5ca7a..4d93cd3 100644 --- a/monolake-core/src/server/mod.rs +++ b/monolake-core/src/server/mod.rs @@ -6,7 +6,7 @@ use futures_channel::{ oneshot::{channel as ochannel, Receiver as OReceiver, Sender as OSender}, }; use futures_util::{sink::SinkExt, stream::StreamExt}; -use monoio::{io::stream::Stream, utils::bind_to_cpu_set}; +use monoio::{blocking::DefaultThreadPool, io::stream::Stream, utils::bind_to_cpu_set}; use service_async::{MakeService, Service}; use tracing::{error, info, warn}; @@ -18,6 +18,7 @@ mod runtime; /// Manager is holden by the main thread, and is used to start and control workers. pub struct Manager { runtime_config: RuntimeConfig, + thread_pool: Option>, workers: Vec>>, } @@ -48,6 +49,7 @@ where let runtime_config = Arc::new(self.runtime_config.clone()); (0..self.runtime_config.worker_threads) .map(|worker_id| { + let thread_pool = self.thread_pool.clone(); let (tx, rx) = channel(128); let runtime_config = runtime_config.clone(); let (finish_tx, mut finish_rx) = futures_channel::oneshot::channel::<()>(); @@ -61,7 +63,10 @@ where warn!("bind thread {worker_id} to core {core} failed: {e}"); } } - let mut runtime = RuntimeWrapper::from(runtime_config.as_ref()); + let mut runtime = RuntimeWrapper::new( + runtime_config.as_ref(), + thread_pool.map(|p| p as Box<_>), + ); runtime.block_on(async move { worker_controller.run_controller(rx).await; finish_rx.close(); @@ -98,8 +103,12 @@ where impl Manager { pub fn new(runtime_config: RuntimeConfig) -> Self { + let thread_pool = runtime_config + .thread_pool + .map(|tn| Box::new(DefaultThreadPool::new(tn))); Self { runtime_config, + thread_pool, workers: Vec::new(), } } diff --git a/monolake-core/src/server/runtime.rs b/monolake-core/src/server/runtime.rs index 6146e10..1d440d2 100644 --- a/monolake-core/src/server/runtime.rs +++ b/monolake-core/src/server/runtime.rs @@ -16,8 +16,11 @@ pub enum RuntimeWrapper { Legacy(Runtime>), } -impl From<&RuntimeConfig> for RuntimeWrapper { - fn from(_config: &RuntimeConfig) -> Self { +impl RuntimeWrapper { + pub fn new( + _config: &RuntimeConfig, + thread_pool: Option>, + ) -> Self { #[cfg(target_os = "linux")] let runtime_type = if _config.runtime_type == RuntimeType::IoUring && monoio::utils::detect_uring() { @@ -41,18 +44,19 @@ impl From<&RuntimeConfig> for RuntimeWrapper { } None => RuntimeBuilder::::new(), }; - let runtime = builder - .enable_timer() - .with_entries(_config.entries) - .build() - .unwrap(); + let mut builder = builder.enable_timer().with_entries(_config.entries); + if let Some(tp) = thread_pool { + builder = builder.attach_thread_pool(tp); + } + let runtime = builder.build().unwrap(); RuntimeWrapper::IoUring(runtime) } RuntimeType::Legacy => { - let runtime = RuntimeBuilder::::new() - .enable_timer() - .build() - .unwrap(); + let mut builder = RuntimeBuilder::::new().enable_timer(); + if let Some(tp) = thread_pool { + builder = builder.attach_thread_pool(tp); + } + let runtime = builder.build().unwrap(); RuntimeWrapper::Legacy(runtime) } }