Skip to content

Commit

Permalink
Simplify {session_,}result_with_timings types (#1126)
Browse files Browse the repository at this point in the history
* Simplify {session_,}result_with_timings types

* Add anoter minor cleanup...
  • Loading branch information
ryoqun authored May 3, 2024
1 parent 2d24030 commit f291ca4
Showing 1 changed file with 22 additions and 26 deletions.
48 changes: 22 additions & 26 deletions unified-scheduler-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,8 +484,8 @@ struct ThreadManager<S: SpawnableScheduler<TH>, TH: TaskHandler> {
pool: Arc<SchedulerPool<S, TH>>,
new_task_sender: Sender<NewTaskPayload>,
new_task_receiver: Receiver<NewTaskPayload>,
session_result_sender: Sender<Option<ResultWithTimings>>,
session_result_receiver: Receiver<Option<ResultWithTimings>>,
session_result_sender: Sender<ResultWithTimings>,
session_result_receiver: Receiver<ResultWithTimings>,
session_result_with_timings: Option<ResultWithTimings>,
scheduler_thread: Option<JoinHandle<()>>,
handler_threads: Vec<JoinHandle<()>>,
Expand Down Expand Up @@ -542,6 +542,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
(result, timings): &mut ResultWithTimings,
executed_task: Box<ExecutedTask>,
) {
timings.accumulate(&executed_task.result_with_timings.1);
match executed_task.result_with_timings.0 {
Ok(()) => {}
Err(error) => {
Expand All @@ -553,7 +554,6 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
*result = Err(error);
}
}
timings.accumulate(&executed_task.result_with_timings.1);
}

fn take_session_result_with_timings(&mut self) -> ResultWithTimings {
Expand Down Expand Up @@ -660,7 +660,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
let (finished_idle_task_sender, finished_idle_task_receiver) =
crossbeam_channel::unbounded::<Box<ExecutedTask>>();

let mut result_with_timings = self.session_result_with_timings.take();
assert_matches!(self.session_result_with_timings, None);

// High-level flow of new tasks:
// 1. the replay stage thread send a new task.
Expand Down Expand Up @@ -727,19 +727,19 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
let mut state_machine = unsafe {
SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
};
let mut result_with_timings = initialized_result_with_timings();

loop {
if let Ok(NewTaskPayload::OpenSubchannel(context)) = new_task_receiver.recv() {
// signal about new SchedulingContext to handler threads
runnable_task_sender
.send_chained_channel(context, handler_count)
.unwrap();
assert_matches!(
result_with_timings.replace(initialized_result_with_timings()),
None
);
} else {
unreachable!();
match new_task_receiver.recv() {
Ok(NewTaskPayload::OpenSubchannel(context)) => {
// signal about new SchedulingContext to handler threads
runnable_task_sender
.send_chained_channel(context, handler_count)
.unwrap();
}
_ => {
unreachable!();
}
}

let mut is_finished = false;
Expand Down Expand Up @@ -769,8 +769,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
let executed_task = executed_task.unwrap();

state_machine.deschedule_task(&executed_task.task);
let result_with_timings = result_with_timings.as_mut().unwrap();
Self::accumulate_result_with_timings(result_with_timings, executed_task);
Self::accumulate_result_with_timings(&mut result_with_timings, executed_task);
},
recv(dummy_unblocked_task_receiver) -> dummy => {
assert_matches!(dummy, Err(RecvError));
Expand Down Expand Up @@ -801,8 +800,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
let executed_task = executed_task.unwrap();

state_machine.deschedule_task(&executed_task.task);
let result_with_timings = result_with_timings.as_mut().unwrap();
Self::accumulate_result_with_timings(result_with_timings, executed_task);
Self::accumulate_result_with_timings(&mut result_with_timings, executed_task);
},
};

Expand All @@ -812,10 +810,9 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
if session_ending {
state_machine.reinitialize();
session_result_sender
.send(Some(
result_with_timings
.take()
.unwrap_or_else(initialized_result_with_timings),
.send(std::mem::replace(
&mut result_with_timings,
initialized_result_with_timings(),
))
.unwrap();
session_ending = false;
Expand Down Expand Up @@ -894,9 +891,8 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
.send(NewTaskPayload::CloseSubchannel)
.unwrap();

if let Some(result_with_timings) = self.session_result_receiver.recv().unwrap() {
self.put_session_result_with_timings(result_with_timings);
}
let result_with_timings = self.session_result_receiver.recv().unwrap();
self.put_session_result_with_timings(result_with_timings);
}

fn start_session(&mut self, context: &SchedulingContext) {
Expand Down

0 comments on commit f291ca4

Please sign in to comment.