Skip to content

Commit

Permalink
feat(derive): Holocene flush signal (#612)
Browse files Browse the repository at this point in the history
  • Loading branch information
clabby authored Oct 3, 2024
1 parent 3699a61 commit e34cb93
Show file tree
Hide file tree
Showing 8 changed files with 135 additions and 19 deletions.
51 changes: 44 additions & 7 deletions crates/derive/src/pipeline/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use super::{
NextAttributes, OriginAdvancer, OriginProvider, Pipeline, PipelineError, PipelineResult,
ResettableStage, StepResult,
};
use crate::{errors::PipelineErrorKind, traits::Signal};
use crate::{
errors::PipelineErrorKind,
traits::{FlushableStage, Signal},
};
use alloc::{boxed::Box, collections::VecDeque, string::ToString, sync::Arc};
use async_trait::async_trait;
use core::fmt::Debug;
Expand All @@ -18,7 +21,13 @@ use tracing::{error, trace, warn};
#[derive(Debug)]
pub struct DerivationPipeline<S, P>
where
S: NextAttributes + ResettableStage + OriginProvider + OriginAdvancer + Debug + Send,
S: NextAttributes
+ ResettableStage
+ FlushableStage
+ OriginProvider
+ OriginAdvancer
+ Debug
+ Send,
P: L2ChainProvider + Send + Sync + Debug,
{
/// A handle to the next attributes.
Expand All @@ -35,7 +44,13 @@ where

impl<S, P> DerivationPipeline<S, P>
where
S: NextAttributes + ResettableStage + OriginProvider + OriginAdvancer + Debug + Send,
S: NextAttributes
+ ResettableStage
+ FlushableStage
+ OriginProvider
+ OriginAdvancer
+ Debug
+ Send,
P: L2ChainProvider + Send + Sync + Debug,
{
/// Creates a new instance of the [DerivationPipeline].
Expand All @@ -50,7 +65,13 @@ where

impl<S, P> OriginProvider for DerivationPipeline<S, P>
where
S: NextAttributes + ResettableStage + OriginProvider + OriginAdvancer + Debug + Send,
S: NextAttributes
+ ResettableStage
+ FlushableStage
+ OriginProvider
+ OriginAdvancer
+ Debug
+ Send,
P: L2ChainProvider + Send + Sync + Debug,
{
fn origin(&self) -> Option<BlockInfo> {
Expand All @@ -60,7 +81,14 @@ where

impl<S, P> Iterator for DerivationPipeline<S, P>
where
S: NextAttributes + ResettableStage + OriginProvider + OriginAdvancer + Debug + Send + Sync,
S: NextAttributes
+ ResettableStage
+ FlushableStage
+ OriginProvider
+ OriginAdvancer
+ Debug
+ Send
+ Sync,
P: L2ChainProvider + Send + Sync + Debug,
{
type Item = OptimismAttributesWithParent;
Expand All @@ -73,7 +101,14 @@ where
#[async_trait]
impl<S, P> Pipeline for DerivationPipeline<S, P>
where
S: NextAttributes + ResettableStage + OriginProvider + OriginAdvancer + Debug + Send + Sync,
S: NextAttributes
+ ResettableStage
+ FlushableStage
+ OriginProvider
+ OriginAdvancer
+ Debug
+ Send
+ Sync,
P: L2ChainProvider + Send + Sync + Debug,
{
/// Peeks at the next prepared [OptimismAttributesWithParent] from the pipeline.
Expand Down Expand Up @@ -118,7 +153,9 @@ where
}
}
}
_ => unimplemented!("Signal not implemented"),
Signal::FlushChannel => {
self.attributes.flush_channel().await?;
}
}
Ok(())
}
Expand Down
32 changes: 26 additions & 6 deletions crates/derive/src/stages/attributes_queue.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
//! Contains the logic for the `AttributesQueue` stage.

use crate::{
batch::SingleBatch,
errors::{PipelineError, PipelineResult, ResetError},
traits::{
AttributesBuilder, FlushableStage, NextAttributes, OriginAdvancer, OriginProvider,
ResettableStage,
},
};
use alloc::{boxed::Box, sync::Arc};
use async_trait::async_trait;
use core::fmt::Debug;
Expand All @@ -8,12 +16,6 @@ use op_alloy_protocol::{BlockInfo, L2BlockInfo};
use op_alloy_rpc_types_engine::{OptimismAttributesWithParent, OptimismPayloadAttributes};
use tracing::info;

use crate::{
batch::SingleBatch,
errors::{PipelineError, PipelineResult, ResetError},
traits::{AttributesBuilder, NextAttributes, OriginAdvancer, OriginProvider, ResettableStage},
};

/// [AttributesProvider] is a trait abstraction that generalizes the [BatchQueue] stage.
///
/// [BatchQueue]: crate::stages::BatchQueue
Expand Down Expand Up @@ -207,6 +209,24 @@ where
}
}

