From f291ca4da11ab69acc58bda9ccb98a3015b21c0f Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Fri, 3 May 2024 10:31:28 +0900 Subject: [PATCH] Simplify {session_,}result_with_timings types (#1126) * Simplify {session_,}result_with_timings types * Add anoter minor cleanup... --- unified-scheduler-pool/src/lib.rs | 48 ++++++++++++++----------------- 1 file changed, 22 insertions(+), 26 deletions(-) diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index 0b7c5495b0accc..5c4f47ac7043f0 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -484,8 +484,8 @@ struct ThreadManager, TH: TaskHandler> { pool: Arc>, new_task_sender: Sender, new_task_receiver: Receiver, - session_result_sender: Sender>, - session_result_receiver: Receiver>, + session_result_sender: Sender, + session_result_receiver: Receiver, session_result_with_timings: Option, scheduler_thread: Option>, handler_threads: Vec>, @@ -542,6 +542,7 @@ impl, TH: TaskHandler> ThreadManager { (result, timings): &mut ResultWithTimings, executed_task: Box, ) { + timings.accumulate(&executed_task.result_with_timings.1); match executed_task.result_with_timings.0 { Ok(()) => {} Err(error) => { @@ -553,7 +554,6 @@ impl, TH: TaskHandler> ThreadManager { *result = Err(error); } } - timings.accumulate(&executed_task.result_with_timings.1); } fn take_session_result_with_timings(&mut self) -> ResultWithTimings { @@ -660,7 +660,7 @@ impl, TH: TaskHandler> ThreadManager { let (finished_idle_task_sender, finished_idle_task_receiver) = crossbeam_channel::unbounded::>(); - let mut result_with_timings = self.session_result_with_timings.take(); + assert_matches!(self.session_result_with_timings, None); // High-level flow of new tasks: // 1. the replay stage thread send a new task. @@ -727,19 +727,19 @@ impl, TH: TaskHandler> ThreadManager { let mut state_machine = unsafe { SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() }; + let mut result_with_timings = initialized_result_with_timings(); loop { - if let Ok(NewTaskPayload::OpenSubchannel(context)) = new_task_receiver.recv() { - // signal about new SchedulingContext to handler threads - runnable_task_sender - .send_chained_channel(context, handler_count) - .unwrap(); - assert_matches!( - result_with_timings.replace(initialized_result_with_timings()), - None - ); - } else { - unreachable!(); + match new_task_receiver.recv() { + Ok(NewTaskPayload::OpenSubchannel(context)) => { + // signal about new SchedulingContext to handler threads + runnable_task_sender + .send_chained_channel(context, handler_count) + .unwrap(); + } + _ => { + unreachable!(); + } } let mut is_finished = false; @@ -769,8 +769,7 @@ impl, TH: TaskHandler> ThreadManager { let executed_task = executed_task.unwrap(); state_machine.deschedule_task(&executed_task.task); - let result_with_timings = result_with_timings.as_mut().unwrap(); - Self::accumulate_result_with_timings(result_with_timings, executed_task); + Self::accumulate_result_with_timings(&mut result_with_timings, executed_task); }, recv(dummy_unblocked_task_receiver) -> dummy => { assert_matches!(dummy, Err(RecvError)); @@ -801,8 +800,7 @@ impl, TH: TaskHandler> ThreadManager { let executed_task = executed_task.unwrap(); state_machine.deschedule_task(&executed_task.task); - let result_with_timings = result_with_timings.as_mut().unwrap(); - Self::accumulate_result_with_timings(result_with_timings, executed_task); + Self::accumulate_result_with_timings(&mut result_with_timings, executed_task); }, }; @@ -812,10 +810,9 @@ impl, TH: TaskHandler> ThreadManager { if session_ending { state_machine.reinitialize(); session_result_sender - .send(Some( - result_with_timings - .take() - .unwrap_or_else(initialized_result_with_timings), + .send(std::mem::replace( + &mut result_with_timings, + initialized_result_with_timings(), )) .unwrap(); session_ending = false; @@ -894,9 +891,8 @@ impl, TH: TaskHandler> ThreadManager { .send(NewTaskPayload::CloseSubchannel) .unwrap(); - if let Some(result_with_timings) = self.session_result_receiver.recv().unwrap() { - self.put_session_result_with_timings(result_with_timings); - } + let result_with_timings = self.session_result_receiver.recv().unwrap(); + self.put_session_result_with_timings(result_with_timings); } fn start_session(&mut self, context: &SchedulingContext) {