Skip to content

Commit

Permalink
Remove ExecutorLost event
Browse files: browse the repository at this point in the history
  • Loading branch information
lewiszlw committed Aug 20, 2024
1 parent c2da93f commit b28b84f
Show file tree
Hide file tree
Showing 4 changed files with 1 addition and 57 deletions.
7 changes: 0 additions & 7 deletions ballista/scheduler/src/scheduler_server/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ pub enum QueryStageSchedulerEvent {
JobCancel(String),
TaskUpdating(String, Vec<TaskStatus>),
ReviveOffers,
ExecutorLost(String, Option<String>),
CancelTasks(Vec<RunningTaskInfo>),
}

Expand Down Expand Up @@ -117,12 +116,6 @@ impl Debug for QueryStageSchedulerEvent {
QueryStageSchedulerEvent::ReviveOffers => {
write!(f, "ReviveOffers.")
}
QueryStageSchedulerEvent::ExecutorLost(executor_id, reason) => {
write!(
f,
"ExecutorLost : executor_id={executor_id}, reason:[{reason:?}]."
)
}
QueryStageSchedulerEvent::CancelTasks(status) => {
write!(f, "CancelTasks : status:[{status:?}].")
}
Expand Down
14 changes: 1 addition & 13 deletions ballista/scheduler/src/scheduler_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use ballista_core::error::Result;
use ballista_core::event_loop::{EventLoop, EventSender};
use ballista_core::event_loop::EventLoop;
use ballista_core::serde::protobuf::TaskStatus;
use ballista_core::serde::BallistaCodec;

Expand Down Expand Up @@ -154,15 +154,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
/// expire the dead executors
fn expire_dead_executors(&self) -> Result<()> {
let state = self.state.clone();
let event_sender = self.query_stage_event_loop.get_sender()?;
tokio::task::spawn(async move {
loop {
let expired_executors = state.executor_manager.get_expired_executors();
for expired in expired_executors {
let executor_id = expired.executor_id.clone();

let sender_clone = event_sender.clone();

let terminating = matches!(
expired
.status
Expand Down Expand Up @@ -190,7 +187,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
// If executor is expired, remove it immediately
Self::remove_executor(
state.executor_manager.clone(),
sender_clone,
&executor_id,
Some(stop_reason.clone()),
0,
Expand All @@ -207,7 +203,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T

pub(crate) fn remove_executor(
executor_manager: ExecutorManager,
event_sender: EventSender<QueryStageSchedulerEvent>,
executor_id: &str,
reason: Option<String>,
wait_secs: u64,
Expand All @@ -224,13 +219,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
{
error!("error removing executor {executor_id}: {e:?}");
}

if let Err(e) = event_sender
.post_event(QueryStageSchedulerEvent::ExecutorLost(executor_id, reason))
.await
{
error!("error sending ExecutorLost event: {e:?}");
}
});
}

Expand Down
21 changes: 0 additions & 21 deletions ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,27 +227,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> EventAction<Query
QueryStageSchedulerEvent::ReviveOffers => {
self.state.revive_offers(event_sender).await?;
}
QueryStageSchedulerEvent::ExecutorLost(executor_id, _) => {
match self.state.task_manager.executor_lost(&executor_id).await {
Ok(tasks) => {
if !tasks.is_empty() {
if let Err(e) = self
.state
.executor_manager
.cancel_running_tasks(tasks)
.await
{
warn!("Fail to cancel running tasks due to {:?}", e);
}
}
}
Err(e) => {
let msg =
format!("TaskManager error to handle Executor {executor_id} lost: {e}");
error!("{}", msg);
}
}
}
QueryStageSchedulerEvent::CancelTasks(tasks) => {
if let Err(e) = self
.state
Expand Down
16 changes: 0 additions & 16 deletions ballista/scheduler/src/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,22 +166,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
{
warn!("Fail to remove executor {}: {}", executor_id, e);
}

match self.task_manager.executor_lost(executor_id).await {
Ok(tasks) => {
if !tasks.is_empty() {
if let Err(e) = self.executor_manager.cancel_running_tasks(tasks).await {
warn!("Fail to cancel running tasks due to {:?}", e);
}
}
}
Err(e) => {
error!(
"TaskManager error to handle Executor {} lost: {}",
executor_id, e
);
}
}
}

/// Given a vector of bound tasks,
Expand Down

0 comments on commit b28b84f

Please sign in to comment.