diff --git a/ballista/core/src/config.rs b/ballista/core/src/config.rs index 39a472cd..1ac6dec7 100644 --- a/ballista/core/src/config.rs +++ b/ballista/core/src/config.rs @@ -18,8 +18,6 @@ //! Ballista configuration -use clap::ArgEnum; -use core::fmt; use std::collections::HashMap; use std::result; @@ -294,30 +292,6 @@ impl BallistaConfig { } } -// an enum used to configure the log rolling policy -// needs to be visible to code generated by configure_me -#[derive(Clone, ArgEnum, Copy, Debug, serde::Deserialize)] -pub enum LogRotationPolicy { - Minutely, - Hourly, - Daily, - Never, -} - -impl std::str::FromStr for LogRotationPolicy { - type Err = String; - - fn from_str(s: &str) -> std::result::Result { - ArgEnum::from_str(s, true) - } -} - -impl parse_arg::ParseArgFromStr for LogRotationPolicy { - fn describe_type(mut writer: W) -> fmt::Result { - write!(writer, "The log rotation policy") - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/ballista/core/src/lib.rs b/ballista/core/src/lib.rs index 94f37841..8a4c9fbb 100644 --- a/ballista/core/src/lib.rs +++ b/ballista/core/src/lib.rs @@ -18,10 +18,6 @@ #![doc = include_str!("../README.md")] pub const BALLISTA_VERSION: &str = env!("CARGO_PKG_VERSION"); -pub fn print_version() { - println!("Ballista version: {BALLISTA_VERSION}") -} - pub mod client; pub mod config; pub mod error; diff --git a/ballista/executor/executor_config_spec.toml b/ballista/executor/executor_config_spec.toml index d2272de8..bb634c33 100644 --- a/ballista/executor/executor_config_spec.toml +++ b/ballista/executor/executor_config_spec.toml @@ -113,12 +113,6 @@ type = "String" doc = "special log level for sub mod. link: https://docs.rs/env_logger/latest/env_logger/#enabling-logging. For example we want whole level is INFO but datafusion mode is DEBUG" default = "std::string::String::from(\"INFO,datafusion=INFO\")" -[[param]] -name = "log_rotation_policy" -type = "ballista_core::config::LogRotationPolicy" -doc = "Tracing log rotation policy, possible values: minutely, hourly, daily, never. Default: daily" -default = "ballista_core::config::LogRotationPolicy::Daily" - [[param]] name = "grpc_server_max_decoding_message_size" type = "u32" diff --git a/ballista/executor/src/bin/main.rs b/ballista/executor/src/bin/main.rs index e205d16b..582f3c46 100644 --- a/ballista/executor/src/bin/main.rs +++ b/ballista/executor/src/bin/main.rs @@ -20,7 +20,6 @@ use anyhow::Result; use std::sync::Arc; -use ballista_core::print_version; use ballista_executor::executor_process::{start_executor_process, ExecutorProcessConfig}; use config::prelude::*; @@ -44,19 +43,6 @@ async fn main() -> Result<()> { let (opt, _remaining_args) = Config::including_optional_config_files(&["/etc/ballista/executor.toml"]).unwrap_or_exit(); - if opt.version { - print_version(); - std::process::exit(0); - } - - let log_file_name_prefix = format!( - "executor_{}_{}", - opt.external_host - .clone() - .unwrap_or_else(|| "localhost".to_string()), - opt.bind_port - ); - let config = ExecutorProcessConfig { special_mod_log_level: opt.log_level_setting, external_host: opt.external_host, @@ -68,16 +54,11 @@ async fn main() -> Result<()> { scheduler_connect_timeout_seconds: opt.scheduler_connect_timeout_seconds, concurrent_tasks: opt.concurrent_tasks, work_dir: opt.work_dir, - log_dir: opt.log_dir, - log_file_name_prefix, - log_rotation_policy: opt.log_rotation_policy, - print_thread_info: opt.print_thread_info, job_data_ttl_seconds: opt.job_data_ttl_seconds, job_data_clean_up_interval_seconds: opt.job_data_clean_up_interval_seconds, grpc_server_max_decoding_message_size: opt.grpc_server_max_decoding_message_size, grpc_server_max_encoding_message_size: opt.grpc_server_max_encoding_message_size, executor_heartbeat_interval_seconds: opt.executor_heartbeat_interval_seconds, - execution_engine: None, }; start_executor_process(Arc::new(config)).await diff --git a/ballista/executor/src/executor.rs b/ballista/executor/src/executor.rs index 4e606d6f..ffb66c47 100644 --- a/ballista/executor/src/executor.rs +++ b/ballista/executor/src/executor.rs @@ -102,7 +102,6 @@ impl Executor { runtime_with_data_cache: Option>, metrics_collector: Arc, concurrent_tasks: usize, - execution_engine: Option>, ) -> Self { Self { metadata, @@ -116,8 +115,7 @@ impl Executor { metrics_collector, concurrent_tasks, abort_handles: Default::default(), - execution_engine: execution_engine - .unwrap_or_else(|| Arc::new(DatafusionExecutionEngine {})), + execution_engine: Arc::new(DatafusionExecutionEngine {}), } } } @@ -333,7 +331,6 @@ mod test { None, Arc::new(LoggingMetricsCollector {}), 2, - None, ); let (sender, receiver) = tokio::sync::oneshot::channel(); diff --git a/ballista/executor/src/executor_process.rs b/ballista/executor/src/executor_process.rs index 16eac271..7a37ada7 100644 --- a/ballista/executor/src/executor_process.rs +++ b/ballista/executor/src/executor_process.rs @@ -40,7 +40,6 @@ use uuid::Uuid; use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode}; -use ballista_core::config::LogRotationPolicy; use ballista_core::error::BallistaError; use ballista_core::serde::protobuf::executor_resource::Resource; use ballista_core::serde::protobuf::executor_status::Status; @@ -53,7 +52,6 @@ use ballista_core::serde::BallistaCodec; use ballista_core::utils::{create_grpc_client_connection, create_grpc_server, get_time_before}; use ballista_core::BALLISTA_VERSION; -use crate::execution_engine::DatafusionExecutionEngine; use crate::executor::{Executor, TasksDrainedFuture}; use crate::executor_server; use crate::executor_server::TERMINATING; @@ -72,12 +70,8 @@ pub struct ExecutorProcessConfig { pub scheduler_port: u16, pub scheduler_connect_timeout_seconds: u16, pub concurrent_tasks: usize, - pub log_dir: Option, pub work_dir: Option, pub special_mod_log_level: String, - pub print_thread_info: bool, - pub log_file_name_prefix: String, - pub log_rotation_policy: LogRotationPolicy, pub job_data_ttl_seconds: u64, pub job_data_clean_up_interval_seconds: u64, /// The maximum size of a decoded message at the grpc server side. @@ -85,47 +79,19 @@ pub struct ExecutorProcessConfig { /// The maximum size of an encoded message at the grpc server side. pub grpc_server_max_encoding_message_size: u32, pub executor_heartbeat_interval_seconds: u64, - /// Optional execution engine to use to execute physical plans, will default to - /// DataFusion if none is provided. - pub execution_engine: Option>, } pub async fn start_executor_process(opt: Arc) -> Result<()> { let rust_log = env::var(EnvFilter::DEFAULT_ENV); let log_filter = EnvFilter::new(rust_log.unwrap_or(opt.special_mod_log_level.clone())); - // File layer - if let Some(log_dir) = opt.log_dir.clone() { - let log_file = match opt.log_rotation_policy { - LogRotationPolicy::Minutely => { - tracing_appender::rolling::minutely(log_dir, &opt.log_file_name_prefix) - } - LogRotationPolicy::Hourly => { - tracing_appender::rolling::hourly(log_dir, &opt.log_file_name_prefix) - } - LogRotationPolicy::Daily => { - tracing_appender::rolling::daily(log_dir, &opt.log_file_name_prefix) - } - LogRotationPolicy::Never => { - tracing_appender::rolling::never(log_dir, &opt.log_file_name_prefix) - } - }; - tracing_subscriber::fmt() - .with_ansi(false) - .with_thread_names(opt.print_thread_info) - .with_thread_ids(opt.print_thread_info) - .with_writer(log_file) - .with_env_filter(log_filter) - .init(); - } else { - // Console layer - tracing_subscriber::fmt() - .with_ansi(false) - .with_thread_names(opt.print_thread_info) - .with_thread_ids(opt.print_thread_info) - .with_writer(io::stdout) - .with_env_filter(log_filter) - .init(); - } + // Console layer + tracing_subscriber::fmt() + .with_ansi(false) + .with_thread_names(true) + .with_thread_ids(true) + .with_writer(io::stdout) + .with_env_filter(log_filter) + .init(); let addr = format!("{}:{}", opt.bind_host, opt.port); let addr = addr @@ -188,7 +154,6 @@ pub async fn start_executor_process(opt: Arc) -> Result<( None, metrics_collector, concurrent_tasks, - opt.execution_engine.clone(), )); let connect_timeout = opt.scheduler_connect_timeout_seconds as u64; diff --git a/ballista/scheduler/scheduler_config_spec.toml b/ballista/scheduler/scheduler_config_spec.toml index a1e7409c..42a8ffcb 100644 --- a/ballista/scheduler/scheduler_config_spec.toml +++ b/ballista/scheduler/scheduler_config_spec.toml @@ -134,12 +134,6 @@ type = "String" doc = "special log level for sub mod. link: https://docs.rs/env_logger/latest/env_logger/#enabling-logging. For example we want whole level is INFO but datafusion mode is DEBUG" default = "std::string::String::from(\"INFO,datafusion=INFO\")" -[[param]] -name = "log_rotation_policy" -type = "ballista_core::config::LogRotationPolicy" -doc = "Tracing log rotation policy, possible values: minutely, hourly, daily, never. Default: daily" -default = "ballista_core::config::LogRotationPolicy::Daily" - [[param]] name = "job_resubmit_interval_ms" type = "u64" diff --git a/ballista/scheduler/src/bin/main.rs b/ballista/scheduler/src/bin/main.rs index e67e474c..7e24e531 100644 --- a/ballista/scheduler/src/bin/main.rs +++ b/ballista/scheduler/src/bin/main.rs @@ -23,13 +23,9 @@ use std::{env, io}; use anyhow::Result; use crate::config::{Config, ResultExt}; -use ballista_core::config::LogRotationPolicy; -use ballista_core::print_version; use ballista_scheduler::cluster::BallistaCluster; use ballista_scheduler::cluster::ClusterStorage; -use ballista_scheduler::config::{ - ClusterStorageConfig, SchedulerConfig, TaskDistribution, TaskDistributionPolicy, -}; +use ballista_scheduler::config::{ClusterStorageConfig, SchedulerConfig, TaskDistributionPolicy}; use ballista_scheduler::scheduler_process::start_server; use tracing_subscriber::EnvFilter; @@ -52,55 +48,16 @@ async fn main() -> Result<()> { let (opt, _remaining_args) = Config::including_optional_config_files(&["/etc/ballista/scheduler.toml"]).unwrap_or_exit(); - if opt.version { - print_version(); - std::process::exit(0); - } - - let special_mod_log_level = opt.log_level_setting; - let log_dir = opt.log_dir; - let print_thread_info = opt.print_thread_info; - - let log_file_name_prefix = format!( - "scheduler_{}_{}_{}", - opt.namespace, opt.external_host, opt.bind_port - ); - let rust_log = env::var(EnvFilter::DEFAULT_ENV); - let log_filter = EnvFilter::new(rust_log.unwrap_or(special_mod_log_level)); - // File layer - if let Some(log_dir) = log_dir { - let log_file = match opt.log_rotation_policy { - LogRotationPolicy::Minutely => { - tracing_appender::rolling::minutely(log_dir, &log_file_name_prefix) - } - LogRotationPolicy::Hourly => { - tracing_appender::rolling::hourly(log_dir, &log_file_name_prefix) - } - LogRotationPolicy::Daily => { - tracing_appender::rolling::daily(log_dir, &log_file_name_prefix) - } - LogRotationPolicy::Never => { - tracing_appender::rolling::never(log_dir, &log_file_name_prefix) - } - }; - tracing_subscriber::fmt() - .with_ansi(false) - .with_thread_names(print_thread_info) - .with_thread_ids(print_thread_info) - .with_writer(log_file) - .with_env_filter(log_filter) - .init(); - } else { - // Console layer - tracing_subscriber::fmt() - .with_ansi(false) - .with_thread_names(print_thread_info) - .with_thread_ids(print_thread_info) - .with_writer(io::stdout) - .with_env_filter(log_filter) - .init(); - } + let log_filter = EnvFilter::new(rust_log.unwrap_or("INFO,datafusion=INFO".to_string())); + // Console layer + tracing_subscriber::fmt() + .with_ansi(false) + .with_thread_names(true) + .with_thread_ids(true) + .with_writer(io::stdout) + .with_env_filter(log_filter) + .init(); let addr = format!("{}:{}", opt.bind_host, opt.bind_port); let addr = addr.parse()?; @@ -122,17 +79,12 @@ async fn main() -> Result<()> { } }; - let task_distribution = match opt.task_distribution { - TaskDistribution::Bias => TaskDistributionPolicy::Bias, - TaskDistribution::RoundRobin => TaskDistributionPolicy::RoundRobin, - }; - let config = SchedulerConfig { namespace: opt.namespace, external_host: opt.external_host, bind_port: opt.bind_port, event_loop_buffer_size: opt.event_loop_buffer_size, - task_distribution, + task_distribution: TaskDistributionPolicy::Bias, finished_job_data_clean_up_interval_seconds: opt .finished_job_data_clean_up_interval_seconds, finished_job_state_clean_up_interval_seconds: opt