Improving blob propagation post-PeerDAS with Decentralized Blob Building #6268

Merged Nov 15, 2024 (46 commits)

Commits
662d6cf
Get blobs from EL.
jimmygchen Aug 29, 2024
0c23848
Avoid cloning blobs after fetching blobs.
jimmygchen Aug 29, 2024
89dfaaa
Address review comments and refactor code.
jimmygchen Aug 29, 2024
401231b
Fix lint.
jimmygchen Aug 29, 2024
2efc99b
Move blob computation metric to the right spot.
jimmygchen Aug 29, 2024
db6318e
Merge branch 'unstable' into das-fetch-blobs
jimmygchen Aug 30, 2024
aa79ec6
Merge branch 'unstable' into das-fetch-blobs
jimmygchen Sep 5, 2024
36b23b2
Merge branch 'unstable' into das-fetch-blobs
jimmygchen Sep 9, 2024
6bff4ab
Gradual publication of data columns for supernodes.
michaelsproul Sep 13, 2024
7977999
Recompute head after importing block with blobs from the EL.
jimmygchen Sep 13, 2024
5e75527
Fix lint
jimmygchen Sep 17, 2024
3444281
Merge branch 'unstable' into das-fetch-blobs
jimmygchen Sep 17, 2024
e76d21f
Use blocking task instead of async when computing cells.
jimmygchen Sep 19, 2024
4b2956f
Merge branch 'das-fetch-blobs' of github.com:jimmygchen/lighthouse in…
jimmygchen Sep 19, 2024
a6fbb3c
Merge remote-tracking branch 'origin/unstable' into das-fetch-blobs
michaelsproul Sep 27, 2024
4639146
Fix semantic conflicts
michaelsproul Sep 27, 2024
41c2e7d
Downgrade error log.
jimmygchen Oct 4, 2024
d8eedf3
Merge branch 'unstable' into das-fetch-blobs
jimmygchen Oct 17, 2024
a65030b
Merge branch 'unstable' into das-fetch-blobs
jimmygchen Oct 17, 2024
0a2c6f7
Publish block without waiting for blob and column proof computation.
jimmygchen Oct 18, 2024
0a9d5a0
Address review comments and refactor.
jimmygchen Oct 18, 2024
6511c93
Merge branch 'unstable' into das-fetch-blobs
jimmygchen Oct 18, 2024
7939888
Fix test and docs.
jimmygchen Oct 18, 2024
5ec9756
Comment cleanups.
jimmygchen Oct 18, 2024
422fc1b
Merge branch 'unstable' into das-fetch-blobs
jimmygchen Oct 22, 2024
bf19769
Address review comments and cleanup
jimmygchen Oct 22, 2024
6e44763
Address review comments and cleanup
jimmygchen Oct 22, 2024
1872ae5
Refactor to de-duplicate gradual publication logic.
jimmygchen Oct 28, 2024
81493f5
Add more logging.
jimmygchen Oct 28, 2024
4278e6a
Merge remote-tracking branch 'origin/unstable' into das-fetch-blobs
jimmygchen Oct 29, 2024
756230e
Fix incorrect comparison on `num_fetched_blobs`.
jimmygchen Oct 29, 2024
0f32b73
Implement gradual blob publication.
jimmygchen Oct 29, 2024
dfdfccb
Merge branch 'unstable' into das-fetch-blobs
jimmygchen Oct 29, 2024
1701565
Inline `publish_fn`.
jimmygchen Oct 29, 2024
b081dfc
Merge branch 'das-fetch-blobs' of github.com:jimmygchen/lighthouse in…
jimmygchen Oct 29, 2024
1aa125e
Gossip verify blobs before publishing
michaelsproul Nov 1, 2024
7fa2f44
Avoid queries for 0 blobs and error for duplicates
michaelsproul Nov 1, 2024
ca7a78f
Gossip verified engine blob before processing them, and use observe c…
jimmygchen Nov 1, 2024
b41083f
Merge branch 'das-fetch-blobs' of github.com:jimmygchen/lighthouse in…
jimmygchen Nov 1, 2024
9fd74fd
Merge branch 'unstable' into das-fetch-blobs
jimmygchen Nov 1, 2024
2f11f3c
Fix invalid commitment inclusion proofs in blob sidecars created from…
jimmygchen Nov 1, 2024
1b6d9ed
Only publish EL blobs triggered from gossip block, and not RPC block.
jimmygchen Nov 4, 2024
9c9de63
Downgrade gossip blob log to `debug`.
jimmygchen Nov 4, 2024
8060e86
Merge branch 'unstable' into das-fetch-blobs
jimmygchen Nov 5, 2024
d2c75a2
Merge branch 'unstable' into das-fetch-blobs
jimmygchen Nov 6, 2024
0f06877
Grammar
michaelsproul Nov 15, 2024
Files changed
13 changes: 8 additions & 5 deletions beacon_node/beacon_chain/benches/benches.rs
@@ -37,12 +37,15 @@ fn all_benches(c: &mut Criterion) {

     let kzg = get_kzg(&spec);
     for blob_count in [1, 2, 3, 6] {
         let kzg = kzg.clone();
-        let (signed_block, blob_sidecars) = create_test_block_and_blobs::<E>(blob_count, &spec);
+        let (signed_block, blobs) = create_test_block_and_blobs::<E>(blob_count, &spec);

-        let column_sidecars =
-            blobs_to_data_column_sidecars(&blob_sidecars, &signed_block, &kzg.clone(), &spec)
-                .unwrap();
+        let column_sidecars = blobs_to_data_column_sidecars(
+            &blobs.iter().collect::<Vec<_>>(),
+            &signed_block,
+            &kzg,
+            &spec,
+        )
+        .unwrap();

         let spec = spec.clone();
287 changes: 195 additions & 92 deletions beacon_node/beacon_chain/src/beacon_chain.rs

Large diffs are not rendered by default.

96 changes: 69 additions & 27 deletions beacon_node/beacon_chain/src/blob_verification.rs
@@ -1,5 +1,6 @@
 use derivative::Derivative;
 use slot_clock::SlotClock;
+use std::marker::PhantomData;
 use std::sync::Arc;

 use crate::beacon_chain::{BeaconChain, BeaconChainTypes};
@@ -8,11 +9,11 @@ use crate::block_verification::{
     BlockSlashInfo,
 };
 use crate::kzg_utils::{validate_blob, validate_blobs};
+use crate::observed_data_sidecars::{DoNotObserve, ObservationStrategy, Observe};
 use crate::{metrics, BeaconChainError};
 use kzg::{Error as KzgError, Kzg, KzgCommitment};
 use slog::debug;
 use ssz_derive::{Decode, Encode};
-use ssz_types::VariableList;
 use std::time::Duration;
 use tree_hash::TreeHash;
 use types::blob_sidecar::BlobIdentifier;
@@ -156,20 +157,16 @@ impl From<BeaconStateError> for GossipBlobError {
     }
 }

-pub type GossipVerifiedBlobList<T> = VariableList<
-    GossipVerifiedBlob<T>,
-    <<T as BeaconChainTypes>::EthSpec as EthSpec>::MaxBlobsPerBlock,
->;
-
 /// A wrapper around a `BlobSidecar` that indicates it has been approved for re-gossiping on
 /// the p2p network.
 #[derive(Debug)]
-pub struct GossipVerifiedBlob<T: BeaconChainTypes> {
+pub struct GossipVerifiedBlob<T: BeaconChainTypes, O: ObservationStrategy = Observe> {
     block_root: Hash256,
     blob: KzgVerifiedBlob<T::EthSpec>,
+    _phantom: PhantomData<O>,
 }

-impl<T: BeaconChainTypes> GossipVerifiedBlob<T> {
+impl<T: BeaconChainTypes, O: ObservationStrategy> GossipVerifiedBlob<T, O> {
     pub fn new(
         blob: Arc<BlobSidecar<T::EthSpec>>,
         subnet_id: u64,
@@ -178,7 +175,7 @@ impl<T: BeaconChainTypes> GossipVerifiedBlob<T> {
         let header = blob.signed_block_header.clone();
         // We only process slashing info if the gossip verification failed
         // since we do not process the blob any further in that case.
-        validate_blob_sidecar_for_gossip(blob, subnet_id, chain).map_err(|e| {
+        validate_blob_sidecar_for_gossip::<T, O>(blob, subnet_id, chain).map_err(|e| {
             process_block_slash_info::<_, GossipBlobError>(
                 chain,
                 BlockSlashInfo::from_early_error_blob(header, e),
@@ -195,6 +192,7 @@ impl<T: BeaconChainTypes> GossipVerifiedBlob<T> {
                 blob,
                 seen_timestamp: Duration::from_secs(0),
             },
+            _phantom: PhantomData,
         }
     }
     pub fn id(&self) -> BlobIdentifier {
@@ -335,6 +333,25 @@ impl<E: EthSpec> KzgVerifiedBlobList<E> {
             verified_blobs: blobs,
         })
     }
+
+    /// Create a `KzgVerifiedBlobList` from `blobs` that are already KZG verified.
+    ///
+    /// This should be used with caution, as used incorrectly it could result in KZG verification
+    /// being skipped and invalid blobs being deemed valid.
+    pub fn from_verified<I: IntoIterator<Item = Arc<BlobSidecar<E>>>>(
+        blobs: I,
+        seen_timestamp: Duration,
+    ) -> Self {
+        Self {
+            verified_blobs: blobs
+                .into_iter()
+                .map(|blob| KzgVerifiedBlob {
+                    blob,
+                    seen_timestamp,
+                })
+                .collect(),
+        }
+    }
 }

 impl<E: EthSpec> IntoIterator for KzgVerifiedBlobList<E> {
@@ -364,11 +381,11 @@ where
     validate_blobs::<E>(kzg, commitments.as_slice(), blobs, proofs.as_slice())
 }

-pub fn validate_blob_sidecar_for_gossip<T: BeaconChainTypes>(
+pub fn validate_blob_sidecar_for_gossip<T: BeaconChainTypes, O: ObservationStrategy>(
     blob_sidecar: Arc<BlobSidecar<T::EthSpec>>,
     subnet: u64,
     chain: &BeaconChain<T>,
-) -> Result<GossipVerifiedBlob<T>, GossipBlobError> {
+) -> Result<GossipVerifiedBlob<T, O>, GossipBlobError> {
     let blob_slot = blob_sidecar.slot();
     let blob_index = blob_sidecar.index;
     let block_parent_root = blob_sidecar.block_parent_root();
@@ -568,16 +585,45 @@
     )
     .map_err(|e| GossipBlobError::BeaconChainError(e.into()))?;

+    if O::observe() {
+        observe_gossip_blob(&kzg_verified_blob.blob, chain)?;
+    }
+
+    Ok(GossipVerifiedBlob {
+        block_root,
+        blob: kzg_verified_blob,
+        _phantom: PhantomData,
+    })
+}
+
+impl<T: BeaconChainTypes> GossipVerifiedBlob<T, DoNotObserve> {
+    pub fn observe(
+        self,
+        chain: &BeaconChain<T>,
+    ) -> Result<GossipVerifiedBlob<T, Observe>, GossipBlobError> {
+        observe_gossip_blob(&self.blob.blob, chain)?;
+        Ok(GossipVerifiedBlob {
+            block_root: self.block_root,
+            blob: self.blob,
+            _phantom: PhantomData,
+        })
+    }
+}
+
+fn observe_gossip_blob<T: BeaconChainTypes>(
+    blob_sidecar: &BlobSidecar<T::EthSpec>,
+    chain: &BeaconChain<T>,
+) -> Result<(), GossipBlobError> {
     // Now the signature is valid, store the proposal so we don't accept another blob sidecar
-    // with the same `BlobIdentifier`.
-    // It's important to double-check that the proposer still hasn't been observed so we don't
-    // have a race-condition when verifying two blocks simultaneously.
+    // with the same `BlobIdentifier`. It's important to double-check that the proposer still
+    // hasn't been observed so we don't have a race-condition when verifying two blocks
+    // simultaneously.
     //
-    // Note: If this BlobSidecar goes on to fail full verification, we do not evict it from the seen_cache
-    // as alternate blob_sidecars for the same identifier can still be retrieved
-    // over rpc. Evicting them from this cache would allow faster propagation over gossip. So we allow
-    // retrieval of potentially valid blocks over rpc, but try to punish the proposer for signing
-    // invalid messages. Issue for more background
+    // Note: If this BlobSidecar goes on to fail full verification, we do not evict it from the
+    // seen_cache as alternate blob_sidecars for the same identifier can still be retrieved over
+    // rpc. Evicting them from this cache would allow faster propagation over gossip. So we
+    // allow retrieval of potentially valid blocks over rpc, but try to punish the proposer for
+    // signing invalid messages. Issue for more background
     // https://github.com/ethereum/consensus-specs/issues/3261
     if chain
         .observed_blob_sidecars
@@ -586,16 +632,12 @@
         .map_err(|e| GossipBlobError::BeaconChainError(e.into()))?
     {
         return Err(GossipBlobError::RepeatBlob {
-            proposer: proposer_index as u64,
-            slot: blob_slot,
-            index: blob_index,
+            proposer: blob_sidecar.block_proposer_index(),
+            slot: blob_sidecar.slot(),
+            index: blob_sidecar.index,
         });
     }

-    Ok(GossipVerifiedBlob {
-        block_root,
-        blob: kzg_verified_blob,
-    })
+    Ok(())
 }

 /// Returns the canonical root of the given `blob`.
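For context on the diff above: the `Observe` / `DoNotObserve` markers form a small typestate, so a blob can be gossip-verified without being recorded in the seen-cache, and the cache update can happen at a different time (e.g. verify, publish, then observe). Below is a minimal, self-contained sketch of the pattern; every name is an illustrative stand-in, not the actual Lighthouse definition.

use std::marker::PhantomData;

/// Whether observation happens during verification or is deferred.
pub trait ObservationStrategy {
    fn observe() -> bool;
}

/// Marker type: record the item as seen during verification.
pub struct Observe;
/// Marker type: defer recording until `observe()` is called explicitly.
pub struct DoNotObserve;

impl ObservationStrategy for Observe {
    fn observe() -> bool {
        true
    }
}
impl ObservationStrategy for DoNotObserve {
    fn observe() -> bool {
        false
    }
}

pub struct GossipVerified<O: ObservationStrategy = Observe> {
    id: u64,
    _phantom: PhantomData<O>,
}

/// Verification is generic over the strategy, mirroring
/// `validate_blob_sidecar_for_gossip::<T, O>` in the diff above.
fn verify<O: ObservationStrategy>(id: u64, seen: &mut Vec<u64>) -> GossipVerified<O> {
    if O::observe() {
        // Eager path: mark the item observed as part of verification.
        seen.push(id);
    }
    GossipVerified {
        id,
        _phantom: PhantomData,
    }
}

impl GossipVerified<DoNotObserve> {
    /// Mark the item as observed, converting it to the `Observe` state.
    fn observe(self, seen: &mut Vec<u64>) -> GossipVerified<Observe> {
        seen.push(self.id);
        GossipVerified {
            id: self.id,
            _phantom: PhantomData,
        }
    }
}

fn main() {
    let mut seen = Vec::new();
    // Verify without observing, publish (elided), then observe.
    let blob = verify::<DoNotObserve>(42, &mut seen);
    assert!(seen.is_empty());
    let observed: GossipVerified<Observe> = blob.observe(&mut seen);
    assert_eq!(seen, vec![observed.id]);
}

The payoff is that the deferred state lives in the type system: only `observe()` can turn a `GossipVerified<DoNotObserve>` into a `GossipVerified<Observe>`, so a caller cannot lose track of which stage an item is in.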
6 changes: 3 additions & 3 deletions beacon_node/beacon_chain/src/block_verification.rs
@@ -683,7 +683,7 @@ pub struct SignatureVerifiedBlock<T: BeaconChainTypes> {
     consensus_context: ConsensusContext<T::EthSpec>,
 }

-/// Used to await the result of executing payload with a remote EE.
+/// Used to await the result of executing payload with an EE.
 type PayloadVerificationHandle = JoinHandle<Option<Result<PayloadVerificationOutcome, BlockError>>>;

 /// A wrapper around a `SignedBeaconBlock` that indicates that this block is fully verified and
@@ -750,7 +750,8 @@ pub fn build_blob_data_column_sidecars<T: BeaconChainTypes>(
         &metrics::DATA_COLUMN_SIDECAR_COMPUTATION,
         &[&blobs.len().to_string()],
     );
-    let sidecars = blobs_to_data_column_sidecars(&blobs, block, &chain.kzg, &chain.spec)
+    let blob_refs = blobs.iter().collect::<Vec<_>>();
+    let sidecars = blobs_to_data_column_sidecars(&blob_refs, block, &chain.kzg, &chain.spec)
         .discard_timer_on_break(&mut timer)?;
     drop(timer);
     Ok(sidecars)
@@ -1343,7 +1344,6 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
     /*
      * Perform cursory checks to see if the block is even worth processing.
      */
-
     check_block_relevancy(block.as_block(), block_root, chain)?;

     // Define a future that will verify the execution payload with an execution engine.
8 changes: 8 additions & 0 deletions beacon_node/beacon_chain/src/chain_config.rs
@@ -88,6 +88,12 @@ pub struct ChainConfig {
     pub malicious_withhold_count: usize,
     /// Enable peer sampling on blocks.
    pub enable_sampling: bool,
+    /// Number of batches that the node splits blobs or data columns into during publication.
+    /// This doesn't apply if the node is the block proposer. For PeerDAS only.
+    pub blob_publication_batches: usize,
+    /// The delay in milliseconds applied by the node between sending each blob or data column batch.
+    /// This doesn't apply if the node is the block proposer.
+    pub blob_publication_batch_interval: Duration,
 }

 impl Default for ChainConfig {
@@ -121,6 +127,8 @@
             enable_light_client_server: false,
             malicious_withhold_count: 0,
             enable_sampling: false,
+            blob_publication_batches: 4,
+            blob_publication_batch_interval: Duration::from_millis(300),
         }
     }
 }
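The two new fields imply a simple schedule: split the sidecars into `blob_publication_batches` chunks and pause `blob_publication_batch_interval` between chunks. A rough sketch of that schedule follows (synchronous for brevity, whereas the real node spaces batches out asynchronously; `publish_gradually` is a hypothetical helper, not a Lighthouse API):

use std::time::Duration;

/// Split `sidecars` into roughly `batches` equal chunks and publish them
/// `interval` apart, spreading bandwidth out instead of spiking at once.
fn publish_gradually<T>(
    sidecars: Vec<T>,
    batches: usize,
    interval: Duration,
    mut publish: impl FnMut(Vec<T>),
) {
    let batches = batches.max(1);
    // Ceiling division so every sidecar lands in some batch.
    let batch_size = ((sidecars.len() + batches - 1) / batches).max(1);
    let mut remaining = sidecars.into_iter().peekable();
    while remaining.peek().is_some() {
        let batch: Vec<T> = remaining.by_ref().take(batch_size).collect();
        publish(batch);
        if remaining.peek().is_some() {
            std::thread::sleep(interval);
        }
    }
}

fn main() {
    // With the defaults above (4 batches, 300 ms apart), 128 data columns
    // would go out as four batches of 32 over roughly 900 ms.
    let columns: Vec<u32> = (0..128).collect();
    publish_gradually(columns, 4, Duration::from_millis(300), |batch| {
        println!("publishing batch of {} columns", batch.len());
    });
}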
53 changes: 31 additions & 22 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
@@ -18,7 +18,7 @@ use task_executor::TaskExecutor;
 use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList};
 use types::{
     BlobSidecarList, ChainSpec, DataColumnIdentifier, DataColumnSidecar, DataColumnSidecarList,
-    Epoch, EthSpec, Hash256, RuntimeVariableList, SignedBeaconBlock, Slot,
+    Epoch, EthSpec, Hash256, RuntimeVariableList, SignedBeaconBlock,
 };

 mod error;
@@ -146,6 +146,10 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
         self.availability_cache.sampling_column_count()
     }

+    pub(crate) fn is_supernode(&self) -> bool {
+        self.get_sampling_column_count() == self.spec.number_of_columns
+    }
+
     /// Checks if the block root is currenlty in the availability cache awaiting import because
     /// of missing components.
     pub fn get_execution_valid_block(
@@ -201,7 +205,6 @@
     pub fn put_rpc_blobs(
         &self,
         block_root: Hash256,
-        epoch: Epoch,
         blobs: FixedBlobSidecarList<T::EthSpec>,
     ) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
         let seen_timestamp = self
@@ -212,15 +215,12 @@
         // Note: currently not reporting which specific blob is invalid because we fetch all blobs
         // from the same peer for both lookup and range sync.

-        let verified_blobs = KzgVerifiedBlobList::new(
-            Vec::from(blobs).into_iter().flatten(),
-            &self.kzg,
-            seen_timestamp,
-        )
-        .map_err(AvailabilityCheckError::InvalidBlobs)?;
+        let verified_blobs =
+            KzgVerifiedBlobList::new(blobs.iter().flatten().cloned(), &self.kzg, seen_timestamp)
+                .map_err(AvailabilityCheckError::InvalidBlobs)?;

         self.availability_cache
-            .put_kzg_verified_blobs(block_root, epoch, verified_blobs, &self.log)
+            .put_kzg_verified_blobs(block_root, verified_blobs, &self.log)
     }

     /// Put a list of custody columns received via RPC into the availability cache. This performs KZG
@@ -229,7 +229,6 @@
     pub fn put_rpc_custody_columns(
         &self,
         block_root: Hash256,
-        epoch: Epoch,
         custody_columns: DataColumnSidecarList<T::EthSpec>,
     ) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
         // TODO(das): report which column is invalid for proper peer scoring
@@ -248,12 +247,32 @@

         self.availability_cache.put_kzg_verified_data_columns(
             block_root,
-            epoch,
             verified_custody_columns,
             &self.log,
         )
     }

+    /// Put a list of blobs received from the EL pool into the availability cache.
+    ///
+    /// This DOES NOT perform KZG verification because the KZG proofs should have been constructed
+    /// immediately prior to calling this function so they are assumed to be valid.
+    pub fn put_engine_blobs(
+        &self,
+        block_root: Hash256,
+        blobs: FixedBlobSidecarList<T::EthSpec>,
+    ) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
+        let seen_timestamp = self
+            .slot_clock
+            .now_duration()
+            .ok_or(AvailabilityCheckError::SlotClockError)?;
+
+        let verified_blobs =
+            KzgVerifiedBlobList::from_verified(blobs.iter().flatten().cloned(), seen_timestamp);
+
+        self.availability_cache
+            .put_kzg_verified_blobs(block_root, verified_blobs, &self.log)
+    }
+
     /// Check if we've cached other blobs for this block. If it completes a set and we also
     /// have a block cached, return the `Availability` variant triggering block import.
     /// Otherwise cache the blob sidecar.
@@ -265,7 +284,6 @@
     ) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
         self.availability_cache.put_kzg_verified_blobs(
             gossip_blob.block_root(),
-            gossip_blob.epoch(),
             vec![gossip_blob.into_inner()],
             &self.log,
         )
@@ -279,20 +297,16 @@
     #[allow(clippy::type_complexity)]
     pub fn put_gossip_data_columns(
         &self,
-        slot: Slot,
         block_root: Hash256,
         gossip_data_columns: Vec<GossipVerifiedDataColumn<T>>,
     ) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
-        let epoch = slot.epoch(T::EthSpec::slots_per_epoch());
-
         let custody_columns = gossip_data_columns
             .into_iter()
             .map(|c| KzgVerifiedCustodyDataColumn::from_asserted_custody(c.into_inner()))
             .collect::<Vec<_>>();

         self.availability_cache.put_kzg_verified_data_columns(
             block_root,
-            epoch,
             custody_columns,
             &self.log,
         )
@@ -595,12 +609,7 @@
         );

         self.availability_cache
-            .put_kzg_verified_data_columns(
-                *block_root,
-                slot.epoch(T::EthSpec::slots_per_epoch()),
-                data_columns_to_publish.clone(),
-                &self.log,
-            )
+            .put_kzg_verified_data_columns(*block_root, data_columns_to_publish.clone(), &self.log)
             .map(|availability| {
                 DataColumnReconstructionResult::Success((
                     availability,
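One detail worth noting in `put_rpc_blobs` and `put_engine_blobs` above: a `FixedBlobSidecarList` is conceptually a fixed-length list of `Option<Arc<BlobSidecar>>`, so `blobs.iter().flatten().cloned()` borrows the list, skips the empty slots, and clones only the `Arc` handles, where the old `Vec::from(blobs).into_iter().flatten()` consumed the whole container. A tiny sketch with stand-in types (not the real ssz_types definitions):

use std::sync::Arc;

fn main() {
    // Stand-in for a FixedBlobSidecarList: a fixed-length array of optional,
    // reference-counted blobs (indices without a blob are `None`).
    let blobs: [Option<Arc<String>>; 4] = [
        Some(Arc::new("blob 0".to_string())),
        None,
        Some(Arc::new("blob 2".to_string())),
        None,
    ];

    // Borrow the list, skip the `None` slots, and clone only the `Arc`
    // handles (cheap refcount bumps), leaving `blobs` intact.
    let present: Vec<Arc<String>> = blobs.iter().flatten().cloned().collect();

    assert_eq!(present.len(), 2);
    println!("{} of {} slots held a blob", present.len(), blobs.len());
}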
beacon_node/beacon_chain/src/data_availability_checker/error.rs
@@ -10,7 +10,6 @@ pub enum Error {
         blob_commitment: KzgCommitment,
         block_commitment: KzgCommitment,
     },
-    UnableToDetermineImportRequirement,
     Unexpected,
     SszTypes(ssz_types::Error),
     MissingBlobs,
@@ -44,7 +43,6 @@ impl Error {
             | Error::Unexpected
             | Error::ParentStateMissing(_)
             | Error::BlockReplayError(_)
-            | Error::UnableToDetermineImportRequirement
             | Error::RebuildingStateCaches(_)
             | Error::SlotClockError => ErrorCategory::Internal,
             Error::InvalidBlobs { .. }