diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs
index caf48ee2cf..2ca79d92ac 100644
--- a/beacon_node/beacon_chain/src/beacon_chain.rs
+++ b/beacon_node/beacon_chain/src/beacon_chain.rs
@@ -22,6 +22,7 @@ pub use crate::canonical_head::CanonicalHead;
 use crate::chain_config::ChainConfig;
 use crate::data_availability_checker::{
     Availability, AvailabilityCheckError, AvailableBlock, DataAvailabilityChecker,
+    DataColumnReconstructionResult,
 };
 use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
 use crate::early_attester_cache::EarlyAttesterCache;
@@ -3166,7 +3167,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         }
 
         let data_availability_checker = self.data_availability_checker.clone();
-        let Some((availability, data_columns_to_publish)) = self
+
+        let result = self
             .task_executor
             .spawn_blocking_handle(
                 move || data_availability_checker.reconstruct_data_columns(&block_root),
@@ -3174,22 +3176,33 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             )
             .ok_or(BeaconChainError::RuntimeShutdown)?
             .await
-            .map_err(BeaconChainError::TokioJoin)??
-        else {
-            return Ok(None);
-        };
+            .map_err(BeaconChainError::TokioJoin)??;
 
-        let Some(slot) = data_columns_to_publish.first().map(|d| d.slot()) else {
-            return Ok(None);
-        };
+        match result {
+            DataColumnReconstructionResult::Success((availability, data_columns_to_publish)) => {
+                let Some(slot) = data_columns_to_publish.first().map(|d| d.slot()) else {
+                    // This should be unreachable because empty result would return `RecoveredColumnsNotImported` instead of success.
+                    return Ok(None);
+                };
 
-        let r = self
-            .process_availability(slot, availability, || Ok(()))
-            .await;
-        self.remove_notified(&block_root, r)
-            .map(|availability_processing_status| {
-                Some((availability_processing_status, data_columns_to_publish))
-            })
+                let r = self
+                    .process_availability(slot, availability, || Ok(()))
+                    .await;
+                self.remove_notified(&block_root, r)
+                    .map(|availability_processing_status| {
+                        Some((availability_processing_status, data_columns_to_publish))
+                    })
+            }
+            DataColumnReconstructionResult::NotRequired(reason)
+            | DataColumnReconstructionResult::RecoveredColumnsNotImported(reason) => {
+                // We use metric here because logging this would be *very* noisy.
+                metrics::inc_counter_vec(
+                    &metrics::KZG_DATA_COLUMN_RECONSTRUCTION_INCOMPLETE_TOTAL,
+                    &[reason],
+                );
+                Ok(None)
+            }
+        }
     }
 
     /// Remove any block components from the *processing cache* if we no longer require them. If the
diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs
index 6b1a1d9e08..29e613da4e 100644
--- a/beacon_node/beacon_chain/src/data_availability_checker.rs
+++ b/beacon_node/beacon_chain/src/data_availability_checker.rs
@@ -2,7 +2,9 @@ use crate::blob_verification::{verify_kzg_for_blob_list, GossipVerifiedBlob, Kzg
 use crate::block_verification_types::{
     AvailabilityPendingExecutedBlock, AvailableExecutedBlock, RpcBlock,
 };
-use crate::data_availability_checker::overflow_lru_cache::DataAvailabilityCheckerInner;
+use crate::data_availability_checker::overflow_lru_cache::{
+    DataAvailabilityCheckerInner, ReconstructColumnsDecision,
+};
 use crate::{metrics, BeaconChain, BeaconChainTypes, BeaconStore};
 use kzg::Kzg;
 use slog::{debug, error, Logger};
@@ -77,6 +79,13 @@ pub struct DataAvailabilityChecker<T: BeaconChainTypes> {
 
 pub type AvailabilityAndReconstructedColumns<E> = (Availability<E>, DataColumnSidecarList<E>);
 
+#[derive(Debug)]
+pub enum DataColumnReconstructionResult<E: EthSpec> {
+    Success(AvailabilityAndReconstructedColumns<E>),
+    NotRequired(&'static str),
+    RecoveredColumnsNotImported(&'static str),
+}
+
 /// This type is returned after adding a block / blob to the `DataAvailabilityChecker`.
 ///
 /// Indicates if the block is fully `Available` or if we need blobs or blocks
@@ -211,7 +220,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
                 .map_err(AvailabilityCheckError::InvalidBlobs)?;
 
         self.availability_cache
-            .put_kzg_verified_blobs(block_root, epoch, verified_blobs)
+            .put_kzg_verified_blobs(block_root, epoch, verified_blobs, &self.log)
     }
 
     /// Put a list of custody columns received via RPC into the availability cache. This performs KZG
@@ -241,6 +250,7 @@
             block_root,
             epoch,
             verified_custody_columns,
+            &self.log,
         )
     }
 
@@ -257,6 +267,7 @@
             gossip_blob.block_root(),
             gossip_blob.epoch(),
             vec![gossip_blob.into_inner()],
+            &self.log,
         )
     }
 
