diff --git a/ballista/scheduler/src/scheduler_server/event.rs b/ballista/scheduler/src/scheduler_server/event.rs index a564579f..7cac7acf 100644 --- a/ballista/scheduler/src/scheduler_server/event.rs +++ b/ballista/scheduler/src/scheduler_server/event.rs @@ -60,7 +60,6 @@ pub enum QueryStageSchedulerEvent { JobCancel(String), TaskUpdating(String, Vec), ReviveOffers, - ExecutorLost(String, Option), CancelTasks(Vec), } @@ -117,12 +116,6 @@ impl Debug for QueryStageSchedulerEvent { QueryStageSchedulerEvent::ReviveOffers => { write!(f, "ReviveOffers.") } - QueryStageSchedulerEvent::ExecutorLost(executor_id, reason) => { - write!( - f, - "ExecutorLost : executor_id={executor_id}, reason:[{reason:?}]." - ) - } QueryStageSchedulerEvent::CancelTasks(status) => { write!(f, "CancelTasks : status:[{status:?}].") } diff --git a/ballista/scheduler/src/scheduler_server/mod.rs b/ballista/scheduler/src/scheduler_server/mod.rs index cc05edb1..980b773b 100644 --- a/ballista/scheduler/src/scheduler_server/mod.rs +++ b/ballista/scheduler/src/scheduler_server/mod.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use ballista_core::error::Result; -use ballista_core::event_loop::{EventLoop, EventSender}; +use ballista_core::event_loop::EventLoop; use ballista_core::serde::protobuf::TaskStatus; use ballista_core::serde::BallistaCodec; @@ -154,15 +154,12 @@ impl SchedulerServer Result<()> { let state = self.state.clone(); - let event_sender = self.query_stage_event_loop.get_sender()?; tokio::task::spawn(async move { loop { let expired_executors = state.executor_manager.get_expired_executors(); for expired in expired_executors { let executor_id = expired.executor_id.clone(); - let sender_clone = event_sender.clone(); - let terminating = matches!( expired .status @@ -190,7 +187,6 @@ impl SchedulerServer SchedulerServer, executor_id: &str, reason: Option, wait_secs: u64, @@ -224,13 +219,6 @@ impl SchedulerServer EventAction { self.state.revive_offers(event_sender).await?; } - QueryStageSchedulerEvent::ExecutorLost(executor_id, _) => { - match self.state.task_manager.executor_lost(&executor_id).await { - Ok(tasks) => { - if !tasks.is_empty() { - if let Err(e) = self - .state - .executor_manager - .cancel_running_tasks(tasks) - .await - { - warn!("Fail to cancel running tasks due to {:?}", e); - } - } - } - Err(e) => { - let msg = - format!("TaskManager error to handle Executor {executor_id} lost: {e}"); - error!("{}", msg); - } - } - } QueryStageSchedulerEvent::CancelTasks(tasks) => { if let Err(e) = self .state diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs index a7ab4d32..0406ef97 100644 --- a/ballista/scheduler/src/state/mod.rs +++ b/ballista/scheduler/src/state/mod.rs @@ -166,22 +166,6 @@ impl SchedulerState { - if !tasks.is_empty() { - if let Err(e) = self.executor_manager.cancel_running_tasks(tasks).await { - warn!("Fail to cancel running tasks due to {:?}", e); - } - } - } - Err(e) => { - error!( - "TaskManager error to handle Executor {} lost: {}", - executor_id, e - ); - } - } } /// Given a vector of bound tasks,