Skip to content

Commit

Permalink
Model TrimGapEncountered as an error within the PP
Browse files Browse the repository at this point in the history
  • Loading branch information
pcholakov committed Dec 24, 2024
1 parent acd5c37 commit e2e487e
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 53 deletions.
89 changes: 46 additions & 43 deletions crates/worker/src/partition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use tracing::{debug, error, info, instrument, trace, warn, Span};

use restate_bifrost::Bifrost;
use restate_core::network::{HasConnection, Incoming, Outgoing};
use restate_core::{cancellation_watcher, TaskCenter, TaskKind};
use restate_core::{cancellation_watcher, ShutdownError, TaskCenter, TaskKind};
use restate_partition_store::{PartitionStore, PartitionStoreTransaction};
use restate_storage_api::deduplication_table::{
DedupInformation, DedupSequenceNumber, DeduplicationTable, ProducerId,
Expand Down Expand Up @@ -252,23 +252,26 @@ pub struct PartitionProcessor<Codec, InvokerSender> {
partition_store: PartitionStore,
}

#[derive(Debug)]
#[derive(Debug, derive_more::Display, thiserror::Error)]
pub enum ProcessorStopReason {
Cancelled,
LogTrimGap { to_lsn: Lsn },
TrimGapEncountered {
// last_applied_lsn: Lsn,
gap_to_lsn: Lsn,
},
Storage(#[from] StorageError),
Bifrost(#[from] restate_bifrost::Error),
StateMachine(#[from] state_machine::Error),
ActionEffect(#[from] leadership::Error),
ShutdownError(#[from] ShutdownError),
LogReadStreamTerminated,
Other(#[from] anyhow::Error),
}

enum Record {
Envelope(Lsn, Arc<Envelope>),
TrimGap(Lsn),
}

impl Record {
fn is_trim_gap(&self) -> bool {
matches!(self, Record::TrimGap(_))
}
}

impl<Codec, InvokerSender> PartitionProcessor<Codec, InvokerSender>
where
Codec: RawEntryCodec + Default + Debug,
Expand All @@ -278,25 +281,25 @@ where
level = "error", skip_all,
fields(partition_id = %self.partition_id, is_leader = tracing::field::Empty)
)]
pub async fn run(mut self) -> anyhow::Result<ProcessorStopReason> {
pub async fn run(mut self) -> Result<(), ProcessorStopReason> {
info!("Starting the partition processor.");

let res = tokio::select! {
res = self.run_inner() => {
match res.as_ref() {
Ok( ProcessorStopReason::LogTrimGap { to_lsn } ) =>
Ok(_) => warn!("Shutting partition processor down because it stopped unexpectedly."),
Err(ProcessorStopReason::TrimGapEncountered { gap_to_lsn }) =>
info!(
trim_gap_to_lsn = ?to_lsn,
trim_gap_to_lsn = ?gap_to_lsn,
"Shutting partition processor down because it encountered a trim gap in the log."
),
Ok(_) => warn!("Shutting partition processor down because it stopped unexpectedly."),
Err(err) => warn!("Shutting partition processor down because it failed: {err}"),
}
res
},
_ = cancellation_watcher() => {
debug!("Shutting partition processor down because it was cancelled.");
Ok(ProcessorStopReason::Cancelled)
Ok(())
},
};

Expand All @@ -318,9 +321,12 @@ where
res
}

async fn run_inner(&mut self) -> anyhow::Result<ProcessorStopReason> {
async fn run_inner(&mut self) -> Result<(), ProcessorStopReason> {
let mut partition_store = self.partition_store.clone();
let last_applied_lsn = partition_store.get_applied_lsn().await?;
let last_applied_lsn = partition_store
.get_applied_lsn()
.await
.map_err(ProcessorStopReason::from)?;
let last_applied_lsn = last_applied_lsn.unwrap_or(Lsn::INVALID);

self.status.last_applied_log_lsn = Some(last_applied_lsn);
Expand All @@ -329,7 +335,8 @@ where
let current_tail = self
.bifrost
.find_tail(LogId::from(self.partition_id))
.await?;
.await
.map_err(ProcessorStopReason::from)?;

debug!(
last_applied_lsn = %last_applied_lsn,
Expand Down Expand Up @@ -376,7 +383,8 @@ where
key_query.clone(),
last_applied_lsn.next(),
Lsn::MAX,
)?
)
.map_err(ProcessorStopReason::from)?
.map_ok(|entry| {
trace!(?entry, "Read entry");
let lsn = entry.sequence_number();
Expand All @@ -386,6 +394,7 @@ where
.map(|envelope| anyhow::Ok(Record::Envelope(lsn, envelope?)))
.expect("data record is present")
} else {
// read_commands will translate trim gaps to errors
anyhow::Ok(Record::TrimGap(
entry
.trim_gap_to_sequence_number()
Expand Down Expand Up @@ -464,14 +473,14 @@ where
lsn,
envelope,
&mut transaction,
&mut action_collector).await?;
&mut action_collector).await.map_err(ProcessorStopReason::from)?;

apply_command_latency.record(command_start.elapsed());

if let Some((header, announce_leader)) = leadership_change {
// commit all changes so far, this is important so that the actuators see all changes
// when becoming leader.
transaction.commit().await?;
transaction.commit().await.map_err(ProcessorStopReason::from)?;

// We can ignore all actions collected so far because as a new leader we have to instruct the
// actuators afresh.
Expand All @@ -489,7 +498,7 @@ where
self.status.last_observed_leader_node = announce_leader.node_id;
}

let is_leader = self.leadership_state.on_announce_leader(announce_leader, &mut partition_store).await?;
let is_leader = self.leadership_state.on_announce_leader(announce_leader, &mut partition_store).await.map_err(ProcessorStopReason::from)?;

Span::current().record("is_leader", is_leader);

Expand All @@ -508,21 +517,21 @@ where
}

// Commit our changes and notify actuators about actions if we are the leader
transaction.commit().await?;
transaction.commit().await.map_err(ProcessorStopReason::from)?;
let actions_start = Instant::now();
self.leadership_state.handle_actions(action_collector.drain(..)).await?;
self.leadership_state.handle_actions(action_collector.drain(..)).await.map_err(ProcessorStopReason::from)?;
record_actions_latency.record(actions_start.elapsed());

if let Some(to_lsn) = trim_gap {
return Ok(ProcessorStopReason::LogTrimGap { to_lsn })
if let Some(gap_to_lsn) = trim_gap {
return Err(ProcessorStopReason::TrimGapEncountered { gap_to_lsn })
}
},
result = self.leadership_state.run() => {
let action_effects = result?;
let action_effects = result.map_err(ProcessorStopReason::from)?;
// We process the action_effects not directly in the run future because it
// requires the run future to be cancellation safe. In the future this could be
// implemented.
self.leadership_state.handle_action_effects(action_effects).await?;
self.leadership_state.handle_action_effects(action_effects).await.map_err(ProcessorStopReason::from)?;
}
}
// Allow other tasks on this thread to run, but only if we have exhausted the coop
Expand Down Expand Up @@ -861,21 +870,20 @@ where
}

/// Tries to read as many records from the `log_reader` as are immediately available and stops
/// reading at `max_batching_size`, or at a TrimGap - whichever comes first.
/// reading at `max_batching_size`. Trim gaps will result in an immediate error.
async fn read_commands<S>(
log_reader: &mut S,
max_batching_size: usize,
record_buffer: &mut Vec<Record>,
) -> anyhow::Result<()>
) -> Result<(), ProcessorStopReason>
where
S: Stream<Item = Result<anyhow::Result<Record>, restate_bifrost::Error>> + Unpin,
{
// beyond this point we must not await; otherwise we are no longer cancellation safe
let first_record = log_reader.next().await;

let Some(first_record) = first_record else {
// read stream terminated!
anyhow::bail!("Read stream terminated for partition processor");
return Err(ProcessorStopReason::LogReadStreamTerminated);
};

record_buffer.clear();
Expand All @@ -885,18 +893,13 @@ where
// read more message from the stream but only if they are immediately available
if let Some(record) = log_reader.next().now_or_never() {
let Some(record) = record else {
// read stream terminated!
anyhow::bail!("Read stream terminated for partition processor");
return Err(ProcessorStopReason::LogReadStreamTerminated);
};

let record = record??;
let trim_gap = record.is_trim_gap();

record_buffer.push(record);

// Ensure that a trim gap is always the last record in a batch
if trim_gap {
break;
match record?? {
Record::TrimGap(gap_to_lsn) => {
return Err(ProcessorStopReason::TrimGapEncountered { gap_to_lsn });
}
record => record_buffer.push(record),
}
} else {
// no more immediately available records found
Expand Down
18 changes: 12 additions & 6 deletions crates/worker/src/partition_processor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,9 @@ impl PartitionProcessorManager {
.remove(processor.as_ref().expect("must be some").key_range());

match result {
Ok(ProcessorStopReason::LogTrimGap { to_lsn }) => {
Err(ProcessorStopReason::TrimGapEncountered {
gap_to_lsn: to_lsn,
}) => {
if self.snapshot_repository.is_some() {
info!(
trim_gap_to_lsn = ?to_lsn,
Expand All @@ -442,13 +444,17 @@ impl PartitionProcessorManager {
self.fast_forward_on_startup.insert(partition_id, to_lsn);
} else {
warn!(
"Partition processor stopped due to a log trim gap, and no snapshot repository is configured: {result:?}",
trim_gap_to_lsn = ?to_lsn,
"Partition processor stopped due to a log trim gap, and no snapshot repository is configured",
);
}
}
_ => {
Err(_) => {
warn!("Partition processor exited unexpectedly: {result:?}")
}
Ok(_) => {
info!("Partition processor stopped.")
}
}
}
ProcessorState::Stopping {
Expand Down Expand Up @@ -511,7 +517,7 @@ impl PartitionProcessorManager {
fn await_runtime_task_result(
&mut self,
partition_id: PartitionId,
runtime_task_handle: RuntimeTaskHandle<anyhow::Result<ProcessorStopReason>>,
runtime_task_handle: RuntimeTaskHandle<Result<(), ProcessorStopReason>>,
) {
self.asynchronous_operations.spawn(
async move {
Expand Down Expand Up @@ -946,10 +952,10 @@ enum EventKind {
Started(
anyhow::Result<(
StartedProcessor,
RuntimeTaskHandle<anyhow::Result<ProcessorStopReason>>,
RuntimeTaskHandle<Result<(), ProcessorStopReason>>,
)>,
),
Stopped(anyhow::Result<ProcessorStopReason>),
Stopped(Result<(), ProcessorStopReason>),
NewLeaderEpoch {
leader_epoch_token: LeaderEpochToken,
result: anyhow::Result<LeaderEpoch>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl SpawnPartitionProcessorTask {
self,
) -> anyhow::Result<(
StartedProcessor,
RuntimeTaskHandle<anyhow::Result<ProcessorStopReason>>,
RuntimeTaskHandle<Result<(), ProcessorStopReason>>,
)> {
let Self {
task_name,
Expand Down Expand Up @@ -151,11 +151,13 @@ impl SpawnPartitionProcessorTask {
TaskKind::SystemService,
invoker_name,
invoker.run(invoker_config),
)?;
)
.map_err(|e| ProcessorStopReason::from(anyhow::anyhow!(e)))?;

pp_builder
.build::<ProtobufRawEntryCodec>(bifrost, partition_store)
.await?
.await
.map_err(ProcessorStopReason::from)?
.run()
.await
}
Expand Down Expand Up @@ -199,7 +201,7 @@ async fn open_partition_store(
}

// We don't have a local partition store initialized - or we have a fast-forward LSN target set.
// Attempt to get the latest available snapshot from the snapshots repository:
// Attempt to get the latest available snapshot from the snapshot repository:
let snapshot = match snapshot_repository {
Some(repository) => {
debug!("Looking for partition snapshot from which to bootstrap partition store");
Expand Down

0 comments on commit e2e487e

Please sign in to comment.