From 4ba8aa827f1948c84b9e48001734b3096b8024bd Mon Sep 17 00:00:00 2001 From: Linwei Zhang Date: Wed, 10 Jan 2024 10:52:50 +0800 Subject: [PATCH] remove is_push_staged_scheduling method --- ballista/scheduler/src/config.rs | 5 ---- .../scheduler/src/scheduler_server/mod.rs | 10 ++----- .../scheduler_server/query_stage_scheduler.rs | 26 +++++++------------ 3 files changed, 12 insertions(+), 29 deletions(-) diff --git a/ballista/scheduler/src/config.rs b/ballista/scheduler/src/config.rs index 5e579760..e43bd6d1 100644 --- a/ballista/scheduler/src/config.rs +++ b/ballista/scheduler/src/config.rs @@ -89,11 +89,6 @@ impl SchedulerConfig { format!("{}:{}", self.external_host, self.bind_port) } - pub fn is_push_staged_scheduling(&self) -> bool { - // TODO lwz - true - } - pub fn with_namespace(mut self, namespace: impl Into) -> Self { self.namespace = namespace.into(); self diff --git a/ballista/scheduler/src/scheduler_server/mod.rs b/ballista/scheduler/src/scheduler_server/mod.rs index d7f7b0f9..cd14c14f 100644 --- a/ballista/scheduler/src/scheduler_server/mod.rs +++ b/ballista/scheduler/src/scheduler_server/mod.rs @@ -178,9 +178,7 @@ impl SchedulerServer, ) -> Result<()> { // We might receive buggy task updates from dead executors. - if self.state.config.is_push_staged_scheduling() - && self.state.executor_manager.is_dead_executor(executor_id) - { + if self.state.executor_manager.is_dead_executor(executor_id) { let error_msg = format!( "Receive buggy tasks status from dead Executor {executor_id}, task status update ignored." ); @@ -306,11 +304,7 @@ impl SchedulerServer info!("Job {} submitted", job_id); - if self.state.config.is_push_staged_scheduling() { - event_sender - .post_event(QueryStageSchedulerEvent::ReviveOffers) - .await?; - } + event_sender + .post_event(QueryStageSchedulerEvent::ReviveOffers) + .await?; } QueryStageSchedulerEvent::JobPlanningFailed { job_id, @@ -257,23 +255,19 @@ impl ); let num_status = tasks_status.len(); - if self.state.config.is_push_staged_scheduling() { - self.state - .executor_manager - .unbind_tasks(vec![(executor_id.clone(), num_status as u32)]) - .await?; - } + self.state + .executor_manager + .unbind_tasks(vec![(executor_id.clone(), num_status as u32)]) + .await?; match self .state .update_task_statuses(&executor_id, tasks_status) .await { Ok(stage_events) => { - if self.state.config.is_push_staged_scheduling() { - event_sender - .post_event(QueryStageSchedulerEvent::ReviveOffers) - .await?; - } + event_sender + .post_event(QueryStageSchedulerEvent::ReviveOffers) + .await?; for stage_event in stage_events { event_sender.post_event(stage_event).await?;