#[async_trait]
impl<P, AB> FlushableStage for AttributesQueue<P, AB>
where
P: AttributesProvider
+ OriginAdvancer
+ OriginProvider
+ ResettableStage
+ FlushableStage
+ Debug
+ Send,
AB: AttributesBuilder + Debug + Send,
{
async fn flush_channel(&mut self) -> PipelineResult<()> {
self.batch = None;
self.prev.flush_channel().await
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
22 changes: 21 additions & 1 deletion crates/derive/src/stages/batch_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
batch::{Batch, BatchValidity, BatchWithInclusionBlock, SingleBatch},
errors::{PipelineEncodingError, PipelineError, PipelineErrorKind, PipelineResult, ResetError},
stages::attributes_queue::AttributesProvider,
traits::{OriginAdvancer, OriginProvider, ResettableStage},
traits::{FlushableStage, OriginAdvancer, OriginProvider, ResettableStage},
};
use alloc::{boxed::Box, sync::Arc, vec::Vec};
use async_trait::async_trait;
Expand Down Expand Up @@ -467,6 +467,26 @@ where
}
}

#[async_trait]
impl<P, BF> FlushableStage for BatchQueue<P, BF>
where
P: BatchQueueProvider
+ OriginAdvancer
+ OriginProvider
+ ResettableStage
+ FlushableStage
+ Send
+ Debug,
BF: L2ChainProvider + Send + Debug,
{
async fn flush_channel(&mut self) -> PipelineResult<()> {
self.batches.clear();
self.l1_blocks.clear();
self.next_spans.clear();
self.prev.flush_channel().await
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
22 changes: 21 additions & 1 deletion crates/derive/src/stages/batch_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
errors::{PipelineEncodingError, PipelineError, PipelineResult},
pipeline::L2ChainProvider,
stages::BatchQueueProvider,
traits::{OriginAdvancer, OriginProvider, ResettableStage},
traits::{FlushableStage, OriginAdvancer, OriginProvider, ResettableStage},
};
use alloc::{boxed::Box, collections::VecDeque, sync::Arc};
use async_trait::async_trait;
Expand Down Expand Up @@ -195,12 +195,32 @@ where
{
async fn reset(&mut self, base: BlockInfo, cfg: &SystemConfig) -> PipelineResult<()> {
self.prev.reset(base, cfg).await?;
self.buffer.clear();
self.span.take();
crate::inc!(STAGE_RESETS, &["batch-span"]);
Ok(())
}
}

#[async_trait]
impl<P, BF> FlushableStage for BatchStream<P, BF>
where
P: BatchStreamProvider
+ OriginAdvancer
+ OriginProvider
+ ResettableStage
+ FlushableStage
+ Debug
+ Send,
BF: L2ChainProvider + Debug + Send,
{
async fn flush_channel(&mut self) -> PipelineResult<()> {
self.span.take();
self.buffer.clear();
self.prev.flush_channel().await
}
}

#[cfg(test)]
mod test {
use super::*;
Expand Down
15 changes: 14 additions & 1 deletion crates/derive/src/stages/channel_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
batch::Batch,
errors::{PipelineError, PipelineResult},
stages::{decompress_brotli, BatchStreamProvider},
traits::{OriginAdvancer, OriginProvider, ResettableStage},
traits::{FlushableStage, OriginAdvancer, OriginProvider, ResettableStage},
};
use alloc::{boxed::Box, sync::Arc, vec::Vec};
use alloy_primitives::Bytes;
Expand Down Expand Up @@ -157,6 +157,19 @@ where
}
}

#[async_trait]
impl<P> FlushableStage for ChannelReader<P>
where
P: ChannelReaderProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug + Send,
{
async fn flush_channel(&mut self) -> PipelineResult<()> {
// Drop the current in-progress channel.
warn!(target: "channel-reader", "Flushed channel");
self.next_batch = None;
Ok(())
}
}

/// Batch Reader provides a function that iteratively consumes batches from the reader.
/// The L1Inclusion block is also provided at creation time.
/// Warning: the batch reader can read every batch-type.
Expand Down
3 changes: 1 addition & 2 deletions crates/derive/src/traits/attributes.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
//! Contains traits for working with payload attributes and their providers.

use crate::errors::PipelineResult;
use alloc::boxed::Box;
use alloy_eips::BlockNumHash;
use async_trait::async_trait;
use op_alloy_protocol::L2BlockInfo;
use op_alloy_rpc_types_engine::{OptimismAttributesWithParent, OptimismPayloadAttributes};

use crate::errors::PipelineResult;

/// [NextAttributes] defines the interface for pulling attributes from
/// the top level `AttributesQueue` stage of the pipeline.
#[async_trait]
Expand Down
2 changes: 1 addition & 1 deletion crates/derive/src/traits/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ mod reset;
pub use reset::ResetProvider;

mod stages;
pub use stages::{OriginAdvancer, OriginProvider, ResettableStage};
pub use stages::{FlushableStage, OriginAdvancer, OriginProvider, ResettableStage};

#[cfg(any(test, feature = "test-utils"))]
pub mod test_utils;
7 changes: 7 additions & 0 deletions crates/derive/src/traits/stages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ pub trait ResettableStage {
async fn reset(&mut self, base: BlockInfo, cfg: &SystemConfig) -> PipelineResult<()>;
}

/// Describes the functionality of a stage that can receive a channel flush signal.
#[async_trait]
pub trait FlushableStage {
/// Flushes the current channel.
async fn flush_channel(&mut self) -> PipelineResult<()>;
}

/// Provides a method for accessing the pipeline's current L1 origin.
pub trait OriginProvider {
/// Returns the optional L1 [BlockInfo] origin.
Expand Down

0 comments on commit e34cb93

Please sign in to comment.