Skip to content

Commit

Permalink
feat(derive): Add Signal API (#611)
Browse files Browse the repository at this point in the history
  • Loading branch information
clabby authored Oct 3, 2024
1 parent 54c1356 commit 3699a61
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 32 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions bin/client/src/l1/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use kona_derive::{
AttributesQueue, BatchQueue, BatchStream, ChannelBank, ChannelReader, FrameQueue,
L1Retrieval, L1Traversal,
},
traits::{BlobProvider, OriginProvider},
traits::{BlobProvider, OriginProvider, Signal},
};
use kona_executor::{KonaHandleRegister, StatelessL2BlockExecutor};
use kona_mpt::{TrieHinter, TrieProvider};
Expand Down Expand Up @@ -229,12 +229,13 @@ where
// Reset the pipeline to the initial L2 safe head and L1 origin,
// and try again.
self.pipeline
.reset(
self.l2_safe_head.block_info,
self.pipeline
.signal(Signal::Reset {
l2_safe_head: self.l2_safe_head,
l1_origin: self
.pipeline
.origin()
.ok_or_else(|| anyhow!("Missing L1 origin"))?,
)
})
.await?;
}
PipelineErrorKind::Critical(_) => return Err(e.into()),
Expand Down
42 changes: 23 additions & 19 deletions crates/derive/src/pipeline/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use super::{
NextAttributes, OriginAdvancer, OriginProvider, Pipeline, PipelineError, PipelineResult,
ResettableStage, StepResult,
};
use crate::errors::PipelineErrorKind;
use crate::{errors::PipelineErrorKind, traits::Signal};
use alloc::{boxed::Box, collections::VecDeque, string::ToString, sync::Arc};
use async_trait::async_trait;
use core::fmt::Debug;
Expand Down Expand Up @@ -95,26 +95,30 @@ where
///
/// The `l1_block_info` is the new L1 origin set in the [crate::stages::L1Traversal]
/// stage.
async fn reset(
&mut self,
l2_block_info: BlockInfo,
l1_block_info: BlockInfo,
) -> PipelineResult<()> {
let system_config = self
.l2_chain_provider
.system_config_by_number(l2_block_info.number, Arc::clone(&self.rollup_config))
.await
.map_err(|e| PipelineError::Provider(e.to_string()).temp())?;
match self.attributes.reset(l1_block_info, &system_config).await {
Ok(()) => trace!(target: "pipeline", "Stages reset"),
Err(err) => {
if let PipelineErrorKind::Temporary(PipelineError::Eof) = err {
trace!(target: "pipeline", "Stages reset with EOF");
} else {
error!(target: "pipeline", "Stage reset errored: {:?}", err);
return Err(err);
async fn signal(&mut self, signal: Signal) -> PipelineResult<()> {
match signal {
Signal::Reset { l2_safe_head, l1_origin } => {
let system_config = self
.l2_chain_provider
.system_config_by_number(
l2_safe_head.block_info.number,
Arc::clone(&self.rollup_config),
)
.await
.map_err(|e| PipelineError::Provider(e.to_string()).temp())?;
match self.attributes.reset(l1_origin, &system_config).await {
Ok(()) => trace!(target: "pipeline", "Stages reset"),
Err(err) => {
if let PipelineErrorKind::Temporary(PipelineError::Eof) = err {
trace!(target: "pipeline", "Stages reset with EOF");
} else {
error!(target: "pipeline", "Stage reset errored: {:?}", err);
return Err(err);
}
}
}
}
_ => unimplemented!("Signal not implemented"),
}
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion crates/derive/src/traits/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//! pipeline.

mod pipeline;
pub use pipeline::{Pipeline, StepResult};
pub use pipeline::{Pipeline, Signal, StepResult};

mod attributes;
pub use attributes::{AttributesBuilder, NextAttributes};
Expand Down
17 changes: 16 additions & 1 deletion crates/derive/src/traits/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,29 @@ pub enum StepResult {
StepFailed(PipelineErrorKind),
}

/// A signal to send to the pipeline.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[allow(clippy::large_enum_variant)]
pub enum Signal {
/// Reset the pipeline.
Reset {
/// The L2 safe head to reset to.
l2_safe_head: L2BlockInfo,
/// The L1 origin to reset to.
l1_origin: BlockInfo,
},
/// Flush the currently active channel.
FlushChannel,
}

/// This trait defines the interface for interacting with the derivation pipeline.
#[async_trait]
pub trait Pipeline: OriginProvider + Iterator<Item = OptimismAttributesWithParent> {
/// Peeks at the next [OptimismAttributesWithParent] from the pipeline.
fn peek(&self) -> Option<&OptimismAttributesWithParent>;

/// Resets the pipeline on the next [Pipeline::step] call.
async fn reset(&mut self, l2_block_info: BlockInfo, origin: BlockInfo) -> PipelineResult<()>;
async fn signal(&mut self, signal: Signal) -> PipelineResult<()>;

/// Attempts to progress the pipeline.
async fn step(&mut self, cursor: L2BlockInfo) -> StepResult;
Expand Down
1 change: 1 addition & 0 deletions examples/trusted-sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ homepage.workspace = true

[dependencies]
# Workspace
kona-derive.workspace = true
kona-primitives = { workspace = true, features = ["serde"] }
kona-providers-alloy = { workspace = true, features = ["metrics"] }

Expand Down
21 changes: 15 additions & 6 deletions examples/trusted-sync/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use anyhow::Result;
use clap::Parser;
use kona_derive::traits::Signal;
use kona_providers_alloy::prelude::*;
use std::sync::Arc;
use superchain::ROLLUP_CONFIGS;
Expand Down Expand Up @@ -142,7 +143,10 @@ async fn sync(cli: cli::Cli) -> Result<()> {
continue;
};
info!(target: LOG_TARGET, "Resetting pipeline with l1 block info: {:?}", l1_block_info);
if let Err(e) = pipeline.reset(c.block_info, l1_block_info).await {
if let Err(e) = pipeline
.signal(Signal::Reset { l2_safe_head: c, l1_origin: l1_block_info })
.await
{
error!(target: LOG_TARGET, "Failed to reset pipeline: {:?}", e);
continue;
}
Expand All @@ -169,7 +173,10 @@ async fn sync(cli: cli::Cli) -> Result<()> {
continue;
};
info!(target: LOG_TARGET, "Resetting pipeline with l1 block info: {:?}", l1_block_info);
if let Err(e) = pipeline.reset(c.block_info, l1_block_info).await {
if let Err(e) = pipeline
.signal(Signal::Reset { l2_safe_head: c, l1_origin: l1_block_info })
.await
{
error!(target: LOG_TARGET, "Failed to reset pipeline: {:?}", e);
continue;
}
Expand Down Expand Up @@ -232,10 +239,12 @@ async fn sync(cli: cli::Cli) -> Result<()> {
metrics::PIPELINE_STEPS.with_label_values(&["reset"]).inc();
warn!(target: "loop", "Resetting pipeline: {:?}", e);
pipeline
.reset(
cursor.block_info,
pipeline.origin().ok_or(anyhow::anyhow!("Missing origin"))?,
)
.signal(Signal::Reset {
l2_safe_head: cursor,
l1_origin: pipeline
.origin()
.ok_or(anyhow::anyhow!("Missing origin"))?,
})
.await?;
}
PipelineErrorKind::Critical(_) => {
Expand Down

0 comments on commit 3699a61

Please sign in to comment.