Skip to content

Commit

Permalink
Send import results to sync after reconstruction. Add more logging an…
Browse files Browse the repository at this point in the history
…d metrics.
  • Loading branch information
jimmygchen committed Jul 1, 2024
1 parent 4b30ebe commit 7d2d826
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,15 @@ use crate::block_verification_types::{
};
use crate::data_availability_checker::{Availability, AvailabilityCheckError};
use crate::data_column_verification::{KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn};
use crate::metrics::{
KZG_DATA_COLUMN_RECONSTRUCTION_ATTEMPTS, KZG_DATA_COLUMN_RECONSTURCTION_FAILURES,
};
use crate::store::{DBColumn, KeyValueStore};
use crate::{metrics, BeaconChainTypes};
use kzg::Kzg;
use lru::LruCache;
use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard};
use slog::{debug, trace, Logger};
use slog::{debug, error, trace, Logger};
use ssz::{Decode, Encode};
use ssz_types::{FixedVector, VariableList};
use std::num::NonZeroUsize;
Expand Down Expand Up @@ -362,6 +365,10 @@ impl<E: EthSpec> PendingComponents<E> {
}

/// Mark reconstruction as started for this `PendingComponent`.
///
/// NOTE: currently this value never reverts to false once it's set here. This means
/// reconstruction will only be attempted once. This is intentional because currently
/// reconstruction could only fail due to code errors or kzg errors, which shouldn't be retried.
pub fn reconstruction_started(&mut self) {
self.reconstruction_started = true;
}
Expand Down Expand Up @@ -876,7 +883,7 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {

if should_reconstruct {
pending_components.reconstruction_started();

metrics::inc_counter(&KZG_DATA_COLUMN_RECONSTRUCTION_ATTEMPTS);
let timer = metrics::start_timer(&metrics::DATA_AVAILABILITY_RECONSTRUCTION_TIME);

// Will only return an error if:
Expand All @@ -886,7 +893,16 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
kzg,
pending_components.verified_data_columns.as_slice(),
&self.spec,
)?;
)
.inspect_err(|e| {
error!(
self.log,
"Error reconstructing data columns";
"block_root" => ?block_root,
"error" => ?e
);
metrics::inc_counter(&KZG_DATA_COLUMN_RECONSTURCTION_FAILURES);
})?;

// Check indices from cache again to make sure we don't publish components we've already received.
let Some(existing_column_indices) =
Expand Down
4 changes: 4 additions & 0 deletions beacon_node/beacon_chain/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1192,6 +1192,10 @@ lazy_static! {
"data_availability_reconstructed_columns_total",
"Total count of reconstructed columns"
);
pub static ref KZG_DATA_COLUMN_RECONSTRUCTION_ATTEMPTS : Result<IntCounter> =
try_create_int_counter("kzg_data_column_reconstruction_attempts", "Count of times data column reconstruction has been attempted");
pub static ref KZG_DATA_COLUMN_RECONSTURCTION_FAILURES : Result<IntCounter> =
try_create_int_counter("kzg_data_column_reconstruction_failures", "Count of times data column reconstruction has failed");

/*
* light_client server metrics
Expand Down
18 changes: 14 additions & 4 deletions beacon_node/network/src/network_beacon_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -781,11 +781,17 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
/// Attempt to reconstruct all data columns if the following conditions satisfies:
/// - Our custody requirement is all columns
/// - We >= 50% of columns, but not all columns
async fn attempt_data_column_reconstruction(&self, block_root: Hash256) {
///
/// Returns `Some(AvailabilityProcessingStatus)` if reconstruction is successfully performed,
/// otherwise returns `None`.
async fn attempt_data_column_reconstruction(
&self,
block_root: Hash256,
) -> Option<AvailabilityProcessingStatus> {
let result = self.chain.reconstruct_data_columns(block_root).await;
match result {
Ok(Some((availability, data_columns_to_publish))) => {
match availability {
Ok(Some((availability_processing_status, data_columns_to_publish))) => {
match &availability_processing_status {
AvailabilityProcessingStatus::Imported(hash) => {
debug!(
self.log,
Expand Down Expand Up @@ -817,20 +823,24 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
})
.collect(),
});

Some(availability_processing_status)
}
Ok(None) => {
debug!(
self.log,
"Reconstruction not required for block";
"block_hash" => %block_root,
)
);
None
}
Err(e) => {
error!(
self.log,
"Error during data column reconstruction";
"error" => ?e
);
None
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
_seen_timestamp: Duration,
process_type: BlockProcessType,
) {
let result = self
let mut result = self
.chain
.process_rpc_custody_columns(block_root, custody_columns)
.await;
Expand All @@ -352,7 +352,11 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
);
// Attempt reconstruction here before notifying sync, to avoid sending out more requests
// that we may no longer need.
self.attempt_data_column_reconstruction(block_root).await;
if let Some(availability) =
self.attempt_data_column_reconstruction(block_root).await
{
result = Ok(availability)
}
}
},
Err(BlockError::BlockIsAlreadyKnown(_)) => {
Expand Down

0 comments on commit 7d2d826

Please sign in to comment.