From 694cf90c971e58797a1b7d645a778e723fee8e00 Mon Sep 17 00:00:00 2001 From: meship-starkware Date: Tue, 16 Jul 2024 10:51:07 +0300 Subject: [PATCH] fix(concurrency): stopping the program if one of the threads panics --- .../src/blockifier/transaction_executor.rs | 20 ++++++++++++++++--- .../blockifier/src/concurrency/scheduler.rs | 2 +- .../src/concurrency/worker_logic.rs | 9 ++++++++- 3 files changed, 26 insertions(+), 5 deletions(-) diff --git a/crates/blockifier/src/blockifier/transaction_executor.rs b/crates/blockifier/src/blockifier/transaction_executor.rs index 7e2941fc79..7a855b2a1f 100644 --- a/crates/blockifier/src/blockifier/transaction_executor.rs +++ b/crates/blockifier/src/blockifier/transaction_executor.rs @@ -1,6 +1,8 @@ #[cfg(feature = "concurrency")] use std::collections::{HashMap, HashSet}; #[cfg(feature = "concurrency")] +use std::panic::{self, catch_unwind, AssertUnwindSafe}; +#[cfg(feature = "concurrency")] use std::sync::Arc; #[cfg(feature = "concurrency")] use std::sync::Mutex; @@ -12,6 +14,7 @@ use thiserror::Error; use crate::blockifier::config::TransactionExecutorConfig; use crate::bouncer::{Bouncer, BouncerWeights}; +use crate::concurrency::scheduler::TransactionCommitter; #[cfg(feature = "concurrency")] use crate::concurrency::worker_logic::WorkerExecutor; use crate::context::BlockContext; @@ -233,10 +236,22 @@ impl TransactionExecutor { // TODO(barak, 01/07/2024): Consider using tokio and spawn tasks that will be served by some // upper level tokio thread pool (Runtime in tokio terminology). std::thread::scope(|s| { - for _ in 0..self.config.concurrency_config.n_workers { + for i in 0..self.config.concurrency_config.n_workers { let worker_executor = Arc::clone(&worker_executor); s.spawn(move || { - worker_executor.run(); + let result = catch_unwind(AssertUnwindSafe(|| { + worker_executor.run(); + })); + if let Err(err) = result { + println!("Thread {} caught a panic, propagating it.", i); + let panic_idx = Mutex::new(1); + let mut tx_commiter = TransactionCommitter::new( + &worker_executor.scheduler, + panic_idx.lock().unwrap(), + ); + tx_commiter.halt_scheduler(); + panic::resume_unwind(err); + } }); } }); @@ -270,7 +285,6 @@ impl TransactionExecutor { }) .commit_chunk_and_recover_block_state(n_committed_txs, visited_pcs); self.block_state.replace(block_state_after_commit); - tx_execution_results } } diff --git a/crates/blockifier/src/concurrency/scheduler.rs b/crates/blockifier/src/concurrency/scheduler.rs index 172918b365..d298bf5328 100644 --- a/crates/blockifier/src/concurrency/scheduler.rs +++ b/crates/blockifier/src/concurrency/scheduler.rs @@ -231,7 +231,7 @@ impl Scheduler { } /// Returns the done marker. - fn done(&self) -> bool { + pub(crate) fn done(&self) -> bool { self.done_marker.load(Ordering::Acquire) } diff --git a/crates/blockifier/src/concurrency/worker_logic.rs b/crates/blockifier/src/concurrency/worker_logic.rs index 60d5b19e71..5122cf7f6e 100644 --- a/crates/blockifier/src/concurrency/worker_logic.rs +++ b/crates/blockifier/src/concurrency/worker_logic.rs @@ -84,6 +84,9 @@ impl<'a, S: StateReader> WorkerExecutor<'a, S> { } pub fn run(&self) { + if self.scheduler.done() { + return; + }; let mut task = Task::AskForTask; loop { self.commit_while_possible(); @@ -118,7 +121,10 @@ impl<'a, S: StateReader> WorkerExecutor<'a, S> { fn execute(&self, tx_index: TxIndex) { self.execute_tx(tx_index); - self.scheduler.finish_execution(tx_index) + if tx_index == 1 { + panic!("test concurrency panic behaviour"); + } + self.scheduler.finish_execution(tx_index); } fn execute_tx(&self, tx_index: TxIndex) { @@ -189,6 +195,7 @@ impl<'a, S: StateReader> WorkerExecutor<'a, S> { /// * Else (execution failed), commit the transaction without fixing the call info or /// updating the sequencer balance. fn commit_tx(&self, tx_index: TxIndex) -> bool { + println!("commit_tx {}", tx_index); let execution_output = lock_mutex_in_array(&self.execution_outputs, tx_index); let execution_output_ref = execution_output.as_ref().expect(EXECUTION_OUTPUTS_UNWRAP_ERROR); let reads = &execution_output_ref.reads;