
Commit

delete unnecessary config
lewiszlw committed Jan 22, 2024
1 parent c10de07 commit 5e9c4a4
Showing 8 changed files with 20 additions and 167 deletions.
26 changes: 0 additions & 26 deletions ballista/core/src/config.rs
@@ -18,8 +18,6 @@

//! Ballista configuration
use clap::ArgEnum;
use core::fmt;
use std::collections::HashMap;
use std::result;

@@ -294,30 +292,6 @@ impl BallistaConfig {
}
}

// an enum used to configure the log rolling policy
// needs to be visible to code generated by configure_me
#[derive(Clone, ArgEnum, Copy, Debug, serde::Deserialize)]
pub enum LogRotationPolicy {
Minutely,
Hourly,
Daily,
Never,
}

impl std::str::FromStr for LogRotationPolicy {
type Err = String;

fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
ArgEnum::from_str(s, true)
}
}

impl parse_arg::ParseArgFromStr for LogRotationPolicy {
fn describe_type<W: fmt::Write>(mut writer: W) -> fmt::Result {
write!(writer, "The log rotation policy")
}
}

#[cfg(test)]
mod tests {
use super::*;
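
Note: the removed LogRotationPolicy values were parsed case-insensitively via clap's ArgEnum::from_str(s, true). The sketch below is a rough, dependency-free equivalent of that behavior, not part of this commit; the hand-rolled match and the main function are illustrative only.

// Illustrative sketch: a standalone equivalent of the deleted case-insensitive
// FromStr impl that previously delegated to clap's ArgEnum.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum LogRotationPolicy {
    Minutely,
    Hourly,
    Daily,
    Never,
}

impl std::str::FromStr for LogRotationPolicy {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_ascii_lowercase().as_str() {
            "minutely" => Ok(Self::Minutely),
            "hourly" => Ok(Self::Hourly),
            "daily" => Ok(Self::Daily),
            "never" => Ok(Self::Never),
            other => Err(format!("invalid log rotation policy: {other}")),
        }
    }
}

fn main() {
    // "Daily" and "daily" both parsed before this change.
    assert_eq!("Daily".parse::<LogRotationPolicy>(), Ok(LogRotationPolicy::Daily));
}
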
4 changes: 0 additions & 4 deletions ballista/core/src/lib.rs
@@ -18,10 +18,6 @@
#![doc = include_str!("../README.md")]
pub const BALLISTA_VERSION: &str = env!("CARGO_PKG_VERSION");

pub fn print_version() {
println!("Ballista version: {BALLISTA_VERSION}")
}

pub mod client;
pub mod config;
pub mod error;
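
With print_version() gone, the BALLISTA_VERSION constant still carries the crate version. A hypothetical caller (sketch only, assuming a binary that depends on ballista_core) can reproduce the old output directly:

// Hypothetical sketch: reproducing the removed print_version() output from
// the BALLISTA_VERSION constant that ballista_core still exports.
fn main() {
    println!("Ballista version: {}", ballista_core::BALLISTA_VERSION);
}
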
6 changes: 0 additions & 6 deletions ballista/executor/executor_config_spec.toml
@@ -113,12 +113,6 @@ type = "String"
doc = "special log level for sub mod. link: https://docs.rs/env_logger/latest/env_logger/#enabling-logging. For example we want whole level is INFO but datafusion mode is DEBUG"
default = "std::string::String::from(\"INFO,datafusion=INFO\")"

[[param]]
name = "log_rotation_policy"
type = "ballista_core::config::LogRotationPolicy"
doc = "Tracing log rotation policy, possible values: minutely, hourly, daily, never. Default: daily"
default = "ballista_core::config::LogRotationPolicy::Daily"

[[param]]
name = "grpc_server_max_decoding_message_size"
type = "u32"
19 changes: 0 additions & 19 deletions ballista/executor/src/bin/main.rs
@@ -20,7 +20,6 @@
use anyhow::Result;
use std::sync::Arc;

use ballista_core::print_version;
use ballista_executor::executor_process::{start_executor_process, ExecutorProcessConfig};
use config::prelude::*;

@@ -44,19 +43,6 @@ async fn main() -> Result<()> {
let (opt, _remaining_args) =
Config::including_optional_config_files(&["/etc/ballista/executor.toml"]).unwrap_or_exit();

if opt.version {
print_version();
std::process::exit(0);
}

let log_file_name_prefix = format!(
"executor_{}_{}",
opt.external_host
.clone()
.unwrap_or_else(|| "localhost".to_string()),
opt.bind_port
);

let config = ExecutorProcessConfig {
special_mod_log_level: opt.log_level_setting,
external_host: opt.external_host,
@@ -68,16 +54,11 @@ async fn main() -> Result<()> {
scheduler_connect_timeout_seconds: opt.scheduler_connect_timeout_seconds,
concurrent_tasks: opt.concurrent_tasks,
work_dir: opt.work_dir,
log_dir: opt.log_dir,
log_file_name_prefix,
log_rotation_policy: opt.log_rotation_policy,
print_thread_info: opt.print_thread_info,
job_data_ttl_seconds: opt.job_data_ttl_seconds,
job_data_clean_up_interval_seconds: opt.job_data_clean_up_interval_seconds,
grpc_server_max_decoding_message_size: opt.grpc_server_max_decoding_message_size,
grpc_server_max_encoding_message_size: opt.grpc_server_max_encoding_message_size,
executor_heartbeat_interval_seconds: opt.executor_heartbeat_interval_seconds,
execution_engine: None,
};

start_executor_process(Arc::new(config)).await
5 changes: 1 addition & 4 deletions ballista/executor/src/executor.rs
@@ -102,7 +102,6 @@ impl Executor {
runtime_with_data_cache: Option<Arc<RuntimeEnv>>,
metrics_collector: Arc<dyn ExecutorMetricsCollector>,
concurrent_tasks: usize,
execution_engine: Option<Arc<DatafusionExecutionEngine>>,
) -> Self {
Self {
metadata,
@@ -116,8 +115,7 @@ impl Executor {
metrics_collector,
concurrent_tasks,
abort_handles: Default::default(),
execution_engine: execution_engine
.unwrap_or_else(|| Arc::new(DatafusionExecutionEngine {})),
execution_engine: Arc::new(DatafusionExecutionEngine {}),
}
}
}
@@ -333,7 +331,6 @@ mod test {
None,
Arc::new(LoggingMetricsCollector {}),
2,
None,
);

let (sender, receiver) = tokio::sync::oneshot::channel();
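
The change to Executor::new drops the optional engine parameter and constructs the DataFusion engine unconditionally. A generic sketch of the pattern (names are illustrative, not the real Executor API, which takes several more arguments):

// Illustrative sketch of the simplification: an Option<Arc<T>> parameter with
// an unwrap_or_else default is replaced by constructing the default directly.
use std::sync::Arc;

struct DatafusionExecutionEngine;

struct Executor {
    execution_engine: Arc<DatafusionExecutionEngine>,
}

impl Executor {
    // Before: fn new(engine: Option<Arc<DatafusionExecutionEngine>>) -> Self
    // After: no parameter; the DataFusion engine is always used.
    fn new() -> Self {
        Self {
            execution_engine: Arc::new(DatafusionExecutionEngine),
        }
    }
}

fn main() {
    let _executor = Executor::new();
}
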
51 changes: 8 additions & 43 deletions ballista/executor/src/executor_process.rs
@@ -40,7 +40,6 @@ use uuid::Uuid;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode};

use ballista_core::config::LogRotationPolicy;
use ballista_core::error::BallistaError;
use ballista_core::serde::protobuf::executor_resource::Resource;
use ballista_core::serde::protobuf::executor_status::Status;
@@ -53,7 +52,6 @@ use ballista_core::serde::BallistaCodec;
use ballista_core::utils::{create_grpc_client_connection, create_grpc_server, get_time_before};
use ballista_core::BALLISTA_VERSION;

use crate::execution_engine::DatafusionExecutionEngine;
use crate::executor::{Executor, TasksDrainedFuture};
use crate::executor_server;
use crate::executor_server::TERMINATING;
@@ -72,60 +70,28 @@ pub struct ExecutorProcessConfig {
pub scheduler_port: u16,
pub scheduler_connect_timeout_seconds: u16,
pub concurrent_tasks: usize,
pub log_dir: Option<String>,
pub work_dir: Option<String>,
pub special_mod_log_level: String,
pub print_thread_info: bool,
pub log_file_name_prefix: String,
pub log_rotation_policy: LogRotationPolicy,
pub job_data_ttl_seconds: u64,
pub job_data_clean_up_interval_seconds: u64,
/// The maximum size of a decoded message at the grpc server side.
pub grpc_server_max_decoding_message_size: u32,
/// The maximum size of an encoded message at the grpc server side.
pub grpc_server_max_encoding_message_size: u32,
pub executor_heartbeat_interval_seconds: u64,
/// Optional execution engine to use to execute physical plans, will default to
/// DataFusion if none is provided.
pub execution_engine: Option<Arc<DatafusionExecutionEngine>>,
}

pub async fn start_executor_process(opt: Arc<ExecutorProcessConfig>) -> Result<()> {
let rust_log = env::var(EnvFilter::DEFAULT_ENV);
let log_filter = EnvFilter::new(rust_log.unwrap_or(opt.special_mod_log_level.clone()));
// File layer
if let Some(log_dir) = opt.log_dir.clone() {
let log_file = match opt.log_rotation_policy {
LogRotationPolicy::Minutely => {
tracing_appender::rolling::minutely(log_dir, &opt.log_file_name_prefix)
}
LogRotationPolicy::Hourly => {
tracing_appender::rolling::hourly(log_dir, &opt.log_file_name_prefix)
}
LogRotationPolicy::Daily => {
tracing_appender::rolling::daily(log_dir, &opt.log_file_name_prefix)
}
LogRotationPolicy::Never => {
tracing_appender::rolling::never(log_dir, &opt.log_file_name_prefix)
}
};
tracing_subscriber::fmt()
.with_ansi(false)
.with_thread_names(opt.print_thread_info)
.with_thread_ids(opt.print_thread_info)
.with_writer(log_file)
.with_env_filter(log_filter)
.init();
} else {
// Console layer
tracing_subscriber::fmt()
.with_ansi(false)
.with_thread_names(opt.print_thread_info)
.with_thread_ids(opt.print_thread_info)
.with_writer(io::stdout)
.with_env_filter(log_filter)
.init();
}
// Console layer
tracing_subscriber::fmt()
.with_ansi(false)
.with_thread_names(true)
.with_thread_ids(true)
.with_writer(io::stdout)
.with_env_filter(log_filter)
.init();

let addr = format!("{}:{}", opt.bind_host, opt.port);
let addr = addr
@@ -188,7 +154,6 @@ pub async fn start_executor_process(opt: Arc<ExecutorProcessConfig>) -> Result<(
None,
metrics_collector,
concurrent_tasks,
opt.execution_engine.clone(),
));

let connect_timeout = opt.scheduler_connect_timeout_seconds as u64;
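
After this change the executor always logs to stdout; the rolling-file path driven by log_dir, log_file_name_prefix, and log_rotation_policy is gone. The sketch below shows how a deployment could restore daily-rotated file logs in its own wrapper, reusing the same tracing_appender / tracing_subscriber calls the deleted code used; the function name and the paths in main are assumptions, not part of Ballista.

// Sketch only: re-creating the removed rolling-file logging outside the
// executor, using the same crates the deleted code relied on.
use tracing_subscriber::EnvFilter;

fn init_file_logging(log_dir: &str, prefix: &str) {
    // Daily rotation, matching the old default; swap in hourly/minutely/never as needed.
    let file_appender = tracing_appender::rolling::daily(log_dir, prefix);
    tracing_subscriber::fmt()
        .with_ansi(false)
        .with_writer(file_appender)
        .with_env_filter(EnvFilter::from_default_env())
        .init();
}

fn main() {
    // Illustrative values; a real deployment would take these from its own config.
    init_file_logging("/tmp/ballista-logs", "executor");
}
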
6 changes: 0 additions & 6 deletions ballista/scheduler/scheduler_config_spec.toml
@@ -134,12 +134,6 @@ type = "String"
doc = "special log level for sub mod. link: https://docs.rs/env_logger/latest/env_logger/#enabling-logging. For example we want whole level is INFO but datafusion mode is DEBUG"
default = "std::string::String::from(\"INFO,datafusion=INFO\")"

[[param]]
name = "log_rotation_policy"
type = "ballista_core::config::LogRotationPolicy"
doc = "Tracing log rotation policy, possible values: minutely, hourly, daily, never. Default: daily"
default = "ballista_core::config::LogRotationPolicy::Daily"

[[param]]
name = "job_resubmit_interval_ms"
type = "u64"
70 changes: 11 additions & 59 deletions ballista/scheduler/src/bin/main.rs
@@ -23,13 +23,9 @@ use std::{env, io};
use anyhow::Result;

use crate::config::{Config, ResultExt};
use ballista_core::config::LogRotationPolicy;
use ballista_core::print_version;
use ballista_scheduler::cluster::BallistaCluster;
use ballista_scheduler::cluster::ClusterStorage;
use ballista_scheduler::config::{
ClusterStorageConfig, SchedulerConfig, TaskDistribution, TaskDistributionPolicy,
};
use ballista_scheduler::config::{ClusterStorageConfig, SchedulerConfig, TaskDistributionPolicy};
use ballista_scheduler::scheduler_process::start_server;
use tracing_subscriber::EnvFilter;

@@ -52,55 +48,16 @@ async fn main() -> Result<()> {
let (opt, _remaining_args) =
Config::including_optional_config_files(&["/etc/ballista/scheduler.toml"]).unwrap_or_exit();

if opt.version {
print_version();
std::process::exit(0);
}

let special_mod_log_level = opt.log_level_setting;
let log_dir = opt.log_dir;
let print_thread_info = opt.print_thread_info;

let log_file_name_prefix = format!(
"scheduler_{}_{}_{}",
opt.namespace, opt.external_host, opt.bind_port
);

let rust_log = env::var(EnvFilter::DEFAULT_ENV);
let log_filter = EnvFilter::new(rust_log.unwrap_or(special_mod_log_level));
// File layer
if let Some(log_dir) = log_dir {
let log_file = match opt.log_rotation_policy {
LogRotationPolicy::Minutely => {
tracing_appender::rolling::minutely(log_dir, &log_file_name_prefix)
}
LogRotationPolicy::Hourly => {
tracing_appender::rolling::hourly(log_dir, &log_file_name_prefix)
}
LogRotationPolicy::Daily => {
tracing_appender::rolling::daily(log_dir, &log_file_name_prefix)
}
LogRotationPolicy::Never => {
tracing_appender::rolling::never(log_dir, &log_file_name_prefix)
}
};
tracing_subscriber::fmt()
.with_ansi(false)
.with_thread_names(print_thread_info)
.with_thread_ids(print_thread_info)
.with_writer(log_file)
.with_env_filter(log_filter)
.init();
} else {
// Console layer
tracing_subscriber::fmt()
.with_ansi(false)
.with_thread_names(print_thread_info)
.with_thread_ids(print_thread_info)
.with_writer(io::stdout)
.with_env_filter(log_filter)
.init();
}
let log_filter = EnvFilter::new(rust_log.unwrap_or("INFO,datafusion=INFO".to_string()));
// Console layer
tracing_subscriber::fmt()
.with_ansi(false)
.with_thread_names(true)
.with_thread_ids(true)
.with_writer(io::stdout)
.with_env_filter(log_filter)
.init();

let addr = format!("{}:{}", opt.bind_host, opt.bind_port);
let addr = addr.parse()?;
Expand All @@ -122,17 +79,12 @@ async fn main() -> Result<()> {
}
};

let task_distribution = match opt.task_distribution {
TaskDistribution::Bias => TaskDistributionPolicy::Bias,
TaskDistribution::RoundRobin => TaskDistributionPolicy::RoundRobin,
};

let config = SchedulerConfig {
namespace: opt.namespace,
external_host: opt.external_host,
bind_port: opt.bind_port,
event_loop_buffer_size: opt.event_loop_buffer_size,
task_distribution,
task_distribution: TaskDistributionPolicy::Bias,
finished_job_data_clean_up_interval_seconds: opt
.finished_job_data_clean_up_interval_seconds,
finished_job_state_clean_up_interval_seconds: opt
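
The scheduler now builds its log filter from RUST_LOG with a hard-coded "INFO,datafusion=INFO" fallback, and always uses TaskDistributionPolicy::Bias. A minimal sketch of the filter fallback in isolation (the function name is illustrative):

// Sketch of the new fallback behavior: RUST_LOG wins when set, otherwise
// the hard-coded "INFO,datafusion=INFO" default applies.
use std::env;
use tracing_subscriber::EnvFilter;

fn build_filter() -> EnvFilter {
    let rust_log = env::var(EnvFilter::DEFAULT_ENV);
    EnvFilter::new(rust_log.unwrap_or_else(|_| "INFO,datafusion=INFO".to_string()))
}

fn main() {
    // With RUST_LOG unset, this prints the default directive string.
    println!("{}", build_filter());
}
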
