From c10a0df8a7e60301bff343468d5c7bdbcb18c37f Mon Sep 17 00:00:00 2001 From: Linwei Zhang Date: Wed, 10 Jan 2024 10:39:48 +0800 Subject: [PATCH] remove TaskSchedulingPolicy --- ballista/core/src/config.rs | 21 ------------ ballista/executor/executor_config_spec.toml | 7 ---- ballista/executor/src/bin/main.rs | 1 - ballista/executor/src/executor_process.rs | 32 ++++++++----------- ballista/scheduler/scheduler_config_spec.toml | 7 ---- ballista/scheduler/src/bin/main.rs | 1 - ballista/scheduler/src/config.rs | 12 ++----- ballista/scheduler/src/scheduler_process.rs | 5 +-- .../scheduler/src/scheduler_server/mod.rs | 10 ++---- .../scheduler_server/query_stage_scheduler.rs | 4 +-- 10 files changed, 20 insertions(+), 80 deletions(-) diff --git a/ballista/core/src/config.rs b/ballista/core/src/config.rs index 4be9ba2f2..39a472cd8 100644 --- a/ballista/core/src/config.rs +++ b/ballista/core/src/config.rs @@ -294,27 +294,6 @@ impl BallistaConfig { } } -// an enum used to configure the scheduler policy -// needs to be visible to code generated by configure_me -#[derive(Clone, ArgEnum, Copy, Debug, serde::Deserialize)] -pub enum TaskSchedulingPolicy { - PushStaged, -} - -impl std::str::FromStr for TaskSchedulingPolicy { - type Err = String; - - fn from_str(s: &str) -> std::result::Result { - ArgEnum::from_str(s, true) - } -} - -impl parse_arg::ParseArgFromStr for TaskSchedulingPolicy { - fn describe_type(mut writer: W) -> fmt::Result { - write!(writer, "The scheduler policy for the scheduler") - } -} - // an enum used to configure the log rolling policy // needs to be visible to code generated by configure_me #[derive(Clone, ArgEnum, Copy, Debug, serde::Deserialize)] diff --git a/ballista/executor/executor_config_spec.toml b/ballista/executor/executor_config_spec.toml index 2e03c27b1..d2272de80 100644 --- a/ballista/executor/executor_config_spec.toml +++ b/ballista/executor/executor_config_spec.toml @@ -78,13 +78,6 @@ type = "usize" default = "0" # defaults to all available cores if left as zero doc = "Max concurrent tasks." -[[param]] -abbr = "s" -name = "task_scheduling_policy" -type = "ballista_core::config::TaskSchedulingPolicy" -doc = "The task scheduing policy for the scheduler, possible values: pull-staged, push-staged. Default: pull-staged" -default = "ballista_core::config::TaskSchedulingPolicy::PushStaged" - [[param]] name = "job_data_clean_up_interval_seconds" type = "u64" diff --git a/ballista/executor/src/bin/main.rs b/ballista/executor/src/bin/main.rs index 203481480..5dee0c0f0 100644 --- a/ballista/executor/src/bin/main.rs +++ b/ballista/executor/src/bin/main.rs @@ -70,7 +70,6 @@ async fn main() -> Result<()> { scheduler_port: opt.scheduler_port, scheduler_connect_timeout_seconds: opt.scheduler_connect_timeout_seconds, concurrent_tasks: opt.concurrent_tasks, - task_scheduling_policy: opt.task_scheduling_policy, work_dir: opt.work_dir, log_dir: opt.log_dir, log_file_name_prefix, diff --git a/ballista/executor/src/executor_process.rs b/ballista/executor/src/executor_process.rs index e6ed1c09e..c481d4018 100644 --- a/ballista/executor/src/executor_process.rs +++ b/ballista/executor/src/executor_process.rs @@ -40,7 +40,7 @@ use uuid::Uuid; use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode}; -use ballista_core::config::{LogRotationPolicy, TaskSchedulingPolicy}; +use ballista_core::config::LogRotationPolicy; use ballista_core::error::BallistaError; use ballista_core::serde::protobuf::executor_resource::Resource; use ballista_core::serde::protobuf::executor_status::Status; @@ -74,7 +74,6 @@ pub struct ExecutorProcessConfig { pub scheduler_port: u16, pub scheduler_connect_timeout_seconds: u16, pub concurrent_tasks: usize, - pub task_scheduling_policy: TaskSchedulingPolicy, pub log_dir: Option, pub work_dir: Option, pub special_mod_log_level: String, @@ -242,7 +241,6 @@ pub async fn start_executor_process(opt: Arc) -> Result<( let default_codec: BallistaCodec = BallistaCodec::default(); - let scheduler_policy = opt.task_scheduling_policy; let job_data_ttl_seconds = opt.job_data_ttl_seconds; // Graceful shutdown notification @@ -284,22 +282,18 @@ pub async fn start_executor_process(opt: Arc) -> Result<( // Channels used to receive stop requests from Executor grpc service. let (stop_send, mut stop_recv) = mpsc::channel::(10); - match scheduler_policy { - TaskSchedulingPolicy::PushStaged => { - service_handlers.push( - //If there is executor registration error during startup, return the error and stop early. - executor_server::startup( - scheduler.clone(), - opt.clone(), - executor.clone(), - default_codec, - stop_send, - &shutdown_noti, - ) - .await?, - ); - } - }; + service_handlers.push( + //If there is executor registration error during startup, return the error and stop early. + executor_server::startup( + scheduler.clone(), + opt.clone(), + executor.clone(), + default_codec, + stop_send, + &shutdown_noti, + ) + .await?, + ); service_handlers.push(tokio::spawn(flight_server_run( addr, shutdown_noti.subscribe_for_shutdown(), diff --git a/ballista/scheduler/scheduler_config_spec.toml b/ballista/scheduler/scheduler_config_spec.toml index 4833d4071..a1e7409cf 100644 --- a/ballista/scheduler/scheduler_config_spec.toml +++ b/ballista/scheduler/scheduler_config_spec.toml @@ -69,13 +69,6 @@ type = "u16" default = "50050" doc = "bind port. Default: 50050" -[[param]] -abbr = "s" -name = "scheduler_policy" -type = "ballista_core::config::TaskSchedulingPolicy" -doc = "The scheduing policy for the scheduler, possible values: pull-staged, push-staged. Default: pull-staged" -default = "ballista_core::config::TaskSchedulingPolicy::PushStaged" - [[param]] name = "event_loop_buffer_size" type = "u32" diff --git a/ballista/scheduler/src/bin/main.rs b/ballista/scheduler/src/bin/main.rs index fce81fce0..97202c388 100644 --- a/ballista/scheduler/src/bin/main.rs +++ b/ballista/scheduler/src/bin/main.rs @@ -132,7 +132,6 @@ async fn main() -> Result<()> { namespace: opt.namespace, external_host: opt.external_host, bind_port: opt.bind_port, - scheduling_policy: opt.scheduler_policy, event_loop_buffer_size: opt.event_loop_buffer_size, task_distribution, finished_job_data_clean_up_interval_seconds: opt diff --git a/ballista/scheduler/src/config.rs b/ballista/scheduler/src/config.rs index e3010c6de..5e579760b 100644 --- a/ballista/scheduler/src/config.rs +++ b/ballista/scheduler/src/config.rs @@ -18,7 +18,6 @@ //! Ballista scheduler specific configuration -use ballista_core::config::TaskSchedulingPolicy; use clap::ArgEnum; use std::fmt; @@ -32,8 +31,6 @@ pub struct SchedulerConfig { pub external_host: String, /// The bind port for the scheduler's gRPC service pub bind_port: u16, - /// The task scheduling policy for the scheduler - pub scheduling_policy: TaskSchedulingPolicy, /// The event loop buffer size. for a system of high throughput, a larger value like 1000000 is recommended pub event_loop_buffer_size: u32, /// Policy of distributing tasks to available executor slots. For a cluster with single scheduler, round-robin is recommended @@ -70,7 +67,6 @@ impl Default for SchedulerConfig { namespace: String::default(), external_host: "localhost".to_string(), bind_port: 50050, - scheduling_policy: TaskSchedulingPolicy::PushStaged, event_loop_buffer_size: 10000, task_distribution: TaskDistributionPolicy::Bias, finished_job_data_clean_up_interval_seconds: 300, @@ -94,7 +90,8 @@ impl SchedulerConfig { } pub fn is_push_staged_scheduling(&self) -> bool { - matches!(self.scheduling_policy, TaskSchedulingPolicy::PushStaged) + // TODO lwz + true } pub fn with_namespace(mut self, namespace: impl Into) -> Self { @@ -112,11 +109,6 @@ impl SchedulerConfig { self } - pub fn with_scheduler_policy(mut self, policy: TaskSchedulingPolicy) -> Self { - self.scheduling_policy = policy; - self - } - pub fn with_event_loop_buffer_size(mut self, buffer_size: u32) -> Self { self.event_loop_buffer_size = buffer_size; self diff --git a/ballista/scheduler/src/scheduler_process.rs b/ballista/scheduler/src/scheduler_process.rs index 93570aa27..b9406ee07 100644 --- a/ballista/scheduler/src/scheduler_process.rs +++ b/ballista/scheduler/src/scheduler_process.rs @@ -50,10 +50,7 @@ pub async fn start_server( BALLISTA_VERSION, addr ); // Should only call SchedulerServer::new() once in the process - info!( - "Starting Scheduler grpc server with task scheduling policy of {:?}", - config.scheduling_policy - ); + info!("Starting Scheduler grpc server with push task scheduling policy",); let metrics_collector = default_metrics_collector()?; diff --git a/ballista/scheduler/src/scheduler_server/mod.rs b/ballista/scheduler/src/scheduler_server/mod.rs index b7b0ce1af..d7f7b0f9f 100644 --- a/ballista/scheduler/src/scheduler_server/mod.rs +++ b/ballista/scheduler/src/scheduler_server/mod.rs @@ -339,7 +339,6 @@ mod test { use datafusion::test_util::scan_empty; - use ballista_core::config::TaskSchedulingPolicy; use ballista_core::error::Result; use crate::config::SchedulerConfig; @@ -364,8 +363,7 @@ mod test { let metrics_collector = Arc::new(TestMetricsCollector::default()); let mut test = SchedulerTest::new( - SchedulerConfig::default() - .with_scheduler_policy(TaskSchedulingPolicy::PushStaged), + SchedulerConfig::default(), metrics_collector.clone(), 4, 1, @@ -439,8 +437,7 @@ mod test { let metrics_collector = Arc::new(TestMetricsCollector::default()); let mut test = SchedulerTest::new( - SchedulerConfig::default() - .with_scheduler_policy(TaskSchedulingPolicy::PushStaged), + SchedulerConfig::default(), metrics_collector.clone(), 4, 1, @@ -474,8 +471,7 @@ mod test { async fn test_planning_failure() -> Result<()> { let metrics_collector = Arc::new(TestMetricsCollector::default()); let mut test = SchedulerTest::new( - SchedulerConfig::default() - .with_scheduler_policy(TaskSchedulingPolicy::PushStaged), + SchedulerConfig::default(), metrics_collector.clone(), 4, 1, diff --git a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs index 138f70900..86aecc619 100644 --- a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs +++ b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs @@ -351,7 +351,6 @@ impl mod tests { use crate::config::SchedulerConfig; use crate::test_utils::{await_condition, SchedulerTest, TestMetricsCollector}; - use ballista_core::config::TaskSchedulingPolicy; use ballista_core::error::Result; use datafusion::arrow::datatypes::{DataType, Field, Schema}; use datafusion::logical_expr::{col, sum, LogicalPlan}; @@ -371,8 +370,7 @@ mod tests { let metrics_collector = Arc::new(TestMetricsCollector::default()); let mut test = SchedulerTest::new( - SchedulerConfig::default() - .with_scheduler_policy(TaskSchedulingPolicy::PushStaged), + SchedulerConfig::default(), metrics_collector.clone(), 1, 1,