diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index a4e734e9c..141e65fc3 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -24,7 +24,7 @@ use tracing::{debug, error, info, instrument, trace, warn, Span}; use restate_bifrost::Bifrost; use restate_core::network::{HasConnection, Incoming, Outgoing}; -use restate_core::{cancellation_watcher, TaskCenter, TaskKind}; +use restate_core::{cancellation_watcher, ShutdownError, TaskCenter, TaskKind}; use restate_partition_store::{PartitionStore, PartitionStoreTransaction}; use restate_storage_api::deduplication_table::{ DedupInformation, DedupSequenceNumber, DeduplicationTable, ProducerId, @@ -252,10 +252,19 @@ pub struct PartitionProcessor { partition_store: PartitionStore, } -#[derive(Debug)] +#[derive(Debug, derive_more::Display, thiserror::Error)] pub enum ProcessorStopReason { - Cancelled, - LogTrimGap { to_lsn: Lsn }, + TrimGapEncountered { + // last_applied_lsn: Lsn, + gap_to_lsn: Lsn, + }, + Storage(#[from] StorageError), + Bifrost(#[from] restate_bifrost::Error), + StateMachine(#[from] state_machine::Error), + ActionEffect(#[from] leadership::Error), + ShutdownError(#[from] ShutdownError), + LogReadStreamTerminated, + Other(#[from] anyhow::Error), } enum Record { @@ -263,12 +272,6 @@ enum Record { TrimGap(Lsn), } -impl Record { - fn is_trim_gap(&self) -> bool { - matches!(self, Record::TrimGap(_)) - } -} - impl PartitionProcessor where Codec: RawEntryCodec + Default + Debug, @@ -278,25 +281,25 @@ where level = "error", skip_all, fields(partition_id = %self.partition_id, is_leader = tracing::field::Empty) )] - pub async fn run(mut self) -> anyhow::Result { + pub async fn run(mut self) -> Result<(), ProcessorStopReason> { info!("Starting the partition processor."); let res = tokio::select! { res = self.run_inner() => { match res.as_ref() { - Ok( ProcessorStopReason::LogTrimGap { to_lsn } ) => + Ok(_) => warn!("Shutting partition processor down because it stopped unexpectedly."), + Err(ProcessorStopReason::TrimGapEncountered { gap_to_lsn }) => info!( - trim_gap_to_lsn = ?to_lsn, + trim_gap_to_lsn = ?gap_to_lsn, "Shutting partition processor down because it encountered a trim gap in the log." ), - Ok(_) => warn!("Shutting partition processor down because it stopped unexpectedly."), Err(err) => warn!("Shutting partition processor down because it failed: {err}"), } res }, _ = cancellation_watcher() => { debug!("Shutting partition processor down because it was cancelled."); - Ok(ProcessorStopReason::Cancelled) + Ok(()) }, }; @@ -318,9 +321,12 @@ where res } - async fn run_inner(&mut self) -> anyhow::Result { + async fn run_inner(&mut self) -> Result<(), ProcessorStopReason> { let mut partition_store = self.partition_store.clone(); - let last_applied_lsn = partition_store.get_applied_lsn().await?; + let last_applied_lsn = partition_store + .get_applied_lsn() + .await + .map_err(ProcessorStopReason::from)?; let last_applied_lsn = last_applied_lsn.unwrap_or(Lsn::INVALID); self.status.last_applied_log_lsn = Some(last_applied_lsn); @@ -329,7 +335,8 @@ where let current_tail = self .bifrost .find_tail(LogId::from(self.partition_id)) - .await?; + .await + .map_err(ProcessorStopReason::from)?; debug!( last_applied_lsn = %last_applied_lsn, @@ -376,7 +383,8 @@ where key_query.clone(), last_applied_lsn.next(), Lsn::MAX, - )? + ) + .map_err(ProcessorStopReason::from)? .map_ok(|entry| { trace!(?entry, "Read entry"); let lsn = entry.sequence_number(); @@ -386,6 +394,7 @@ where .map(|envelope| anyhow::Ok(Record::Envelope(lsn, envelope?))) .expect("data record is present") } else { + // read_commands will translate trim gaps to errors anyhow::Ok(Record::TrimGap( entry .trim_gap_to_sequence_number() @@ -464,14 +473,14 @@ where lsn, envelope, &mut transaction, - &mut action_collector).await?; + &mut action_collector).await.map_err(ProcessorStopReason::from)?; apply_command_latency.record(command_start.elapsed()); if let Some((header, announce_leader)) = leadership_change { // commit all changes so far, this is important so that the actuators see all changes // when becoming leader. - transaction.commit().await?; + transaction.commit().await.map_err(ProcessorStopReason::from)?; // We can ignore all actions collected so far because as a new leader we have to instruct the // actuators afresh. @@ -489,7 +498,7 @@ where self.status.last_observed_leader_node = announce_leader.node_id; } - let is_leader = self.leadership_state.on_announce_leader(announce_leader, &mut partition_store).await?; + let is_leader = self.leadership_state.on_announce_leader(announce_leader, &mut partition_store).await.map_err(ProcessorStopReason::from)?; Span::current().record("is_leader", is_leader); @@ -508,21 +517,21 @@ where } // Commit our changes and notify actuators about actions if we are the leader - transaction.commit().await?; + transaction.commit().await.map_err(ProcessorStopReason::from)?; let actions_start = Instant::now(); - self.leadership_state.handle_actions(action_collector.drain(..)).await?; + self.leadership_state.handle_actions(action_collector.drain(..)).await.map_err(ProcessorStopReason::from)?; record_actions_latency.record(actions_start.elapsed()); - if let Some(to_lsn) = trim_gap { - return Ok(ProcessorStopReason::LogTrimGap { to_lsn }) + if let Some(gap_to_lsn) = trim_gap { + return Err(ProcessorStopReason::TrimGapEncountered { gap_to_lsn }) } }, result = self.leadership_state.run() => { - let action_effects = result?; + let action_effects = result.map_err(ProcessorStopReason::from)?; // We process the action_effects not directly in the run future because it // requires the run future to be cancellation safe. In the future this could be // implemented. - self.leadership_state.handle_action_effects(action_effects).await?; + self.leadership_state.handle_action_effects(action_effects).await.map_err(ProcessorStopReason::from)?; } } // Allow other tasks on this thread to run, but only if we have exhausted the coop @@ -861,12 +870,12 @@ where } /// Tries to read as many records from the `log_reader` as are immediately available and stops - /// reading at `max_batching_size`, or at a TrimGap - whichever comes first. + /// reading at `max_batching_size`. Trim gaps will result in an immediate error. async fn read_commands( log_reader: &mut S, max_batching_size: usize, record_buffer: &mut Vec, - ) -> anyhow::Result<()> + ) -> Result<(), ProcessorStopReason> where S: Stream, restate_bifrost::Error>> + Unpin, { @@ -874,8 +883,7 @@ where let first_record = log_reader.next().await; let Some(first_record) = first_record else { - // read stream terminated! - anyhow::bail!("Read stream terminated for partition processor"); + return Err(ProcessorStopReason::LogReadStreamTerminated); }; record_buffer.clear(); @@ -885,18 +893,13 @@ where // read more message from the stream but only if they are immediately available if let Some(record) = log_reader.next().now_or_never() { let Some(record) = record else { - // read stream terminated! - anyhow::bail!("Read stream terminated for partition processor"); + return Err(ProcessorStopReason::LogReadStreamTerminated); }; - - let record = record??; - let trim_gap = record.is_trim_gap(); - - record_buffer.push(record); - - // Ensure that a trim gap is always the last record in a batch - if trim_gap { - break; + match record?? { + Record::TrimGap(gap_to_lsn) => { + return Err(ProcessorStopReason::TrimGapEncountered { gap_to_lsn }); + } + record => record_buffer.push(record), } } else { // no more immediately available records found diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs index ad063cabc..4cc90f574 100644 --- a/crates/worker/src/partition_processor_manager.rs +++ b/crates/worker/src/partition_processor_manager.rs @@ -433,7 +433,9 @@ impl PartitionProcessorManager { .remove(processor.as_ref().expect("must be some").key_range()); match result { - Ok(ProcessorStopReason::LogTrimGap { to_lsn }) => { + Err(ProcessorStopReason::TrimGapEncountered { + gap_to_lsn: to_lsn, + }) => { if self.snapshot_repository.is_some() { info!( trim_gap_to_lsn = ?to_lsn, @@ -442,13 +444,17 @@ impl PartitionProcessorManager { self.fast_forward_on_startup.insert(partition_id, to_lsn); } else { warn!( - "Partition processor stopped due to a log trim gap, and no snapshot repository is configured: {result:?}", + trim_gap_to_lsn = ?to_lsn, + "Partition processor stopped due to a log trim gap, and no snapshot repository is configured", ); } } - _ => { + Err(_) => { warn!("Partition processor exited unexpectedly: {result:?}") } + Ok(_) => { + info!("Partition processor stopped.") + } } } ProcessorState::Stopping { @@ -511,7 +517,7 @@ impl PartitionProcessorManager { fn await_runtime_task_result( &mut self, partition_id: PartitionId, - runtime_task_handle: RuntimeTaskHandle>, + runtime_task_handle: RuntimeTaskHandle>, ) { self.asynchronous_operations.spawn( async move { @@ -946,10 +952,10 @@ enum EventKind { Started( anyhow::Result<( StartedProcessor, - RuntimeTaskHandle>, + RuntimeTaskHandle>, )>, ), - Stopped(anyhow::Result), + Stopped(Result<(), ProcessorStopReason>), NewLeaderEpoch { leader_epoch_token: LeaderEpochToken, result: anyhow::Result, diff --git a/crates/worker/src/partition_processor_manager/spawn_processor_task.rs b/crates/worker/src/partition_processor_manager/spawn_processor_task.rs index 0c702daaa..77d4be9c1 100644 --- a/crates/worker/src/partition_processor_manager/spawn_processor_task.rs +++ b/crates/worker/src/partition_processor_manager/spawn_processor_task.rs @@ -79,7 +79,7 @@ impl SpawnPartitionProcessorTask { self, ) -> anyhow::Result<( StartedProcessor, - RuntimeTaskHandle>, + RuntimeTaskHandle>, )> { let Self { task_name, @@ -151,11 +151,13 @@ impl SpawnPartitionProcessorTask { TaskKind::SystemService, invoker_name, invoker.run(invoker_config), - )?; + ) + .map_err(|e| ProcessorStopReason::from(anyhow::anyhow!(e)))?; pp_builder .build::(bifrost, partition_store) - .await? + .await + .map_err(ProcessorStopReason::from)? .run() .await } @@ -199,7 +201,7 @@ async fn open_partition_store( } // We don't have a local partition store initialized - or we have a fast-forward LSN target set. - // Attempt to get the latest available snapshot from the snapshots repository: + // Attempt to get the latest available snapshot from the snapshot repository: let snapshot = match snapshot_repository { Some(repository) => { debug!("Looking for partition snapshot from which to bootstrap partition store");