Commit

Add reconstruction reason metric and more debug logging to da checker.
jimmygchen committed Oct 11, 2024
1 parent b0c3379 commit 0e6eaa2
Showing 5 changed files with 172 additions and 80 deletions.
43 changes: 28 additions & 15 deletions beacon_node/beacon_chain/src/beacon_chain.rs
@@ -22,6 +22,7 @@ pub use crate::canonical_head::CanonicalHead;
use crate::chain_config::ChainConfig;
use crate::data_availability_checker::{
Availability, AvailabilityCheckError, AvailableBlock, DataAvailabilityChecker,
DataColumnReconstructionResult,
};
use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
use crate::early_attester_cache::EarlyAttesterCache;
@@ -3166,30 +3167,42 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}

let data_availability_checker = self.data_availability_checker.clone();
let Some((availability, data_columns_to_publish)) = self

let result = self
.task_executor
.spawn_blocking_handle(
move || data_availability_checker.reconstruct_data_columns(&block_root),
"reconstruct_data_columns",
)
.ok_or(BeaconChainError::RuntimeShutdown)?
.await
.map_err(BeaconChainError::TokioJoin)??
else {
return Ok(None);
};
.map_err(BeaconChainError::TokioJoin)??;

let Some(slot) = data_columns_to_publish.first().map(|d| d.slot()) else {
return Ok(None);
};
match result {
DataColumnReconstructionResult::Success((availability, data_columns_to_publish)) => {
let Some(slot) = data_columns_to_publish.first().map(|d| d.slot()) else {
// This should be unreachable because an empty result would return `RecoveredColumnsNotImported` instead of `Success`.
return Ok(None);
};

let r = self
.process_availability(slot, availability, || Ok(()))
.await;
self.remove_notified(&block_root, r)
.map(|availability_processing_status| {
Some((availability_processing_status, data_columns_to_publish))
})
let r = self
.process_availability(slot, availability, || Ok(()))
.await;
self.remove_notified(&block_root, r)
.map(|availability_processing_status| {
Some((availability_processing_status, data_columns_to_publish))
})
}
DataColumnReconstructionResult::NotRequired(reason)
| DataColumnReconstructionResult::RecoveredColumnsNotImported(reason) => {
// We use a metric here because logging this would be *very* noisy.
metrics::inc_counter_vec(
&metrics::KZG_DATA_COLUMN_RECONSTRUCTION_INCOMPLETE_TOTAL,
&[reason],
);
Ok(None)
}
}
}

/// Remove any block components from the *processing cache* if we no longer require them. If the
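The labelled counter incremented above, KZG_DATA_COLUMN_RECONSTRUCTION_INCOMPLETE_TOTAL, is presumably defined in beacon_node/beacon_chain/src/metrics.rs, one of the changed files whose diff is not shown on this page. A minimal sketch of what that definition might look like, assuming the usual lazy_static + lighthouse_metrics pattern used elsewhere in the crate; the metric name and help strings here are assumptions, not taken from the commit:

use lazy_static::lazy_static;
use lighthouse_metrics::*;

lazy_static! {
    // Hypothetical definition: a counter vector keyed by a single "reason" label,
    // matching the `inc_counter_vec(&..., &[reason])` call site in beacon_chain.rs.
    pub static ref KZG_DATA_COLUMN_RECONSTRUCTION_INCOMPLETE_TOTAL: Result<IntCounterVec> =
        try_create_int_counter_vec(
            "kzg_data_column_reconstruction_incomplete_total",
            "Count of times data column reconstruction did not complete, by reason",
            &["reason"],
        );
}

With a single "reason" label, each distinct &'static str passed by the caller becomes its own time series, which is what makes the per-reason breakdown cheap compared with logging every occurrence.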
57 changes: 36 additions & 21 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
@@ -2,7 +2,9 @@ use crate::blob_verification::{verify_kzg_for_blob_list, GossipVerifiedBlob, Kzg
use crate::block_verification_types::{
AvailabilityPendingExecutedBlock, AvailableExecutedBlock, RpcBlock,
};
use crate::data_availability_checker::overflow_lru_cache::DataAvailabilityCheckerInner;
use crate::data_availability_checker::overflow_lru_cache::{
DataAvailabilityCheckerInner, ReconstructColumnsDecision,
};
use crate::{metrics, BeaconChain, BeaconChainTypes, BeaconStore};
use kzg::Kzg;
use slog::{debug, error, Logger};
@@ -77,6 +79,13 @@ pub struct DataAvailabilityChecker<T: BeaconChainTypes> {

pub type AvailabilityAndReconstructedColumns<E> = (Availability<E>, DataColumnSidecarList<E>);

#[derive(Debug)]
pub enum DataColumnReconstructionResult<E: EthSpec> {
Success(AvailabilityAndReconstructedColumns<E>),
NotRequired(&'static str),
RecoveredColumnsNotImported(&'static str),
}

/// This type is returned after adding a block / blob to the `DataAvailabilityChecker`.
///
/// Indicates if the block is fully `Available` or if we need blobs or blocks
@@ -211,7 +220,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
.map_err(AvailabilityCheckError::InvalidBlobs)?;

self.availability_cache
.put_kzg_verified_blobs(block_root, epoch, verified_blobs)
.put_kzg_verified_blobs(block_root, epoch, verified_blobs, &self.log)
}

/// Put a list of custody columns received via RPC into the availability cache. This performs KZG
Expand Down Expand Up @@ -241,6 +250,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
block_root,
epoch,
verified_custody_columns,
&self.log,
)
}

Expand All @@ -257,6 +267,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
gossip_blob.block_root(),
gossip_blob.epoch(),
vec![gossip_blob.into_inner()],
&self.log,
)
}

Expand All @@ -279,8 +290,12 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
.map(|c| KzgVerifiedCustodyDataColumn::from_asserted_custody(c.into_inner()))
.collect::<Vec<_>>();

self.availability_cache
.put_kzg_verified_data_columns(block_root, epoch, custody_columns)
self.availability_cache.put_kzg_verified_data_columns(
block_root,
epoch,
custody_columns,
&self.log,
)
}

/// Check if we have all the blobs for a block. Returns `Availability` which has information
Expand All @@ -290,7 +305,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
executed_block: AvailabilityPendingExecutedBlock<T::EthSpec>,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
self.availability_cache
.put_pending_executed_block(executed_block)
.put_pending_executed_block(executed_block, &self.log)
}

pub fn remove_pending_components(&self, block_root: Hash256) {
Expand Down Expand Up @@ -514,14 +529,15 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
pub fn reconstruct_data_columns(
&self,
block_root: &Hash256,
) -> Result<Option<AvailabilityAndReconstructedColumns<T::EthSpec>>, AvailabilityCheckError>
{
let Some(pending_components) = self
) -> Result<DataColumnReconstructionResult<T::EthSpec>, AvailabilityCheckError> {
let pending_components = match self
.availability_cache
.check_and_set_reconstruction_started(block_root)
else {
// Reconstruction not required or already in progress
return Ok(None);
{
ReconstructColumnsDecision::Yes(pending_components) => pending_components,
ReconstructColumnsDecision::No(reason) => {
return Ok(DataColumnReconstructionResult::NotRequired(reason));
}
};

metrics::inc_counter(&KZG_DATA_COLUMN_RECONSTRUCTION_ATTEMPTS);
@@ -547,24 +563,23 @@

// Check indices from cache again to make sure we don't publish components we've already received.
let Some(existing_column_indices) = self.cached_data_column_indexes(block_root) else {
// If block is already imported (no longer in cache), abort publishing data columns
// TODO(das) This assumes only supernodes do reconstructions (i.e. custody
// requirement = all columns). This behaviour is likely to change in the future when we
// get to 2D PeerDAS.
return Ok(None);
return Ok(DataColumnReconstructionResult::RecoveredColumnsNotImported(
"block already imported",
));
};

let data_columns_to_publish = all_data_columns
.into_iter()
.filter(|d| !existing_column_indices.contains(&d.index()))
.collect::<Vec<_>>();

// Return if there are no data columns to publish.
let Some(slot) = data_columns_to_publish
.first()
.map(|d| d.as_data_column().slot())
else {
return Ok(None);
return Ok(DataColumnReconstructionResult::RecoveredColumnsNotImported(
"No new columns to import and publish",
));
};

metrics::stop_timer(timer);
@@ -584,17 +599,17 @@
*block_root,
slot.epoch(T::EthSpec::slots_per_epoch()),
data_columns_to_publish.clone(),
&self.log,
)
.map(|availability| {
(
DataColumnReconstructionResult::Success((
availability,
data_columns_to_publish
.into_iter()
.map(|d| d.clone_arc())
.collect::<Vec<_>>(),
)
))
})
.map(Some)
}
}

(Diffs for the remaining 3 changed files are not shown.)
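The ReconstructColumnsDecision type matched on in reconstruct_data_columns above is defined in overflow_lru_cache.rs, another changed file whose diff is not shown here. Based only on how it is used in this diff (a Yes variant carrying the cached pending components and a No variant carrying a &'static str reason), a hedged sketch of its likely shape; anything beyond that usage is an assumption:

// Sketch inferred from the call site in data_availability_checker.rs; the real
// definition in overflow_lru_cache.rs may differ in naming or carry extra variants.
// `PendingComponents` is the existing per-block cache entry type in that module.
pub enum ReconstructColumnsDecision<E: EthSpec> {
    // Reconstruction should proceed, using the pending components currently cached
    // for this block root.
    Yes(PendingComponents<E>),
    // Reconstruction is not required or is already in progress; the reason string
    // becomes the label on KZG_DATA_COLUMN_RECONSTRUCTION_INCOMPLETE_TOTAL.
    No(&'static str),
}

Returning a &'static str rather than logging keeps the hot path quiet while still surfacing why reconstruction was skipped through the new metric.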
