Skip to content

Commit

Permalink
Fix race condition by making check and mutation atomic as suggested b…
Browse files Browse the repository at this point in the history
…y Lion. Also added error handling to reconstruction failure.
  • Loading branch information
jimmygchen committed Oct 10, 2024
1 parent 430870b commit b0c3379
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 41 deletions.
35 changes: 5 additions & 30 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ use crate::blob_verification::{verify_kzg_for_blob_list, GossipVerifiedBlob, Kzg
use crate::block_verification_types::{
AvailabilityPendingExecutedBlock, AvailableExecutedBlock, RpcBlock,
};
use crate::data_availability_checker::overflow_lru_cache::{
DataAvailabilityCheckerInner, PendingComponents,
};
use crate::data_availability_checker::overflow_lru_cache::DataAvailabilityCheckerInner;
use crate::{metrics, BeaconChain, BeaconChainTypes, BeaconStore};
use kzg::Kzg;
use slog::{debug, error, Logger};
Expand Down Expand Up @@ -518,24 +516,14 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
block_root: &Hash256,
) -> Result<Option<AvailabilityAndReconstructedColumns<T::EthSpec>>, AvailabilityCheckError>
{
// Clone the pending components, so we don't hold the read lock during reconstruction
let Some(pending_components) = self
.availability_cache
.peek_pending_components(block_root, |pending_components_opt| {
pending_components_opt.cloned()
})
.check_and_set_reconstruction_started(block_root)
else {
// Block may have been imported as it does not exist in availability cache.
// Reconstruction not required or already in progress
return Ok(None);
};

if !self.should_reconstruct(&pending_components) {
return Ok(None);
}

self.availability_cache
.set_reconstruction_started(block_root);

metrics::inc_counter(&KZG_DATA_COLUMN_RECONSTRUCTION_ATTEMPTS);
let timer = metrics::start_timer(&metrics::DATA_AVAILABILITY_RECONSTRUCTION_TIME);

Expand All @@ -551,6 +539,8 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
"block_root" => ?block_root,
"error" => ?e
);
self.availability_cache
.handle_reconstruction_failure(block_root);
metrics::inc_counter(&KZG_DATA_COLUMN_RECONSTRUCTION_FAILURES);
AvailabilityCheckError::ReconstructColumnsError(e)
})?;
Expand Down Expand Up @@ -606,21 +596,6 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
})
.map(Some)
}

/// Potentially trigger reconstruction if:
/// - Our custody requirement is all columns, and we haven't got all columns
/// - We have >= 50% of columns, but not all columns
/// - Reconstruction hasn't been started for the block
fn should_reconstruct(&self, pending_components: &PendingComponents<T::EthSpec>) -> bool {
let received_column_count = pending_components.verified_data_columns.len();
// If we're sampling all columns, it means we must be custodying all columns.
let custody_column_count = self.availability_cache.sampling_column_count();
let total_column_count = self.spec.number_of_columns;
custody_column_count == total_column_count
&& received_column_count < total_column_count
&& received_column_count >= total_column_count / 2
&& !pending_components.reconstruction_started
}
}

/// Helper struct to group data availability checker metrics.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,15 +306,6 @@ impl<E: EthSpec> PendingComponents<E> {
)))
}

/// Mark reconstruction as started for this `PendingComponent`.
///
/// NOTE: currently this value never reverts to false once it's set here. This means
/// reconstruction will only be attempted once. This is intentional because currently
/// reconstruction could only fail due to code errors or kzg errors, which shouldn't be retried.
pub fn reconstruction_started(&mut self) {
self.reconstruction_started = true;
}

/// Returns the epoch of the block if it is cached, otherwise returns the epoch of the first blob.
pub fn epoch(&self) -> Option<Epoch> {
self.executed_block
Expand Down Expand Up @@ -522,9 +513,50 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
}
}

pub fn set_reconstruction_started(&self, block_root: &Hash256) {
/// Check whether data column reconstruction should be attempted.
///
/// If reconstruction is required, returns `Some(PendingComponents)` which contains the
/// components to be used as inputs to reconstruction, otherwise return `None`.
pub fn check_and_set_reconstruction_started(
&self,
block_root: &Hash256,
) -> Option<PendingComponents<T::EthSpec>> {
let mut write_lock = self.critical.write();
let Some(pending_components) = write_lock.get_mut(block_root) else {
// Block may have been imported as it does not exist in availability cache.
return None;
};

// Potentially trigger reconstruction if:
// - Our custody requirement is all columns, and we haven't got all columns
// - We have >= 50% of columns, but not all columns
// - Reconstruction hasn't been started for the block
let should_reconstruct = {
let received_column_count = pending_components.verified_data_columns.len();
// If we're sampling all columns, it means we must be custodying all columns.
let custody_column_count = self.sampling_column_count();
let total_column_count = self.spec.number_of_columns;
custody_column_count == total_column_count
&& received_column_count < total_column_count
&& received_column_count >= total_column_count / 2
&& !pending_components.reconstruction_started
};

if should_reconstruct {
pending_components.reconstruction_started = true;
Some(pending_components.clone())
} else {
None
}
}

/// This could mean some invalid data columns made it through to the `DataAvailabilityChecker`.
/// In this case, we remove all data columns in `PendingComponents`, reset reconstruction
/// status so that we can attempt to retrieve columns from peers again.
pub fn handle_reconstruction_failure(&self, block_root: &Hash256) {
if let Some(pending_components_mut) = self.critical.write().get_mut(block_root) {
pending_components_mut.reconstruction_started();
pending_components_mut.verified_data_columns = vec![];
pending_components_mut.reconstruction_started = false;
}
}

Expand Down

0 comments on commit b0c3379

Please sign in to comment.