
Commit

remove TaskSchedulingPolicy
lewiszlw committed Jan 10, 2024
1 parent 7ff7280 commit c10a0df
Showing 10 changed files with 20 additions and 80 deletions.
21 changes: 0 additions & 21 deletions ballista/core/src/config.rs
@@ -294,27 +294,6 @@ impl BallistaConfig {
}
}

// an enum used to configure the scheduler policy
// needs to be visible to code generated by configure_me
#[derive(Clone, ArgEnum, Copy, Debug, serde::Deserialize)]
pub enum TaskSchedulingPolicy {
PushStaged,
}

impl std::str::FromStr for TaskSchedulingPolicy {
type Err = String;

fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
ArgEnum::from_str(s, true)
}
}

impl parse_arg::ParseArgFromStr for TaskSchedulingPolicy {
fn describe_type<W: fmt::Write>(mut writer: W) -> fmt::Result {
write!(writer, "The scheduler policy for the scheduler")
}
}

// an enum used to configure the log rolling policy
// needs to be visible to code generated by configure_me
#[derive(Clone, ArgEnum, Copy, Debug, serde::Deserialize)]
7 changes: 0 additions & 7 deletions ballista/executor/executor_config_spec.toml
@@ -78,13 +78,6 @@ type = "usize"
default = "0" # defaults to all available cores if left as zero
doc = "Max concurrent tasks."

[[param]]
abbr = "s"
name = "task_scheduling_policy"
type = "ballista_core::config::TaskSchedulingPolicy"
doc = "The task scheduing policy for the scheduler, possible values: pull-staged, push-staged. Default: pull-staged"
default = "ballista_core::config::TaskSchedulingPolicy::PushStaged"

[[param]]
name = "job_data_clean_up_interval_seconds"
type = "u64"
1 change: 0 additions & 1 deletion ballista/executor/src/bin/main.rs
@@ -70,7 +70,6 @@ async fn main() -> Result<()> {
scheduler_port: opt.scheduler_port,
scheduler_connect_timeout_seconds: opt.scheduler_connect_timeout_seconds,
concurrent_tasks: opt.concurrent_tasks,
task_scheduling_policy: opt.task_scheduling_policy,
work_dir: opt.work_dir,
log_dir: opt.log_dir,
log_file_name_prefix,
32 changes: 13 additions & 19 deletions ballista/executor/src/executor_process.rs
@@ -40,7 +40,7 @@ use uuid::Uuid;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode};

