diff --git a/ballista/core/src/config.rs b/ballista/core/src/config.rs deleted file mode 100644 index b851bd5c..00000000 --- a/ballista/core/src/config.rs +++ /dev/null @@ -1,307 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -// - -//! Ballista configuration - -use std::collections::HashMap; -use std::result; - -use crate::error::{BallistaError, Result}; - -use datafusion::arrow::datatypes::DataType; - -pub const BALLISTA_DEFAULT_SHUFFLE_PARTITIONS: &str = "ballista.shuffle.partitions"; -pub const BALLISTA_HASH_JOIN_SINGLE_PARTITION_THRESHOLD: &str = - "ballista.optimizer.hash_join_single_partition_threshold"; -pub const BALLISTA_DEFAULT_BATCH_SIZE: &str = "ballista.batch.size"; -pub const BALLISTA_REPARTITION_JOINS: &str = "ballista.repartition.joins"; -pub const BALLISTA_REPARTITION_AGGREGATIONS: &str = "ballista.repartition.aggregations"; -pub const BALLISTA_REPARTITION_WINDOWS: &str = "ballista.repartition.windows"; -pub const BALLISTA_PARQUET_PRUNING: &str = "ballista.parquet.pruning"; -pub const BALLISTA_COLLECT_STATISTICS: &str = "ballista.collect_statistics"; - -pub const BALLISTA_WITH_INFORMATION_SCHEMA: &str = "ballista.with_information_schema"; - -pub type ParseResult = result::Result; - -/// Configuration option meta-data -#[derive(Debug, Clone)] -pub struct ConfigEntry { - name: String, - _description: String, - _data_type: DataType, - default_value: Option, -} - -impl ConfigEntry { - fn new( - name: String, - _description: String, - _data_type: DataType, - default_value: Option, - ) -> Self { - Self { - name, - _description, - _data_type, - default_value, - } - } -} - -/// Ballista configuration builder -pub struct BallistaConfigBuilder { - settings: HashMap, -} - -impl Default for BallistaConfigBuilder { - /// Create a new config builder - fn default() -> Self { - Self { - settings: HashMap::new(), - } - } -} - -impl BallistaConfigBuilder { - /// Create a new config with an additional setting - pub fn set(&self, k: &str, v: &str) -> Self { - let mut settings = self.settings.clone(); - settings.insert(k.to_owned(), v.to_owned()); - Self { settings } - } - - pub fn build(&self) -> Result { - BallistaConfig::with_settings(self.settings.clone()) - } -} - -/// Ballista configuration -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct BallistaConfig { - /// Settings stored in map for easy serde - settings: HashMap, -} - -impl BallistaConfig { - /// Create a default configuration - pub fn new() -> Result { - Self::with_settings(HashMap::new()) - } - - /// Create a configuration builder - pub fn builder() -> BallistaConfigBuilder { - BallistaConfigBuilder::default() - } - - /// Create a new configuration based on key-value pairs - pub fn with_settings(settings: HashMap) -> Result { - let supported_entries = BallistaConfig::valid_entries(); - for (name, entry) in &supported_entries { - if let Some(v) = settings.get(name) { - // validate that we can parse the user-supplied value - Self::parse_value(v.as_str(), entry._data_type.clone()).map_err(|e| BallistaError::General(format!("Failed to parse user-supplied value '{name}' for configuration setting '{v}': {e}")))?; - } else if let Some(v) = entry.default_value.clone() { - Self::parse_value(v.as_str(), entry._data_type.clone()).map_err(|e| BallistaError::General(format!("Failed to parse default value '{name}' for configuration setting '{v}': {e}")))?; - } else if entry.default_value.is_none() { - // optional config - } else { - return Err(BallistaError::General(format!( - "No value specified for mandatory configuration setting '{name}'" - ))); - } - } - - Ok(Self { settings }) - } - - pub fn parse_value(val: &str, data_type: DataType) -> ParseResult<()> { - match data_type { - DataType::UInt16 => { - val.to_string() - .parse::() - .map_err(|e| format!("{e:?}"))?; - } - DataType::UInt32 => { - val.to_string() - .parse::() - .map_err(|e| format!("{e:?}"))?; - } - DataType::UInt64 => { - val.to_string() - .parse::() - .map_err(|e| format!("{e:?}"))?; - } - DataType::Boolean => { - val.to_string() - .parse::() - .map_err(|e| format!("{e:?}"))?; - } - DataType::Utf8 => { - val.to_string(); - } - _ => { - return Err(format!("not support data type: {data_type}")); - } - } - - Ok(()) - } - - /// All available configuration options - pub fn valid_entries() -> HashMap { - let entries = vec![ - ConfigEntry::new(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS.to_string(), - "Sets the default number of partitions to create when repartitioning query stages".to_string(), - DataType::UInt16, Some("16".to_string())), - ConfigEntry::new(BALLISTA_DEFAULT_BATCH_SIZE.to_string(), - "Sets the default batch size".to_string(), - DataType::UInt16, Some("8192".to_string())), - ConfigEntry::new(BALLISTA_REPARTITION_JOINS.to_string(), - "Configuration for repartition joins".to_string(), - DataType::Boolean, Some("true".to_string())), - ConfigEntry::new(BALLISTA_REPARTITION_AGGREGATIONS.to_string(), - "Configuration for repartition aggregations".to_string(), - DataType::Boolean, Some("true".to_string())), - ConfigEntry::new(BALLISTA_REPARTITION_WINDOWS.to_string(), - "Configuration for repartition windows".to_string(), - DataType::Boolean, Some("true".to_string())), - ConfigEntry::new(BALLISTA_PARQUET_PRUNING.to_string(), - "Configuration for parquet prune".to_string(), - DataType::Boolean, Some("true".to_string())), - ConfigEntry::new(BALLISTA_WITH_INFORMATION_SCHEMA.to_string(), - "Sets whether enable information_schema".to_string(), - DataType::Boolean, Some("false".to_string())), - ConfigEntry::new(BALLISTA_HASH_JOIN_SINGLE_PARTITION_THRESHOLD.to_string(), - "Sets threshold in bytes for collecting the smaller side of the hash join in memory".to_string(), - DataType::UInt64, Some((1024 * 1024).to_string())), - ConfigEntry::new(BALLISTA_COLLECT_STATISTICS.to_string(), - "Configuration for collecting statistics during scan".to_string(), - DataType::Boolean, Some("false".to_string()) - ), - ]; - entries - .iter() - .map(|e| (e.name.clone(), e.clone())) - .collect::>() - } - - pub fn settings(&self) -> &HashMap { - &self.settings - } - - pub fn default_shuffle_partitions(&self) -> usize { - self.get_usize_setting(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS) - } - - pub fn default_batch_size(&self) -> usize { - self.get_usize_setting(BALLISTA_DEFAULT_BATCH_SIZE) - } - - pub fn hash_join_single_partition_threshold(&self) -> usize { - self.get_usize_setting(BALLISTA_HASH_JOIN_SINGLE_PARTITION_THRESHOLD) - } - - pub fn repartition_joins(&self) -> bool { - self.get_bool_setting(BALLISTA_REPARTITION_JOINS) - } - - pub fn repartition_aggregations(&self) -> bool { - self.get_bool_setting(BALLISTA_REPARTITION_AGGREGATIONS) - } - - pub fn repartition_windows(&self) -> bool { - self.get_bool_setting(BALLISTA_REPARTITION_WINDOWS) - } - - pub fn parquet_pruning(&self) -> bool { - self.get_bool_setting(BALLISTA_PARQUET_PRUNING) - } - - pub fn collect_statistics(&self) -> bool { - self.get_bool_setting(BALLISTA_COLLECT_STATISTICS) - } - - pub fn default_with_information_schema(&self) -> bool { - self.get_bool_setting(BALLISTA_WITH_INFORMATION_SCHEMA) - } - - fn get_usize_setting(&self, key: &str) -> usize { - if let Some(v) = self.settings.get(key) { - // infallible because we validate all configs in the constructor - v.parse().unwrap() - } else { - let entries = Self::valid_entries(); - // infallible because we validate all configs in the constructor - let v = entries.get(key).unwrap().default_value.as_ref().unwrap(); - v.parse().unwrap() - } - } - - fn get_bool_setting(&self, key: &str) -> bool { - if let Some(v) = self.settings.get(key) { - // infallible because we validate all configs in the constructor - v.parse::().unwrap() - } else { - let entries = Self::valid_entries(); - // infallible because we validate all configs in the constructor - let v = entries.get(key).unwrap().default_value.as_ref().unwrap(); - v.parse::().unwrap() - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn default_config() -> Result<()> { - let config = BallistaConfig::new()?; - assert_eq!(16, config.default_shuffle_partitions()); - assert!(!config.default_with_information_schema()); - Ok(()) - } - - #[test] - fn custom_config() -> Result<()> { - let config = BallistaConfig::builder() - .set(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "123") - .set(BALLISTA_WITH_INFORMATION_SCHEMA, "true") - .build()?; - assert_eq!(123, config.default_shuffle_partitions()); - assert!(config.default_with_information_schema()); - Ok(()) - } - - #[test] - fn custom_config_invalid() -> Result<()> { - let config = BallistaConfig::builder() - .set(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "true") - .build(); - assert!(config.is_err()); - assert_eq!("General(\"Failed to parse user-supplied value 'ballista.shuffle.partitions' for configuration setting 'true': ParseIntError { kind: InvalidDigit }\")", format!("{:?}", config.unwrap_err())); - - let config = BallistaConfig::builder() - .set(BALLISTA_WITH_INFORMATION_SCHEMA, "123") - .build(); - assert!(config.is_err()); - assert_eq!("General(\"Failed to parse user-supplied value 'ballista.with_information_schema' for configuration setting '123': ParseBoolError\")", format!("{:?}", config.unwrap_err())); - Ok(()) - } -} diff --git a/ballista/core/src/lib.rs b/ballista/core/src/lib.rs index 8a4c9fbb..543a990a 100644 --- a/ballista/core/src/lib.rs +++ b/ballista/core/src/lib.rs @@ -19,7 +19,6 @@ pub const BALLISTA_VERSION: &str = env!("CARGO_PKG_VERSION"); pub mod client; -pub mod config; pub mod error; pub mod event_loop; pub mod execution_plans; diff --git a/ballista/scheduler/src/cluster/kv.rs b/ballista/scheduler/src/cluster/kv.rs index c515636f..e26dd047 100644 --- a/ballista/scheduler/src/cluster/kv.rs +++ b/ballista/scheduler/src/cluster/kv.rs @@ -26,7 +26,6 @@ use crate::state::session_manager::create_datafusion_context; use crate::state::task_manager::JobInfoCache; use crate::state::{decode_into, decode_protobuf}; use async_trait::async_trait; -use ballista_core::config::BallistaConfig; use ballista_core::error::{BallistaError, Result}; use ballista_core::serde::protobuf::job_status::Status; use ballista_core::serde::protobuf::{ @@ -67,7 +66,7 @@ pub struct KeyValueState< scheduler: String, /// In-memory store of queued jobs. Map from Job ID -> queued_at timestamp queued_jobs: DashMap, - //// `SessionBuilder` for constructing `SessionContext` from stored `BallistaConfig` + /// `SessionBuilder` for constructing `SessionContext` from stored `BallistaConfig` session_builder: SessionBuilder, } @@ -534,19 +533,21 @@ impl let settings: protobuf::SessionSettings = decode_protobuf(&value)?; - let mut config_builder = BallistaConfig::builder(); - for kv_pair in &settings.configs { - config_builder = config_builder.set(&kv_pair.key, &kv_pair.value); + let mut config = HashMap::new(); + for kv_pair in settings.configs { + config.insert(kv_pair.key, kv_pair.value); } - let config = config_builder.build()?; Ok(create_datafusion_context(&config, self.session_builder)) } - async fn create_session(&self, config: &BallistaConfig) -> Result> { + async fn create_session( + &self, + config: &HashMap, + ) -> Result> { let mut settings: Vec = vec![]; - for (key, value) in config.settings() { + for (key, value) in config { settings.push(KeyValuePair { key: key.clone(), value: value.clone(), diff --git a/ballista/scheduler/src/cluster/mod.rs b/ballista/scheduler/src/cluster/mod.rs index 02b9c237..2b71b7c4 100644 --- a/ballista/scheduler/src/cluster/mod.rs +++ b/ballista/scheduler/src/cluster/mod.rs @@ -25,7 +25,6 @@ use datafusion_proto::physical_plan::AsExecutionPlan; use futures::Stream; use log::{debug, info, warn}; -use ballista_core::config::BallistaConfig; use ballista_core::error::{BallistaError, Result}; use ballista_core::serde::protobuf::{ job_status, AvailableTaskSlots, ExecutorHeartbeat, JobStatus, @@ -230,7 +229,8 @@ pub trait JobState: Send + Sync { async fn get_session(&self, session_id: &str) -> Result>; /// Create a new saved session - async fn create_session(&self, config: &BallistaConfig) -> Result>; + async fn create_session(&self, config: &HashMap) + -> Result>; } pub(crate) async fn bind_task_bias( diff --git a/ballista/scheduler/src/flight_sql.rs b/ballista/scheduler/src/flight_sql.rs index 0f642b0c..ff6b59cb 100644 --- a/ballista/scheduler/src/flight_sql.rs +++ b/ballista/scheduler/src/flight_sql.rs @@ -30,6 +30,7 @@ use arrow_flight::{ use base64::Engine; use futures::Stream; use log::{debug, error, warn}; +use std::collections::HashMap; use std::convert::TryFrom; use std::pin::Pin; use std::str::FromStr; @@ -43,7 +44,6 @@ use arrow_flight::flight_service_client::FlightServiceClient; use arrow_flight::sql::ProstMessageExt; use arrow_flight::utils::batches_to_flight_data; use arrow_flight::SchemaAsIpc; -use ballista_core::config::BallistaConfig; use ballista_core::serde::protobuf; use ballista_core::serde::protobuf::action::ActionType::FetchPartition; use ballista_core::serde::protobuf::job_status; @@ -124,15 +124,11 @@ impl FlightSqlServiceImpl { } async fn create_ctx(&self) -> Result { - let config_builder = BallistaConfig::builder(); - let config = config_builder - .build() - .map_err(|e| Status::internal(format!("Error building config: {e}")))?; let ctx = self .server .state .session_manager - .create_session(&config) + .create_session(&HashMap::new()) .await .map_err(|e| Status::internal(format!("Failed to create SessionContext: {e:?}")))?; let handle = Uuid::new_v4(); diff --git a/ballista/scheduler/src/state/session_manager.rs b/ballista/scheduler/src/state/session_manager.rs index ab0a5ff0..6a7eb07e 100644 --- a/ballista/scheduler/src/state/session_manager.rs +++ b/ballista/scheduler/src/state/session_manager.rs @@ -16,9 +16,9 @@ // under the License. use crate::scheduler_server::SessionBuilder; -use ballista_core::config::BallistaConfig; use ballista_core::error::Result; use datafusion::prelude::{SessionConfig, SessionContext}; +use std::collections::HashMap; use crate::cluster::JobState; use std::sync::Arc; @@ -33,7 +33,10 @@ impl SessionManager { Self { state } } - pub async fn create_session(&self, config: &BallistaConfig) -> Result> { + pub async fn create_session( + &self, + config: &HashMap, + ) -> Result> { self.state.create_session(config).await } @@ -44,23 +47,11 @@ impl SessionManager { /// Create a DataFusion session context that is compatible with Ballista Configuration pub fn create_datafusion_context( - ballista_config: &BallistaConfig, + ballista_config: &HashMap, session_builder: SessionBuilder, ) -> Arc { - let config = SessionConfig::from_string_hash_map(ballista_config.settings().clone()).unwrap(); - let config = config - .with_target_partitions(ballista_config.default_shuffle_partitions()) - .with_batch_size(ballista_config.default_batch_size()) - .with_repartition_joins(ballista_config.repartition_joins()) - .with_repartition_aggregations(ballista_config.repartition_aggregations()) - .with_repartition_windows(ballista_config.repartition_windows()) - .with_collect_statistics(ballista_config.collect_statistics()) - .with_parquet_pruning(ballista_config.parquet_pruning()) - .set_usize( - "datafusion.optimizer.hash_join_single_partition_threshold", - ballista_config.hash_join_single_partition_threshold(), - ) - .set_bool("datafusion.optimizer.enable_round_robin_repartition", false); + let config = SessionConfig::from_string_hash_map(ballista_config.clone()).unwrap(); + let config = config.set_bool("datafusion.optimizer.enable_round_robin_repartition", false); let session_state = session_builder(config); Arc::new(SessionContext::new_with_state(session_state)) } diff --git a/ballista/scheduler/src/test_utils.rs b/ballista/scheduler/src/test_utils.rs index 842fe88c..bba586bc 100644 --- a/ballista/scheduler/src/test_utils.rs +++ b/ballista/scheduler/src/test_utils.rs @@ -30,7 +30,6 @@ use crate::scheduler_server::{timestamp_millis, SchedulerServer}; use crate::state::executor_manager::ExecutorManager; use crate::state::task_manager::TaskLauncher; -use ballista_core::config::{BallistaConfig, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS}; use ballista_core::serde::protobuf::job_status::Status; use ballista_core::serde::protobuf::{ task_status, FailedTask, JobStatus, MultiTaskDefinition, ShuffleWritePartition, SuccessfulTask, @@ -368,7 +367,7 @@ impl TaskLauncher for VirtualTaskLauncher { pub struct SchedulerTest { scheduler: SchedulerServer, - ballista_config: BallistaConfig, + ballista_config: HashMap, status_receiver: Option)>>, } @@ -382,14 +381,14 @@ impl SchedulerTest { let cluster = BallistaCluster::new_from_config(&config).await?; let ballista_config = if num_executors > 0 && task_slots_per_executor > 0 { - BallistaConfig::builder() - .set( - BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, - format!("{}", num_executors * task_slots_per_executor).as_str(), - ) - .build()? + let mut config = HashMap::new(); + config.insert( + "datafusion.execution.target_partitions".to_string(), + format!("{}", num_executors * task_slots_per_executor), + ); + config } else { - BallistaConfig::builder().build()? + HashMap::new() }; let runner = runner.unwrap_or_else(|| Arc::new(default_task_runner()));