diff --git a/monolake/src/config/manager.rs b/monolake/src/config/manager.rs new file mode 100644 index 0000000..0edffb2 --- /dev/null +++ b/monolake/src/config/manager.rs @@ -0,0 +1,234 @@ +use std::{ + cell::RefCell, + collections::{HashMap, HashSet}, + path::{Path, PathBuf}, + rc::Rc, + sync::Arc, + time::Duration, +}; + +use monoio::spawn; +use monolake_core::{ + config::ServiceConfig, + orchestrator::{ServiceCommand, WorkerManager}, +}; +use service_async::AsyncMakeService; + +use crate::config::{parse_server_config, ListenerConfig, ServerConfig}; + +type ServerConfigMap = HashMap>; + +pub struct StaticFileConfigManager +where FP: Fn(ServerConfig) -> F, + LFP: Fn(ListenerConfig) -> LF, +{ + online_config_content: RefCell>, + online_servers: RefCell, + worker_manager: RefCell>, + listener_factory_provider: LFP, + server_factory_provider: FP, +} + +impl StaticFileConfigManager +where + F: Send + Clone + 'static, + LF: Send + Clone + 'static, + FP: 'static, + LFP: 'static, + F: AsyncMakeService, + FP: Fn(ServerConfig) -> F, + LFP: Fn(ListenerConfig) -> LF, +{ + pub fn new( + worker_manager: WorkerManager, + listener_factory_provider: LFP, + server_factory_provider: FP, + ) -> Self { + Self { + online_config_content: Default::default(), + online_servers: Default::default(), + worker_manager: RefCell::new(worker_manager), + listener_factory_provider, + server_factory_provider, + } + } + + pub async fn load_and_watch(self: &Rc, path: impl AsRef) -> anyhow::Result<()> { + self.reload_file(&path).await?; + self.watch(path.as_ref().to_path_buf()).await; + Ok(()) + } + + async fn reload_file(&self, path: impl AsRef) -> anyhow::Result<()> { + let latest_content = monolake_core::util::file_read(path).await?; + if self.online_config_content.borrow().eq(&latest_content) { + return Ok(()); + } + + tracing::info!("config change detected, reloading"); + let new_servers = parse_server_config(&latest_content)?; + self.reload_servers(&new_servers).await?; + + self.online_config_content.replace(latest_content); + self.online_servers.replace(new_servers); + Ok(()) + } + + async fn reload_servers(&self, new_servers: &ServerConfigMap) -> anyhow::Result<()> { + let patches = Self::diff(&self.online_servers.borrow(), new_servers); + match self.prepare(&patches).await { + Ok(_) => { + self.commit(&patches) + .await + .expect("config reload failed at commit stage"); + Ok(()) + } + Err(e) => { + tracing::error!("config reload failed at prepare stage: {}, aborting", e); + self.abort(&patches) + .await + .expect("abort config reload failed"); + Err(e) + } + } + } + + fn diff(old_servers: &ServerConfigMap, new_servers: &ServerConfigMap) -> Vec { + let mut patches = Vec::new(); + + let old_keys = old_servers.keys().collect::>(); + let new_keys = new_servers.keys().collect::>(); + let all_keys = old_keys.union(&new_keys).collect::>(); + for key in all_keys { + let patch = match (old_keys.contains(key), new_keys.contains(key)) { + (true, true) => { + // TODO: skip keys whose configuration didn't change + let new_config = new_servers.get(*key).unwrap(); + Patch::Update { + key: key.to_string(), + server_config: new_config.server.clone(), + } + } + (true, false) => Patch::Delete { + key: key.to_string(), + }, + (false, true) => { + let new_config = new_servers.get(*key).unwrap(); + Patch::Insert { + key: key.to_string(), + listener_config: new_config.listener.clone(), + server_config: new_config.server.clone(), + } + } + (false, false) => { + panic!("unexpected error: illegal key {}", key); + } + }; + patches.push(patch); + } + patches + } + + async fn prepare(&self, patches: &[Patch]) -> anyhow::Result<()> { + for patch in patches { + match patch { + Patch::Insert { + key, server_config, .. + } + | Patch::Update { + key, server_config, .. + } => { + self.worker_manager.borrow_mut() + .dispatch_service_command(ServiceCommand::Precommit( + Arc::new(key.to_string()), + (self.server_factory_provider)(server_config.clone()), + )) + .await + .err()?; + } + Patch::Delete { .. } => { + // nothing to do at prepare stage + } + } + } + Ok(()) + } + + async fn commit(&self, patches: &[Patch]) -> anyhow::Result<()> { + for patch in patches { + match patch { + Patch::Insert { + key, + listener_config, + .. + } => { + self.worker_manager.borrow_mut() + .dispatch_service_command(ServiceCommand::Commit( + Arc::new(key.to_string()), + (self.listener_factory_provider)(listener_config.clone()), + )) + .await + .err()?; + } + Patch::Update { key, .. } => { + self.worker_manager.borrow_mut() + .dispatch_service_command(ServiceCommand::Update(Arc::new(key.to_string()))) + .await + .err()?; + } + Patch::Delete { key } => { + self.worker_manager.borrow_mut() + .dispatch_service_command(ServiceCommand::Remove(Arc::new(key.to_string()))) + .await + .err()?; + // nothing to do at prepare stage + } + } + } + Ok(()) + } + + async fn abort(&self, patches: &[Patch]) -> anyhow::Result<()> { + for patch in patches { + match patch { + Patch::Insert { key, .. } | Patch::Update { key, .. } => { + self.worker_manager.borrow_mut() + .dispatch_service_command(ServiceCommand::Abort(Arc::new(key.to_string()))) + .await; // discard error + } + Patch::Delete { .. } => { + // nothing to do at abort stage + } + } + } + Ok(()) + } + + async fn watch(self: &Rc, path: PathBuf) { + let self_clone = self.clone(); + spawn(async move { + loop { + if let Err(e) = self_clone.reload_file(&path).await { + tracing::error!("reload config failed: {}", e); + } + monoio::time::sleep(Duration::from_secs(1)).await; + } + }) + .await; + } +} + +enum Patch { + Insert { + key: String, + listener_config: ListenerConfig, + server_config: ServerConfig, + }, + Update { + key: String, + server_config: ServerConfig, // ListenerConfig dynamic update not supported yet + }, + Delete { + key: String, + }, +} diff --git a/monolake/src/config/mod.rs b/monolake/src/config/mod.rs index c9834fa..f24cd81 100644 --- a/monolake/src/config/mod.rs +++ b/monolake/src/config/mod.rs @@ -11,12 +11,7 @@ use monolake_services::{ use serde::{de::DeserializeOwned, Deserialize, Serialize}; mod extractor; - -#[derive(Debug, Clone)] -pub struct Config { - pub runtime: RuntimeConfig, - pub servers: HashMap>, -} +pub mod manager; #[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq, Eq)] #[serde(rename_all = "snake_case")] @@ -114,95 +109,99 @@ impl TryFrom for ListenerBuilder { } } -impl Config { - pub fn load(path: impl AsRef) -> anyhow::Result { - #[derive(Debug, Clone, Serialize, Deserialize)] - pub struct UserConfig { - #[serde(default)] - pub runtime: RuntimeConfig, - pub servers: HashMap>, - } - // 1. load from file -> UserConfig - let file_context = monolake_core::util::file_read_sync(path)?; - let user_config = Self::from_slice::(&file_context)?; - - // 2. UserConfig -> Config - let UserConfig { runtime, servers } = user_config; - let mut servers_new = HashMap::with_capacity(servers.len()); - for (key, server) in servers.into_iter() { - let ServiceConfig { listener, server } = server; - #[cfg(feature = "tls")] - let tls = match server.tls { - Some(inner) => { - let chain = monolake_core::util::file_read_sync(&inner.chain)?; - let key = monolake_core::util::file_read_sync(&inner.key)?; - match inner.stack { - TlsStack::Rustls => { - monolake_services::tls::TlsConfig::Rustls((chain, key)).try_into()? - } - TlsStack::NativeTls => { - monolake_services::tls::TlsConfig::Native((chain, key)).try_into()? - } +pub fn load_runtime_config(path: impl AsRef) -> anyhow::Result { + #[derive(Debug, Clone, Serialize, Deserialize)] + pub struct RuntimeConfigContainer { + #[serde(default)] + pub runtime: RuntimeConfig, + } + let file_content = monolake_core::util::file_read_sync(path)?; + let container = from_slice::(&file_content)?; + Ok(container.runtime) +} + +pub fn parse_server_config(file_content: &[u8]) -> anyhow::Result>> { + #[derive(Debug, Clone, Serialize, Deserialize)] + pub struct UserConfig { + pub servers: HashMap>, + } + + // 1. load from file -> UserConfig + let user_config = from_slice::(&file_content)?; + + // 2. UserConfig -> Config + let UserConfig { servers } = user_config; + let mut servers_new = HashMap::with_capacity(servers.len()); + for (key, server) in servers.into_iter() { + let ServiceConfig { listener, server } = server; + #[cfg(feature = "tls")] + let tls = match server.tls { + Some(inner) => { + let chain = monolake_core::util::file_read_sync(&inner.chain)?; + let key = monolake_core::util::file_read_sync(&inner.key)?; + match inner.stack { + TlsStack::Rustls => { + monolake_services::tls::TlsConfig::Rustls((chain, key)).try_into()? + } + TlsStack::NativeTls => { + monolake_services::tls::TlsConfig::Native((chain, key)).try_into()? } } - None => monolake_services::tls::TlsConfig::None, - }; - let server_http_timeout = server.http_timeout.unwrap_or_default(); - let server_thrift_timeout = server.thrift_timeout.unwrap_or_default(); - servers_new.insert( - key, - ServiceConfig { - server: ServerConfig { - name: server.name, - proxy_type: server.proxy_type, - #[cfg(feature = "tls")] - tls, - routes: server.routes, - http_server_timeout: HttpServerTimeout { - keepalive_timeout: server_http_timeout - .server_keepalive_timeout_sec - .map(Duration::from_secs), - read_header_timeout: server_http_timeout - .server_read_header_timeout_sec - .map(Duration::from_secs), - read_body_timeout: server_http_timeout - .server_read_body_timeout_sec - .map(Duration::from_secs), - }, - thrift_server_timeout: ThriftServerTimeout { - keepalive_timeout: server_thrift_timeout - .server_keepalive_timeout_sec - .map(Duration::from_secs), - message_timeout: server_thrift_timeout - .server_message_timeout_sec - .map(Duration::from_secs), - }, - protocol: server.protocol, - #[cfg(feature = "openid")] - auth_config: None, + } + None => monolake_services::tls::TlsConfig::None, + }; + let server_http_timeout = server.http_timeout.unwrap_or_default(); + let server_thrift_timeout = server.thrift_timeout.unwrap_or_default(); + servers_new.insert( + key, + ServiceConfig { + server: ServerConfig { + name: server.name, + proxy_type: server.proxy_type, + #[cfg(feature = "tls")] + tls, + routes: server.routes, + http_server_timeout: HttpServerTimeout { + keepalive_timeout: server_http_timeout + .server_keepalive_timeout_sec + .map(Duration::from_secs), + read_header_timeout: server_http_timeout + .server_read_header_timeout_sec + .map(Duration::from_secs), + read_body_timeout: server_http_timeout + .server_read_body_timeout_sec + .map(Duration::from_secs), + }, + thrift_server_timeout: ThriftServerTimeout { + keepalive_timeout: server_thrift_timeout + .server_keepalive_timeout_sec + .map(Duration::from_secs), + message_timeout: server_thrift_timeout + .server_message_timeout_sec + .map(Duration::from_secs), }, - listener, + protocol: server.protocol, + #[cfg(feature = "openid")] + auth_config: None, }, - ); - } - Ok(Config { - runtime, - servers: servers_new, - }) + listener, + }, + ); } + Ok(servers_new) +} - pub fn from_slice(content: &[u8]) -> anyhow::Result { - // read first non-space u8 - let is_json = match content - .iter() - .find(|&&b| b != b' ' && b != b'\r' && b != b'\n' && b != b'\t') - { - Some(first) => *first == b'{', - None => false, - }; - match is_json { - true => serde_json::from_slice::(content).map_err(Into::into), - false => toml::from_str::(&String::from_utf8_lossy(content)).map_err(Into::into), - } +pub fn from_slice(content: &[u8]) -> anyhow::Result { + // read first non-space u8 + let is_json = match content + .iter() + .find(|&&b| b != b' ' && b != b'\r' && b != b'\n' && b != b'\t') + { + Some(first) => *first == b'{', + None => false, + }; + match is_json { + true => serde_json::from_slice::(content).map_err(Into::into), + false => toml::from_str::(&String::from_utf8_lossy(content)).map_err(Into::into), } } diff --git a/monolake/src/main.rs b/monolake/src/main.rs index aadbcf9..3ad74ce 100644 --- a/monolake/src/main.rs +++ b/monolake/src/main.rs @@ -1,16 +1,20 @@ +use std::path::Path; +use std::rc::Rc; use std::sync::Arc; use anyhow::Result; use clap::Parser; use monolake_core::{ - config::{RuntimeType, ServiceConfig}, + config::RuntimeType, listener::ListenerBuilder, - orchestrator::{ServiceCommand, WorkerManager}, + orchestrator::WorkerManager, }; use service_async::AsyncMakeServiceWrapper; use tracing_subscriber::{filter::LevelFilter, fmt, prelude::*, EnvFilter}; - -use crate::{config::Config, factory::l7_factory, util::print_logo}; +use monolake_core::config::RuntimeConfig; +use crate::{factory::l7_factory, util::print_logo}; +use crate::config::load_runtime_config; +use crate::config::manager::StaticFileConfigManager; mod config; mod context; @@ -39,35 +43,35 @@ fn main() -> Result<()> { print_logo(); let args = Args::parse(); - let mut config = Config::load(args.config)?; + let mut runtime_config = load_runtime_config(&args.config)?; #[cfg(target_os = "linux")] if matches!(config.runtime.runtime_type, RuntimeType::IoUring) && !monoio::utils::detect_uring() { - config.runtime.runtime_type = RuntimeType::Legacy; + runtime_config.runtime_type = RuntimeType::Legacy; } - match config.runtime.runtime_type { + match runtime_config.runtime_type { #[cfg(target_os = "linux")] monolake_core::config::RuntimeType::IoUring => { monoio::RuntimeBuilder::::new() .enable_timer() .build() .expect("Failed building the Runtime with IoUringDriver") - .block_on(run(config)); + .block_on(run(runtime_config, &args.config)); } monolake_core::config::RuntimeType::Legacy => { monoio::RuntimeBuilder::::new() .enable_timer() .build() .expect("Failed building the Runtime with LegacyDriver") - .block_on(run(config)); + .block_on(run(runtime_config, &args.config)); } } Ok(()) } -async fn run(config: Config) { +async fn run(runtime_config: RuntimeConfig, server_config_path: impl AsRef) { // Start workers - let mut manager = WorkerManager::new(config.runtime); + let mut manager = WorkerManager::new(runtime_config); let join_handlers = manager.spawn_workers_async(); tracing::info!( "Start monolake with {:?} runtime, {} worker(s), {} entries and sqpoll {:?}.", @@ -77,25 +81,15 @@ async fn run(config: Config) { manager.config().sqpoll_idle ); - // Construct Service Factory and Listener Factory - for (name, ServiceConfig { listener, server }) in config.servers.into_iter() { - let lis_fac = ListenerBuilder::try_from(listener).expect("build listener failed"); - - let svc_fac = l7_factory(server); - - manager - .dispatch_service_command(ServiceCommand::PrepareAndCommit( - Arc::new(name), - AsyncMakeServiceWrapper(svc_fac), - AsyncMakeServiceWrapper(Arc::new(lis_fac)), - )) - .await - .err() - .expect("apply init config failed"); - } + // Create config manager + let config_manager = Rc::new(StaticFileConfigManager::new( + manager, + |config| AsyncMakeServiceWrapper(Arc::new(ListenerBuilder::try_from(config).expect("build listener failed"))), + |config| AsyncMakeServiceWrapper(l7_factory(config)), + )); + config_manager.load_and_watch(&server_config_path).await.expect("apply init config failed"); tracing::info!("init config broadcast successfully"); - // TODO(ihciah): run update task or api server to do config update, maybe in xDS protocol // Wait for workers for (_, mut close) in join_handlers.into_iter() { close.cancellation().await;