@@ -279,8 +290,12 @@
             .map(|c| KzgVerifiedCustodyDataColumn::from_asserted_custody(c.into_inner()))
             .collect::<Vec<_>>();
 
-        self.availability_cache
-            .put_kzg_verified_data_columns(block_root, epoch, custody_columns)
+        self.availability_cache.put_kzg_verified_data_columns(
+            block_root,
+            epoch,
+            custody_columns,
+            &self.log,
+        )
     }
 
     /// Check if we have all the blobs for a block. Returns `Availability` which has information
@@ -290,7 +305,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
         executed_block: AvailabilityPendingExecutedBlock<T::EthSpec>,
     ) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
         self.availability_cache
-            .put_pending_executed_block(executed_block)
+            .put_pending_executed_block(executed_block, &self.log)
     }
 
     pub fn remove_pending_components(&self, block_root: Hash256) {
@@ -514,14 +529,15 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
     pub fn reconstruct_data_columns(
         &self,
         block_root: &Hash256,
-    ) -> Result<Option<AvailabilityAndReconstructedColumns<T::EthSpec>>, AvailabilityCheckError>
-    {
-        let Some(pending_components) = self
+    ) -> Result<DataColumnReconstructionResult<T::EthSpec>, AvailabilityCheckError> {
+        let pending_components = match self
             .availability_cache
             .check_and_set_reconstruction_started(block_root)
-        else {
-            // Reconstruction not required or already in progress
-            return Ok(None);
+        {
+            ReconstructColumnsDecision::Yes(pending_components) => pending_components,
+            ReconstructColumnsDecision::No(reason) => {
+                return Ok(DataColumnReconstructionResult::NotRequired(reason));
+            }
         };
 
         metrics::inc_counter(&KZG_DATA_COLUMN_RECONSTRUCTION_ATTEMPTS);
@@ -547,11 +563,9 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
 
         // Check indices from cache again to make sure we don't publish components we've already received.
         let Some(existing_column_indices) = self.cached_data_column_indexes(block_root) else {
-            // If block is already imported (no longer in cache), abort publishing data columns
-            // TODO(das) This assumes only supernodes do reconstructions (i.e. custody
-            // requirement = all columns). This behaviour is likely to change in the future when we
-            // get to 2D PeerDAS.
-            return Ok(None);
+            return Ok(DataColumnReconstructionResult::RecoveredColumnsNotImported(
+                "block already imported",
+            ));
         };
 
         let data_columns_to_publish = all_data_columns
@@ -559,12 +573,13 @@
             .filter(|d| !existing_column_indices.contains(&d.index()))
             .collect::<Vec<_>>();
 
-        // Return if there's no data columns to publish.
         let Some(slot) = data_columns_to_publish
             .first()
             .map(|d| d.as_data_column().slot())
         else {
-            return Ok(None);
+            return Ok(DataColumnReconstructionResult::RecoveredColumnsNotImported(
+                "No new columns to import and publish",
+            ));
         };
 
         metrics::stop_timer(timer);
@@ -584,17 +599,17 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
                 *block_root,
                 slot.epoch(T::EthSpec::slots_per_epoch()),
                 data_columns_to_publish.clone(),
+                &self.log,
             )
             .map(|availability| {
-                (
+                DataColumnReconstructionResult::Success((
                     availability,
                     data_columns_to_publish
                         .into_iter()
                         .map(|d| d.clone_arc())
                         .collect::<Vec<_>>(),
-                )
+                ))
             })
-            .map(Some)
     }
 }
 
diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
index cb95be6b8a..7bdd33e2ee 100644
--- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
+++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
@@ -9,6 +9,7 @@ use crate::data_column_verification::KzgVerifiedCustodyDataColumn;
 use crate::BeaconChainTypes;
 use lru::LruCache;
 use parking_lot::RwLock;
+use slog::{debug, Logger};
 use ssz_types::{FixedVector, VariableList};
 use std::num::NonZeroUsize;
 use std::sync::Arc;
@@ -90,7 +91,7 @@ impl<E: EthSpec> PendingComponents<E> {
     /// block.
     ///
     /// This corresponds to the number of commitments that are present in a block.
-    pub fn num_expected_blobs(&self) -> Option<usize> {
+    pub fn block_kzg_commitments_count(&self) -> Option<usize> {
         self.get_cached_block()
             .as_ref()
             .map(|b| b.get_commitments().len())
@@ -198,21 +199,61 @@ impl<E: EthSpec> PendingComponents<E> {
     ///
     /// Returns `true` if both the block exists and the number of received blobs / custody columns
     /// matches the number of expected blobs / custody columns.
-    pub fn is_available(&self, block_import_requirement: &BlockImportRequirement) -> bool {
+    pub fn is_available(
+        &self,
+        block_import_requirement: &BlockImportRequirement,
+        log: &Logger,
+    ) -> bool {
+        let block_kzg_commitments_count_opt = self.block_kzg_commitments_count();
+
         match block_import_requirement {
-            BlockImportRequirement::AllBlobs => self
-                .num_expected_blobs()
-                .map_or(false, |num_expected_blobs| {
-                    num_expected_blobs == self.num_received_blobs()
-                }),
+            BlockImportRequirement::AllBlobs => {
+                let received_blobs = self.num_received_blobs();
+                let expected_blobs_msg = block_kzg_commitments_count_opt
+                    .as_ref()
+                    .map(|num| num.to_string())
+                    .unwrap_or("unknown".to_string());
+
+                debug!(log,
+                    "Component(s) added to data availability checker";
+                    "block_root" => ?self.block_root,
+                    "received_block" => block_kzg_commitments_count_opt.is_some(),
+                    "received_blobs" => received_blobs,
+                    "expected_blobs" => expected_blobs_msg,
+                );
+
+                block_kzg_commitments_count_opt.map_or(false, |num_expected_blobs| {
+                    num_expected_blobs == received_blobs
+                })
+            }
             BlockImportRequirement::ColumnSampling(num_expected_columns) => {
-                let num_received_data_columns = self.num_received_data_columns();
                 // No data columns when there are 0 blobs
-                self.num_expected_blobs()
-                    .map_or(false, |num_expected_blobs| {
-                        num_expected_blobs == 0
-                            || *num_expected_columns == num_received_data_columns
-                    })
+                let expected_columns_opt = block_kzg_commitments_count_opt.map(|blob_count| {
+                    if blob_count > 0 {
+                        *num_expected_columns
+                    } else {
+                        0
+                    }
+                });
+
+                let expected_columns_msg = expected_columns_opt
+                    .as_ref()
+                    .map(|num| num.to_string())
+                    .unwrap_or("unknown".to_string());
+
+                let num_received_columns = self.num_received_data_columns();
+
+                debug!(log,
+                    "Component(s) added to data availability checker";
+                    "block_root" => ?self.block_root,
+                    "received_block" => block_kzg_commitments_count_opt.is_some(),
+                    "received_columns" => num_received_columns,
+                    "expected_columns" => expected_columns_msg,
+                );
+
+                expected_columns_opt.map_or(false, |num_expected_columns| {
+                    num_expected_columns == num_received_columns
+                })
             }
         }
     }
@@ -349,6 +390,12 @@ pub struct DataAvailabilityCheckerInner<T: BeaconChainTypes> {
     spec: Arc<ChainSpec>,
 }
 
+#[allow(clippy::large_enum_variant)]
+pub(crate) enum ReconstructColumnsDecision<E: EthSpec> {
+    Yes(PendingComponents<E>),
+    No(&'static str),
+}
+
 impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
     pub fn new(
         capacity: NonZeroUsize,
@@ -444,6 +491,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
         block_root: Hash256,
         epoch: Epoch,
         kzg_verified_blobs: I,
+        log: &Logger,
     ) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
         let mut fixed_blobs = FixedVector::default();
 
@@ -465,7 +513,7 @@
         pending_components.merge_blobs(fixed_blobs);
 
         let block_import_requirement = self.block_import_requirement(epoch)?;
-        if pending_components.is_available(&block_import_requirement) {
+        if pending_components.is_available(&block_import_requirement, log) {
             write_lock.put(block_root, pending_components.clone());
             // No need to hold the write lock anymore
             drop(write_lock);
@@ -486,6 +534,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
         block_root: Hash256,
         epoch: Epoch,
         kzg_verified_data_columns: I,
+        log: &Logger,
     ) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
         let mut write_lock = self.critical.write();
 
@@ -500,7 +549,7 @@
 
         let block_import_requirement = self.block_import_requirement(epoch)?;
 
-        if pending_components.is_available(&block_import_requirement) {
+        if pending_components.is_available(&block_import_requirement, log) {
             write_lock.put(block_root, pending_components.clone());
             // No need to hold the write lock anymore
             drop(write_lock);
@@ -515,39 +564,43 @@
     /// Check whether data column reconstruction should be attempted.
     ///
-    /// If reconstruction is required, returns `Some(PendingComponents)` which contains the
-    /// components to be used as inputs to reconstruction, otherwise return `None`.
+    /// Potentially trigger reconstruction if:
+    /// - Our custody requirement is all columns (supernode), and we haven't got all columns
+    /// - We have >= 50% of columns, but not all columns
+    /// - Reconstruction hasn't been started for the block
+    ///
+    /// If reconstruction is required, returns `PendingComponents` which contains the
+    /// components to be used as inputs to reconstruction, otherwise returns a `reason`.
     pub fn check_and_set_reconstruction_started(
         &self,
         block_root: &Hash256,
-    ) -> Option<PendingComponents<T::EthSpec>> {
+    ) -> ReconstructColumnsDecision<T::EthSpec> {
         let mut write_lock = self.critical.write();
 
         let Some(pending_components) = write_lock.get_mut(block_root) else {
            // Block may have been imported as it does not exist in availability cache.
-            return None;
+            return ReconstructColumnsDecision::No("block already imported");
         };
 
-        // Potentially trigger reconstruction if:
-        // - Our custody requirement is all columns, and we haven't got all columns
-        // - We have >= 50% of columns, but not all columns
-        // - Reconstruction hasn't been started for the block
-        let should_reconstruct = {
-            let received_column_count = pending_components.verified_data_columns.len();
-            // If we're sampling all columns, it means we must be custodying all columns.
-            let custody_column_count = self.sampling_column_count();
-            let total_column_count = self.spec.number_of_columns;
-            custody_column_count == total_column_count
-                && received_column_count < total_column_count
-                && received_column_count >= total_column_count / 2
-                && !pending_components.reconstruction_started
-        };
+        // If we're sampling all columns, it means we must be custodying all columns.
+        let custody_column_count = self.sampling_column_count();
+        let total_column_count = self.spec.number_of_columns;
+        let received_column_count = pending_components.verified_data_columns.len();
 
-        if should_reconstruct {
-            pending_components.reconstruction_started = true;
-            Some(pending_components.clone())
-        } else {
-            None
+        if pending_components.reconstruction_started {
+            return ReconstructColumnsDecision::No("already started");
+        }
+        if custody_column_count != total_column_count {
+            return ReconstructColumnsDecision::No("not required for full node");
         }
+        if received_column_count == self.spec.number_of_columns {
+            return ReconstructColumnsDecision::No("all columns received");
+        }
+        if received_column_count < total_column_count / 2 {
+            return ReconstructColumnsDecision::No("not enough columns");
+        }
+
+        pending_components.reconstruction_started = true;
+        ReconstructColumnsDecision::Yes(pending_components.clone())
     }
 
     /// This could mean some invalid data columns made it through to the `DataAvailabilityChecker`.
@@ -565,6 +618,7 @@
     pub fn put_pending_executed_block(
         &self,
         executed_block: AvailabilityPendingExecutedBlock<T::EthSpec>,
+        log: &Logger,
     ) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
         let mut write_lock = self.critical.write();
         let block_root = executed_block.import_data.block_root;
@@ -586,7 +640,7 @@
 
         // Check if we have all components and entire set is consistent.
         let block_import_requirement = self.block_import_requirement(epoch)?;
-        if pending_components.is_available(&block_import_requirement) {
+        if pending_components.is_available(&block_import_requirement, log) {
             write_lock.put(block_root, pending_components.clone());
             // No need to hold the write lock anymore
             drop(write_lock);
@@ -884,7 +938,7 @@ mod test {
         );
         assert!(cache.critical.read().is_empty(), "cache should be empty");
         let availability = cache
-            .put_pending_executed_block(pending_block)
+            .put_pending_executed_block(pending_block, harness.logger())
             .expect("should put block");
         if blobs_expected == 0 {
             assert!(
@@ -923,7 +977,7 @@ mod test {
         for (blob_index, gossip_blob) in blobs.into_iter().enumerate() {
             kzg_verified_blobs.push(gossip_blob.into_inner());
             let availability = cache
-                .put_kzg_verified_blobs(root, epoch, kzg_verified_blobs.clone())
+                .put_kzg_verified_blobs(root, epoch, kzg_verified_blobs.clone(), harness.logger())
                 .expect("should put blob");
             if blob_index == blobs_expected - 1 {
                 assert!(matches!(availability, Availability::Available(_)));
@@ -950,7 +1004,7 @@ mod test {
         for gossip_blob in blobs {
             kzg_verified_blobs.push(gossip_blob.into_inner());
             let availability = cache
-                .put_kzg_verified_blobs(root, epoch, kzg_verified_blobs.clone())
+                .put_kzg_verified_blobs(root, epoch, kzg_verified_blobs.clone(), harness.logger())
                 .expect("should put blob");
             assert_eq!(
                 availability,
@@ -960,7 +1014,7 @@ mod test {
             assert_eq!(cache.critical.read().len(), 1);
         }
         let availability = cache
-            .put_pending_executed_block(pending_block)
+            .put_pending_executed_block(pending_block, harness.logger())
             .expect("should put block");
         assert!(
             matches!(availability, Availability::Available(_)),
@@ -1028,7 +1082,7 @@ mod test {
 
         // put the block in the cache
         let availability = cache
-            .put_pending_executed_block(pending_block)
+            .put_pending_executed_block(pending_block, harness.logger())
             .expect("should put block");
 
         // grab the diet block from the cache for later testing
diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs
index 55327d0fb0..0b5608f084 100644
--- a/beacon_node/beacon_chain/src/metrics.rs
+++ b/beacon_node/beacon_chain/src/metrics.rs
@@ -1903,6 +1903,15 @@ pub static KZG_DATA_COLUMN_RECONSTRUCTION_FAILURES: LazyLock<Result<IntCounter>>
     )
 });
 
+pub static KZG_DATA_COLUMN_RECONSTRUCTION_INCOMPLETE_TOTAL: LazyLock<Result<IntCounterVec>> =
+    LazyLock::new(|| {
+        try_create_int_counter_vec(
+            "kzg_data_column_reconstruction_incomplete_total",
+            "Count of times data column reconstruction attempts did not result in an import",
+            &["reason"],
+        )
+    });
+
 /*
  * light_client server metrics
  */
diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs
index 1b995dcadf..295d6491a3 100644
--- a/beacon_node/network/src/network_beacon_processor/mod.rs
+++ b/beacon_node/network/src/network_beacon_processor/mod.rs
@@ -899,6 +899,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
                 Some(availability_processing_status)
             }
             Ok(None) => {
+                // reason is tracked via the `KZG_DATA_COLUMN_RECONSTRUCTION_INCOMPLETE_TOTAL` metric
                 trace!(
                     self.log,
"Reconstruction not required for block";