use ballista_core::config::{LogRotationPolicy, TaskSchedulingPolicy};
use ballista_core::config::LogRotationPolicy;
use ballista_core::error::BallistaError;
use ballista_core::serde::protobuf::executor_resource::Resource;
use ballista_core::serde::protobuf::executor_status::Status;
@@ -74,7 +74,6 @@ pub struct ExecutorProcessConfig {
pub scheduler_port: u16,
pub scheduler_connect_timeout_seconds: u16,
pub concurrent_tasks: usize,
pub task_scheduling_policy: TaskSchedulingPolicy,
pub log_dir: Option<String>,
pub work_dir: Option<String>,
pub special_mod_log_level: String,
@@ -242,7 +241,6 @@ pub async fn start_executor_process(opt: Arc<ExecutorProcessConfig>) -> Result<(
let default_codec: BallistaCodec<LogicalPlanNode, PhysicalPlanNode> =
BallistaCodec::default();

let scheduler_policy = opt.task_scheduling_policy;
let job_data_ttl_seconds = opt.job_data_ttl_seconds;

// Graceful shutdown notification
@@ -284,22 +282,18 @@ pub async fn start_executor_process(opt: Arc<ExecutorProcessConfig>) -> Result<(
// Channels used to receive stop requests from Executor grpc service.
let (stop_send, mut stop_recv) = mpsc::channel::<bool>(10);

match scheduler_policy {
TaskSchedulingPolicy::PushStaged => {
service_handlers.push(
//If there is executor registration error during startup, return the error and stop early.
executor_server::startup(
scheduler.clone(),
opt.clone(),
executor.clone(),
default_codec,
stop_send,
&shutdown_noti,
)
.await?,
);
}
};
service_handlers.push(
//If there is executor registration error during startup, return the error and stop early.
executor_server::startup(
scheduler.clone(),
opt.clone(),
executor.clone(),
default_codec,
stop_send,
&shutdown_noti,
)
.await?,
);
service_handlers.push(tokio::spawn(flight_server_run(
addr,
shutdown_noti.subscribe_for_shutdown(),
7 changes: 0 additions & 7 deletions ballista/scheduler/scheduler_config_spec.toml
@@ -69,13 +69,6 @@ type = "u16"
default = "50050"
doc = "bind port. Default: 50050"

[[param]]
abbr = "s"
name = "scheduler_policy"
type = "ballista_core::config::TaskSchedulingPolicy"
doc = "The scheduing policy for the scheduler, possible values: pull-staged, push-staged. Default: pull-staged"
default = "ballista_core::config::TaskSchedulingPolicy::PushStaged"

[[param]]
name = "event_loop_buffer_size"
type = "u32"
1 change: 0 additions & 1 deletion ballista/scheduler/src/bin/main.rs
@@ -132,7 +132,6 @@ async fn main() -> Result<()> {
namespace: opt.namespace,
external_host: opt.external_host,
bind_port: opt.bind_port,
scheduling_policy: opt.scheduler_policy,
event_loop_buffer_size: opt.event_loop_buffer_size,
task_distribution,
finished_job_data_clean_up_interval_seconds: opt
12 changes: 2 additions & 10 deletions ballista/scheduler/src/config.rs
@@ -18,7 +18,6 @@

//! Ballista scheduler specific configuration
use ballista_core::config::TaskSchedulingPolicy;
use clap::ArgEnum;
use std::fmt;

@@ -32,8 +31,6 @@ pub struct SchedulerConfig {
pub external_host: String,
/// The bind port for the scheduler's gRPC service
pub bind_port: u16,
/// The task scheduling policy for the scheduler
pub scheduling_policy: TaskSchedulingPolicy,
/// The event loop buffer size. for a system of high throughput, a larger value like 1000000 is recommended
pub event_loop_buffer_size: u32,
/// Policy of distributing tasks to available executor slots. For a cluster with single scheduler, round-robin is recommended
@@ -70,7 +67,6 @@ impl Default for SchedulerConfig {
namespace: String::default(),
external_host: "localhost".to_string(),
bind_port: 50050,
scheduling_policy: TaskSchedulingPolicy::PushStaged,
event_loop_buffer_size: 10000,
task_distribution: TaskDistributionPolicy::Bias,
finished_job_data_clean_up_interval_seconds: 300,
@@ -94,7 +90,8 @@ impl SchedulerConfig {
}

pub fn is_push_staged_scheduling(&self) -> bool {
matches!(self.scheduling_policy, TaskSchedulingPolicy::PushStaged)
// TODO lwz
true
}

pub fn with_namespace(mut self, namespace: impl Into<String>) -> Self {
@@ -112,11 +109,6 @@ impl SchedulerConfig {
self
}

pub fn with_scheduler_policy(mut self, policy: TaskSchedulingPolicy) -> Self {
self.scheduling_policy = policy;
self
}

pub fn with_event_loop_buffer_size(mut self, buffer_size: u32) -> Self {
self.event_loop_buffer_size = buffer_size;
self
5 changes: 1 addition & 4 deletions ballista/scheduler/src/scheduler_process.rs
@@ -50,10 +50,7 @@ pub async fn start_server(
BALLISTA_VERSION, addr
);
// Should only call SchedulerServer::new() once in the process
info!(
"Starting Scheduler grpc server with task scheduling policy of {:?}",
config.scheduling_policy
);
info!("Starting Scheduler grpc server with push task scheduling policy",);

let metrics_collector = default_metrics_collector()?;

10 changes: 3 additions & 7 deletions ballista/scheduler/src/scheduler_server/mod.rs
@@ -339,7 +339,6 @@ mod test {

use datafusion::test_util::scan_empty;

use ballista_core::config::TaskSchedulingPolicy;
use ballista_core::error::Result;

use crate::config::SchedulerConfig;
@@ -364,8 +363,7 @@ mod test {
let metrics_collector = Arc::new(TestMetricsCollector::default());

let mut test = SchedulerTest::new(
SchedulerConfig::default()
.with_scheduler_policy(TaskSchedulingPolicy::PushStaged),
SchedulerConfig::default(),
metrics_collector.clone(),
4,
1,
@@ -439,8 +437,7 @@ mod test {
let metrics_collector = Arc::new(TestMetricsCollector::default());

let mut test = SchedulerTest::new(
SchedulerConfig::default()
.with_scheduler_policy(TaskSchedulingPolicy::PushStaged),
SchedulerConfig::default(),
metrics_collector.clone(),
4,
1,
@@ -474,8 +471,7 @@ mod test {
async fn test_planning_failure() -> Result<()> {
let metrics_collector = Arc::new(TestMetricsCollector::default());
let mut test = SchedulerTest::new(
SchedulerConfig::default()
.with_scheduler_policy(TaskSchedulingPolicy::PushStaged),
SchedulerConfig::default(),
metrics_collector.clone(),
4,
1,
@@ -351,7 +351,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
mod tests {
use crate::config::SchedulerConfig;
use crate::test_utils::{await_condition, SchedulerTest, TestMetricsCollector};
use ballista_core::config::TaskSchedulingPolicy;
use ballista_core::error::Result;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::logical_expr::{col, sum, LogicalPlan};
@@ -371,8 +370,7 @@ mod tests {
let metrics_collector = Arc::new(TestMetricsCollector::default());

let mut test = SchedulerTest::new(
SchedulerConfig::default()
.with_scheduler_policy(TaskSchedulingPolicy::PushStaged),
SchedulerConfig::default(),
metrics_collector.clone(),
1,
1,
