diff --git a/monolake/src/config/manager.rs b/monolake/src/config/manager.rs
new file mode 100644
index 0000000..1e25d0f
--- /dev/null
+++ b/monolake/src/config/manager.rs
@@ -0,0 +1,233 @@
+use std::{
+    cell::RefCell,
+    collections::{HashMap, HashSet},
+    path::{Path, PathBuf},
+    sync::Arc,
+    time::Duration,
+};
+
+use monoio::spawn;
+use monolake_core::{
+    config::ServiceConfig,
+    orchestrator::{ServiceCommand, WorkerManager},
+};
+use service_async::AsyncMakeService;
+
+use crate::config::{Config, ListenerConfig, ServerConfig};
+
+type ServiceConfigMap = HashMap<String, ServiceConfig<ListenerConfig, ServerConfig>>;
+
+pub struct StaticFileConfigManager<F, LF, FP, LFP>
+where
+    FP: Fn(ServerConfig) -> F,
+    LFP: Fn(ListenerConfig) -> LF,
+{
+    online_config_content: RefCell<Vec<u8>>,
+    online_services: RefCell<ServiceConfigMap>,
+    worker_manager: WorkerManager<F, LF>,
+    listener_factory_provider: LFP,
+    server_factory_provider: FP,
+}
+
+impl<F, LF, FP, LFP> StaticFileConfigManager<F, LF, FP, LFP>
+where
+    F: Send + Clone + 'static,
+    LF: Send + Clone + 'static,
+    FP: 'static,
+    LFP: 'static,
+    F: AsyncMakeService,
+    FP: Fn(ServerConfig) -> F,
+    LFP: Fn(ListenerConfig) -> LF,
+{
+    pub fn new(
+        worker_manager: WorkerManager<F, LF>,
+        listener_factory_provider: LFP,
+        server_factory_provider: FP,
+    ) -> Self {
+        Self {
+            online_config_content: Default::default(),
+            online_services: Default::default(),
+            worker_manager,
+            listener_factory_provider,
+            server_factory_provider,
+        }
+    }
+
+    pub async fn load_and_watch(mut self, path: impl AsRef<Path>) -> anyhow::Result<()> {
+        self.reload_file(&path).await?;
+        self.watch(path.as_ref().to_path_buf()).await;
+        Ok(())
+    }
+
+    async fn reload_file(&mut self, path: impl AsRef<Path>) -> anyhow::Result<()> {
+        let latest_content = monolake_core::util::file_read(path).await?;
+        if self.online_config_content.borrow().eq(&latest_content) {
+            return Ok(());
+        }
+
+        tracing::info!("config change detected, reloading");
+        let new_services = Config::parse_service_config(&latest_content)?;
+        self.reload_services(&new_services).await?;
+
+        tracing::info!("config reload success");
+        self.online_config_content.replace(latest_content);
+        self.online_services.replace(new_services);
+        Ok(())
+    }
+
+    async fn reload_services(&mut self, new_services: &ServiceConfigMap) -> anyhow::Result<()> {
+        let patches = Self::diff(&self.online_services.borrow(), new_services);
+        match self.prepare(&patches).await {
+            Ok(_) => {
+                self.commit(&patches)
+                    .await
+                    .expect("config reload failed at commit stage");
+                Ok(())
+            }
+            Err(e) => {
+                tracing::error!("config reload failed at prepare stage: {}, aborting", e);
+                self.abort(&patches)
+                    .await
+                    .expect("abort config reload failed");
+                Err(e)
+            }
+        }
+    }
+
+    fn diff(old_services: &ServiceConfigMap, new_services: &ServiceConfigMap) -> Vec<Patch> {
+        let mut patches = Vec::new();
+
+        let old_keys = old_services.keys().collect::<HashSet<_>>();
+        let new_keys = new_services.keys().collect::<HashSet<_>>();
+        let all_keys = old_keys.union(&new_keys).collect::<HashSet<_>>();
+        for key in all_keys {
+            let patch = match (old_keys.contains(key), new_keys.contains(key)) {
+                (true, true) => {
+                    // TODO: Skip keys whose configuration didn't change
+                    let new_config = new_services.get(*key).unwrap();
+                    Patch::Update {
+                        key: key.to_string(),
+                        server_config: new_config.server.clone(),
+                    }
+                }
+                (true, false) => Patch::Delete {
+                    key: key.to_string(),
+                },
+                (false, true) => {
+                    let new_config = new_services.get(*key).unwrap();
+                    Patch::Insert {
+                        key: key.to_string(),
+                        listener_config: new_config.listener.clone(),
+                        server_config: new_config.server.clone(),
+                    }
+                }
+                (false, false) => {
+                    panic!("unexpected error: illegal key {}", key);
+                }
+            };
+            patches.push(patch);
+        }
+        patches
+    }
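+
+    // Two-phase application of the computed patches: `prepare` pre-commits the
+    // new service factories to the workers (ServiceCommand::Precommit), `commit`
+    // activates them (Commit/Update) or removes deleted services (Remove), and
+    // `abort` drops any partially pre-committed factories when prepare fails.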
+    async fn prepare(&mut self, patches: &[Patch]) -> anyhow::Result<()> {
+        for patch in patches {
+            match patch {
+                Patch::Insert {
+                    key, server_config, ..
+                }
+                | Patch::Update {
+                    key, server_config, ..
+                } => {
+                    self.worker_manager
+                        .dispatch_service_command(ServiceCommand::Precommit(
+                            Arc::new(key.to_string()),
+                            (self.server_factory_provider)(server_config.clone()),
+                        ))
+                        .await
+                        .err()?;
+                }
+                Patch::Delete { .. } => {
+                    // nothing to do at prepare stage
+                }
+            }
+        }
+        Ok(())
+    }
+
+    async fn commit(&mut self, patches: &[Patch]) -> anyhow::Result<()> {
+        for patch in patches {
+            match patch {
+                Patch::Insert {
+                    key,
+                    listener_config,
+                    ..
+                } => {
+                    self.worker_manager
+                        .dispatch_service_command(ServiceCommand::Commit(
+                            Arc::new(key.to_string()),
+                            (self.listener_factory_provider)(listener_config.clone()),
+                        ))
+                        .await
+                        .err()?;
+                }
+                Patch::Update { key, .. } => {
+                    self.worker_manager
+                        .dispatch_service_command(ServiceCommand::Update(Arc::new(key.to_string())))
+                        .await
+                        .err()?;
+                }
+                Patch::Delete { key } => {
+                    self.worker_manager
+                        .dispatch_service_command(ServiceCommand::Remove(Arc::new(key.to_string())))
+                        .await
+                        .err()?;
+                }
+            }
+        }
+        Ok(())
+    }
+
+    async fn abort(&mut self, patches: &[Patch]) -> anyhow::Result<()> {
+        for patch in patches {
+            match patch {
+                Patch::Insert { key, .. } | Patch::Update { key, .. } => {
+                    self.worker_manager
+                        .dispatch_service_command(ServiceCommand::Abort(Arc::new(key.to_string())))
+                        .await; // discard errors due to partial pre-commits
+                }
+                Patch::Delete { .. } => {
+                    // nothing to do at abort stage
+                }
+            }
+        }
+        Ok(())
+    }
+
+    async fn watch(mut self, path: PathBuf) {
+        spawn(async move {
+            loop {
+                if let Err(e) = self.reload_file(&path).await {
+                    tracing::error!("reload config failed: {}", e);
+                }
+                monoio::time::sleep(Duration::from_secs(1)).await;
+            }
+        })
+        .await;
+    }
+}
+
+enum Patch {
+    Insert {
+        key: String,
+        listener_config: ListenerConfig,
+        server_config: ServerConfig,
+    },
+    Update {
+        key: String,
+        server_config: ServerConfig, // ListenerConfig dynamic update not supported yet
+    },
+    Delete {
+        key: String,
+    },
+}
diff --git a/monolake/src/config/mod.rs b/monolake/src/config/mod.rs
index 7c75cec..541e845 100644
--- a/monolake/src/config/mod.rs
+++ b/monolake/src/config/mod.rs
@@ -14,7 +14,9 @@ use monolake_services::{
 use serde::{de::DeserializeOwned, Deserialize, Serialize};
 
 mod extractor;
+pub mod manager;
 
+#[allow(unused)]
 #[derive(Debug, Clone)]
 pub struct Config {
     pub runtime: RuntimeConfig,
@@ -133,103 +135,133 @@ impl TryFrom<ListenerConfig> for ListenerBuilder {
 }
 
 impl Config {
+    #[allow(unused)]
     pub fn load(path: impl AsRef<Path>) -> anyhow::Result<Self> {
-        #[derive(Debug, Clone, Serialize, Deserialize)]
-        pub struct UserConfig {
+        #[derive(Deserialize)]
+        struct UserConfig {
             #[serde(default)]
-            pub runtime: RuntimeConfig,
-            pub servers: HashMap<String, ServiceConfig<ListenerConfig, ServerUserConfig>>,
+            runtime: RuntimeConfig,
+            servers: HashMap<String, ServiceConfig<ListenerConfig, ServerUserConfig>>,
         }
 
         // 1. load from file -> UserConfig
        let file_context = monolake_core::util::file_read_sync(path)?;
-        let user_config = Self::from_slice::<UserConfig>(&file_context)?;
+        let user_config = parse_from_slice::<UserConfig>(&file_context)?;
 
         // 2. UserConfig -> Config
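+        // The per-server conversion that used to be inlined here now lives in
+        // build_server_config, so the file-watching config manager can reuse it
+        // through parse_service_config.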
         let UserConfig { runtime, servers } = user_config;
-        let mut servers_new = HashMap::with_capacity(servers.len());
-        for (key, server) in servers.into_iter() {
-            let ServiceConfig { listener, server } = server;
-            #[cfg(feature = "tls")]
-            let tls = match server.tls {
-                Some(inner) => {
-                    let chain = monolake_core::util::file_read_sync(&inner.chain)?;
-                    let key = monolake_core::util::file_read_sync(&inner.key)?;
-                    match inner.stack {
-                        TlsStack::Rustls => {
-                            monolake_services::tls::TlsConfig::Rustls((chain, key)).try_into()?
-                        }
-                        TlsStack::NativeTls => {
-                            monolake_services::tls::TlsConfig::Native((chain, key)).try_into()?
-                        }
-                    }
-                }
-                None => monolake_services::tls::TlsConfig::None,
-            };
-            let server_http_timeout = server.http_timeout.unwrap_or_default();
-            let server_thrift_timeout = server.thrift_timeout.unwrap_or_default();
-            servers_new.insert(
-                key,
-                ServiceConfig {
-                    server: ServerConfig {
-                        name: server.name,
-                        proxy_type: server.proxy_type,
-                        #[cfg(feature = "tls")]
-                        tls,
-                        routes: server.routes,
-                        http_server_timeout: HttpServerTimeout {
-                            keepalive_timeout: server_http_timeout
-                                .server_keepalive_timeout_sec
-                                .map(Duration::from_secs),
-                            read_header_timeout: server_http_timeout
-                                .server_read_header_timeout_sec
-                                .map(Duration::from_secs),
-                            read_body_timeout: server_http_timeout
-                                .server_read_body_timeout_sec
-                                .map(Duration::from_secs),
-                        },
-                        http_upstream_timeout: HttpUpstreamTimeout {
-                            connect_timeout: server_http_timeout
-                                .upstream_connect_timeout_sec
-                                .map(Duration::from_secs),
-                            read_timeout: server_http_timeout
-                                .upstream_read_timeout_sec
-                                .map(Duration::from_secs),
-                        },
-                        thrift_server_timeout: ThriftServerTimeout {
-                            keepalive_timeout: server_thrift_timeout
-                                .server_keepalive_timeout_sec
-                                .map(Duration::from_secs),
-                            message_timeout: server_thrift_timeout
-                                .server_message_timeout_sec
-                                .map(Duration::from_secs),
-                        },
-                        upstream_http_version: server.upstream_http_version,
-                        #[cfg(feature = "openid")]
-                        auth_config: None,
-                        http_opt_handlers: server.http_opt_handlers,
-                    },
-                    listener,
-                },
-            );
-        }
+        let servers_new = build_server_config(servers)?;
         Ok(Config {
             runtime,
             servers: servers_new,
         })
     }
 
-    pub fn from_slice<T: DeserializeOwned>(content: &[u8]) -> anyhow::Result<T> {
-        // read first non-space u8
-        let is_json = match content
-            .iter()
-            .find(|&&b| b != b' ' && b != b'\r' && b != b'\n' && b != b'\t')
-        {
-            Some(first) => *first == b'{',
-            None => false,
-        };
-        match is_json {
-            true => serde_json::from_slice::<T>(content).map_err(Into::into),
-            false => toml::from_str::<T>(&String::from_utf8_lossy(content)).map_err(Into::into),
+    pub fn load_runtime_config(path: impl AsRef<Path>) -> anyhow::Result<RuntimeConfig> {
+        #[derive(Deserialize)]
+        struct RuntimeConfigContainer {
+            runtime: RuntimeConfig,
         }
+        let file_content = monolake_core::util::file_read_sync(path)?;
+        let container = parse_from_slice::<RuntimeConfigContainer>(&file_content)?;
+        Ok(container.runtime)
+    }
+
+    pub fn parse_service_config(
+        file_content: &[u8],
+    ) -> anyhow::Result<HashMap<String, ServiceConfig<ListenerConfig, ServerConfig>>> {
+        #[derive(Deserialize)]
+        struct UserConfigContainer {
+            servers: HashMap<String, ServiceConfig<ListenerConfig, ServerUserConfig>>,
+        }
+
+        let container = parse_from_slice::<UserConfigContainer>(file_content)?;
+        build_server_config(container.servers)
+    }
+}
+
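+/// Convert user-facing server entries (TLS certificate/key file paths, timeout
+/// values in seconds) into the runtime `ServiceConfig` map consumed by the
+/// service and listener factories.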
+pub fn build_server_config(
+    servers: HashMap<String, ServiceConfig<ListenerConfig, ServerUserConfig>>,
+) -> anyhow::Result<HashMap<String, ServiceConfig<ListenerConfig, ServerConfig>>> {
+    let mut servers_new = HashMap::with_capacity(servers.len());
+    for (key, server) in servers.into_iter() {
+        let ServiceConfig { listener, server } = server;
+        #[cfg(feature = "tls")]
+        let tls = match server.tls {
+            Some(inner) => {
+                let chain = monolake_core::util::file_read_sync(&inner.chain)?;
+                let key = monolake_core::util::file_read_sync(&inner.key)?;
+                match inner.stack {
+                    TlsStack::Rustls => {
+                        monolake_services::tls::TlsConfig::Rustls((chain, key)).try_into()?
+                    }
+                    TlsStack::NativeTls => {
+                        monolake_services::tls::TlsConfig::Native((chain, key)).try_into()?
+                    }
+                }
+            }
+            None => monolake_services::tls::TlsConfig::None,
+        };
+        let server_http_timeout = server.http_timeout.unwrap_or_default();
+        let server_thrift_timeout = server.thrift_timeout.unwrap_or_default();
+        servers_new.insert(
+            key,
+            ServiceConfig {
+                server: ServerConfig {
+                    name: server.name,
+                    proxy_type: server.proxy_type,
+                    #[cfg(feature = "tls")]
+                    tls,
+                    routes: server.routes,
+                    http_server_timeout: HttpServerTimeout {
+                        keepalive_timeout: server_http_timeout
+                            .server_keepalive_timeout_sec
+                            .map(Duration::from_secs),
+                        read_header_timeout: server_http_timeout
+                            .server_read_header_timeout_sec
+                            .map(Duration::from_secs),
+                        read_body_timeout: server_http_timeout
+                            .server_read_body_timeout_sec
+                            .map(Duration::from_secs),
+                    },
+                    http_upstream_timeout: HttpUpstreamTimeout {
+                        connect_timeout: server_http_timeout
+                            .upstream_connect_timeout_sec
+                            .map(Duration::from_secs),
+                        read_timeout: server_http_timeout
+                            .upstream_read_timeout_sec
+                            .map(Duration::from_secs),
+                    },
+                    thrift_server_timeout: ThriftServerTimeout {
+                        keepalive_timeout: server_thrift_timeout
+                            .server_keepalive_timeout_sec
+                            .map(Duration::from_secs),
+                        message_timeout: server_thrift_timeout
+                            .server_message_timeout_sec
+                            .map(Duration::from_secs),
+                    },
+                    upstream_http_version: server.upstream_http_version,
+                    #[cfg(feature = "openid")]
+                    auth_config: None,
+                    http_opt_handlers: server.http_opt_handlers,
+                },
+                listener,
+            },
+        );
+    }
+    Ok(servers_new)
+}
+
+pub fn parse_from_slice<T: DeserializeOwned>(content: &[u8]) -> anyhow::Result<T> {
+    // read first non-space u8
+    let is_json = match content
+        .iter()
+        .find(|&&b| b != b' ' && b != b'\r' && b != b'\n' && b != b'\t')
+    {
+        Some(first) => *first == b'{',
+        None => false,
+    };
+    match is_json {
+        true => serde_json::from_slice::<T>(content).map_err(Into::into),
+        false => toml::from_str::<T>(&String::from_utf8_lossy(content)).map_err(Into::into),
     }
 }
diff --git a/monolake/src/main.rs b/monolake/src/main.rs
index aadbcf9..970ce66 100644
--- a/monolake/src/main.rs
+++ b/monolake/src/main.rs
@@ -1,16 +1,20 @@
-use std::sync::Arc;
+use std::{path::Path, sync::Arc};
 
 use anyhow::Result;
 use clap::Parser;
 use monolake_core::{
-    config::{RuntimeType, ServiceConfig},
+    config::{RuntimeConfig, RuntimeType},
     listener::ListenerBuilder,
-    orchestrator::{ServiceCommand, WorkerManager},
+    orchestrator::WorkerManager,
 };
 use service_async::AsyncMakeServiceWrapper;
 use tracing_subscriber::{filter::LevelFilter, fmt, prelude::*, EnvFilter};
 
-use crate::{config::Config, factory::l7_factory, util::print_logo};
+use crate::{
+    config::{manager::StaticFileConfigManager, Config},
+    factory::l7_factory,
+    util::print_logo,
+};
 
 mod config;
 mod context;
@@ -39,35 +43,35 @@ fn main() -> Result<()> {
     print_logo();
 
     let args = Args::parse();
-    let mut config = Config::load(args.config)?;
+    let mut runtime_config = Config::load_runtime_config(&args.config)?;
 
     #[cfg(target_os = "linux")]
-    if matches!(config.runtime.runtime_type, RuntimeType::IoUring) && !monoio::utils::detect_uring()
+    if matches!(runtime_config.runtime_type, RuntimeType::IoUring) && !monoio::utils::detect_uring()
     {
-        config.runtime.runtime_type = RuntimeType::Legacy;
+        runtime_config.runtime_type = RuntimeType::Legacy;
     }
 
-    match config.runtime.runtime_type {
+    match runtime_config.runtime_type {
         #[cfg(target_os = "linux")]
         monolake_core::config::RuntimeType::IoUring => {
             monoio::RuntimeBuilder::<monoio::IoUringDriver>::new()
                 .enable_timer()
                 .build()
                 .expect("Failed building the Runtime with IoUringDriver")
-                .block_on(run(config));
+                .block_on(run(runtime_config, &args.config));
         }
         monolake_core::config::RuntimeType::Legacy => {
             monoio::RuntimeBuilder::<monoio::LegacyDriver>::new()
                 .enable_timer()
                 .build()
                 .expect("Failed building the Runtime with LegacyDriver")
-                .block_on(run(config));
+                .block_on(run(runtime_config, &args.config));
        }
     }
     Ok(())
 }
 
-async fn run(config: Config) {
+async fn run(runtime_config: RuntimeConfig, service_config_path: impl AsRef<Path>) {
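+    // Only the runtime settings are passed in; the service configuration is
+    // loaded from `service_config_path` and kept up to date by the
+    // StaticFileConfigManager created below.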
     // Start workers
-    let mut manager = WorkerManager::new(config.runtime);
+    let mut manager = WorkerManager::new(runtime_config);
     let join_handlers = manager.spawn_workers_async();
     tracing::info!(
         "Start monolake with {:?} runtime, {} worker(s), {} entries and sqpoll {:?}.",
@@ -77,25 +81,22 @@
         manager.config().sqpoll_idle
     );
 
-    // Construct Service Factory and Listener Factory
-    for (name, ServiceConfig { listener, server }) in config.servers.into_iter() {
-        let lis_fac = ListenerBuilder::try_from(listener).expect("build listener failed");
-
-        let svc_fac = l7_factory(server);
-
-        manager
-            .dispatch_service_command(ServiceCommand::PrepareAndCommit(
-                Arc::new(name),
-                AsyncMakeServiceWrapper(svc_fac),
-                AsyncMakeServiceWrapper(Arc::new(lis_fac)),
+    // Create config manager
+    let config_manager = StaticFileConfigManager::new(
+        manager,
+        |config| {
+            AsyncMakeServiceWrapper(Arc::new(
+                ListenerBuilder::try_from(config).expect("build listener failed"),
             ))
-            .await
-            .err()
-            .expect("apply init config failed");
-    }
+        },
+        |config| AsyncMakeServiceWrapper(l7_factory(config)),
+    );
+    config_manager
+        .load_and_watch(&service_config_path)
+        .await
+        .expect("apply init config failed");
 
     tracing::info!("init config broadcast successfully");
-    // TODO(ihciah): run update task or api server to do config update, maybe in xDS protocol
 
     // Wait for workers
     for (_, mut close) in join_handlers.into_iter() {
         close.cancellation().await;