Skip to content

Commit

Permalink
remove is_push_staged_scheduling method
Browse files Browse the repository at this point in the history
  • Loading branch information
lewiszlw committed Jan 10, 2024
1 parent c10a0df commit 4ba8aa8
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 29 deletions.
5 changes: 0 additions & 5 deletions ballista/scheduler/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,6 @@ impl SchedulerConfig {
format!("{}:{}", self.external_host, self.bind_port)
}

pub fn is_push_staged_scheduling(&self) -> bool {
// TODO lwz
true
}

pub fn with_namespace(mut self, namespace: impl Into<String>) -> Self {
self.namespace = namespace.into();
self
Expand Down
10 changes: 2 additions & 8 deletions ballista/scheduler/src/scheduler_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
tasks_status: Vec<TaskStatus>,
) -> Result<()> {
// We might receive buggy task updates from dead executors.
if self.state.config.is_push_staged_scheduling()
&& self.state.executor_manager.is_dead_executor(executor_id)
{
if self.state.executor_manager.is_dead_executor(executor_id) {
let error_msg = format!(
"Receive buggy tasks status from dead Executor {executor_id}, task status update ignored."
);
Expand Down Expand Up @@ -306,11 +304,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
.register_executor(metadata, executor_data)
.await?;

// If we are using push-based scheduling then reserve this executors slots and send
// them for scheduling tasks.
if self.state.config.is_push_staged_scheduling() {
self.revive_offers().await?;
}
self.revive_offers().await?;

Ok(())
}
Expand Down
26 changes: 10 additions & 16 deletions ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>

info!("Job {} submitted", job_id);

if self.state.config.is_push_staged_scheduling() {
event_sender
.post_event(QueryStageSchedulerEvent::ReviveOffers)
.await?;
}
event_sender
.post_event(QueryStageSchedulerEvent::ReviveOffers)
.await?;
}
QueryStageSchedulerEvent::JobPlanningFailed {
job_id,
Expand Down Expand Up @@ -257,23 +255,19 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
);

let num_status = tasks_status.len();
if self.state.config.is_push_staged_scheduling() {
self.state
.executor_manager
.unbind_tasks(vec![(executor_id.clone(), num_status as u32)])
.await?;
}
self.state
.executor_manager
.unbind_tasks(vec![(executor_id.clone(), num_status as u32)])
.await?;
match self
.state
.update_task_statuses(&executor_id, tasks_status)
.await
{
Ok(stage_events) => {
if self.state.config.is_push_staged_scheduling() {
event_sender
.post_event(QueryStageSchedulerEvent::ReviveOffers)
.await?;
}
event_sender
.post_event(QueryStageSchedulerEvent::ReviveOffers)
.await?;

for stage_event in stage_events {
event_sender.post_event(stage_event).await?;
Expand Down

0 comments on commit 4ba8aa8

Please sign in to comment.