Skip to content

Commit

Permalink
Add sync lookup custody request state (sigp#6257)
Browse files Browse the repository at this point in the history
* Add sync lookup custody request state

* Review PR

* clippy

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into peerdas-network-lookup
  • Loading branch information
dapplion authored and AgeManning committed Sep 3, 2024
1 parent 87a97b1 commit 6f75369
Show file tree
Hide file tree
Showing 9 changed files with 239 additions and 34 deletions.
18 changes: 18 additions & 0 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6852,6 +6852,24 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.data_availability_checker.data_availability_boundary()
}

/// Returns true if epoch is within the data availability boundary
pub fn da_check_required_for_epoch(&self, epoch: Epoch) -> bool {
self.data_availability_checker
.da_check_required_for_epoch(epoch)
}

/// Returns true if we should fetch blobs for this block
pub fn should_fetch_blobs(&self, block_epoch: Epoch) -> bool {
self.da_check_required_for_epoch(block_epoch)
&& !self.spec.is_peer_das_enabled_for_epoch(block_epoch)
}

/// Returns true if we should fetch custody columns for this block
pub fn should_fetch_custody_columns(&self, block_epoch: Epoch) -> bool {
self.da_check_required_for_epoch(block_epoch)
&& self.spec.is_peer_das_enabled_for_epoch(block_epoch)
}

pub fn logger(&self) -> &Logger {
&self.log
}
Expand Down
9 changes: 9 additions & 0 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,15 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
})
}

/// Return the set of imported custody column indexes for `block_root`. Returns None if there is
/// no block component for `block_root`.
pub fn imported_custody_column_indexes(&self, block_root: &Hash256) -> Option<Vec<u64>> {
self.availability_cache
.peek_pending_components(block_root, |components| {
components.map(|components| components.get_cached_data_columns_indices())
})
}

/// Get a blob from the availability cache.
pub fn get_blob(
&self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use ssz_types::{FixedVector, VariableList};
use std::num::NonZeroUsize;
use std::sync::Arc;
use types::blob_sidecar::BlobIdentifier;
use types::{BlobSidecar, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock};
use types::{BlobSidecar, ChainSpec, ColumnIndex, Epoch, EthSpec, Hash256, SignedBeaconBlock};

/// This represents the components of a partially available block
///
Expand Down Expand Up @@ -108,6 +108,14 @@ impl<E: EthSpec> PendingComponents<E> {
self.verified_data_columns.len()
}

/// Returns the indices of cached custody columns
pub fn get_cached_data_columns_indices(&self) -> Vec<ColumnIndex> {
self.verified_data_columns
.iter()
.map(|d| d.index())
.collect()
}

/// Inserts a block into the cache.
pub fn insert_block(&mut self, block: DietAvailabilityPendingExecutedBlock<E>) {
*self.get_cached_block_mut() = Some(block)
Expand Down
9 changes: 8 additions & 1 deletion beacon_node/lighthouse_network/src/types/globals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::EnrExt;
use crate::{Enr, GossipTopic, Multiaddr, PeerId};
use parking_lot::RwLock;
use std::collections::HashSet;
use types::EthSpec;
use types::{ChainSpec, ColumnIndex, EthSpec};

pub struct NetworkGlobals<E: EthSpec> {
/// The current local ENR.
Expand Down Expand Up @@ -110,6 +110,13 @@ impl<E: EthSpec> NetworkGlobals<E> {
std::mem::replace(&mut *self.sync_state.write(), new_state)
}

/// Compute custody data columns the node is assigned to custody.
pub fn custody_columns(&self, _spec: &ChainSpec) -> Vec<ColumnIndex> {
let _enr = self.local_enr();
//TODO(das): implement ENR changes
vec![]
}

/// TESTING ONLY. Build a dummy NetworkGlobals instance.
pub fn new_test_globals(trusted_peers: Vec<PeerId>, log: &slog::Logger) -> NetworkGlobals<E> {
use crate::CombinedKeyExt;
Expand Down
66 changes: 54 additions & 12 deletions beacon_node/network/src/sync/block_lookups/common.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
use crate::sync::block_lookups::single_block_lookup::{
LookupRequestError, SingleBlockLookup, SingleLookupRequestState,
};
use crate::sync::block_lookups::{BlobRequestState, BlockRequestState, PeerId};
use crate::sync::block_lookups::{
BlobRequestState, BlockRequestState, CustodyRequestState, PeerId,
};
use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext};
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::BeaconChainTypes;
use lighthouse_network::service::api_types::Id;
use std::sync::Arc;
use types::blob_sidecar::FixedBlobSidecarList;
use types::SignedBeaconBlock;
use types::{DataColumnSidecarList, SignedBeaconBlock};

use super::single_block_lookup::DownloadResult;
use super::SingleLookupId;
Expand All @@ -17,6 +19,7 @@ use super::SingleLookupId;
pub enum ResponseType {
Block,
Blob,
CustodyColumn,
}

/// This trait unifies common single block lookup functionality across blocks and blobs. This
Expand All @@ -38,7 +41,7 @@ pub trait RequestState<T: BeaconChainTypes> {
&self,
id: Id,
peer_id: PeerId,
downloaded_block_expected_blobs: Option<usize>,
downloaded_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
cx: &mut SyncNetworkContext<T>,
) -> Result<LookupRequestResult, LookupRequestError>;

Expand Down Expand Up @@ -73,7 +76,7 @@ impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {
&self,
id: SingleLookupId,
peer_id: PeerId,
_: Option<usize>,
_: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
cx: &mut SyncNetworkContext<T>,
) -> Result<LookupRequestResult, LookupRequestError> {
cx.block_lookup_request(id, peer_id, self.requested_block_root)
Expand Down Expand Up @@ -121,16 +124,11 @@ impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
&self,
id: Id,
peer_id: PeerId,
downloaded_block_expected_blobs: Option<usize>,
downloaded_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
cx: &mut SyncNetworkContext<T>,
) -> Result<LookupRequestResult, LookupRequestError> {
cx.blob_lookup_request(
id,
peer_id,
self.block_root,
downloaded_block_expected_blobs,
)
.map_err(LookupRequestError::SendFailedNetwork)
cx.blob_lookup_request(id, peer_id, self.block_root, downloaded_block)
.map_err(LookupRequestError::SendFailedNetwork)
}

fn send_for_processing(
Expand Down Expand Up @@ -161,3 +159,47 @@ impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
&mut self.state
}
}

impl<T: BeaconChainTypes> RequestState<T> for CustodyRequestState<T::EthSpec> {
type VerifiedResponseType = DataColumnSidecarList<T::EthSpec>;

fn make_request(
&self,
id: Id,
// TODO(das): consider selecting peers that have custody but are in this set
_peer_id: PeerId,
downloaded_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
cx: &mut SyncNetworkContext<T>,
) -> Result<LookupRequestResult, LookupRequestError> {
cx.custody_lookup_request(id, self.block_root, downloaded_block)
.map_err(LookupRequestError::SendFailedNetwork)
}

fn send_for_processing(
id: Id,
download_result: DownloadResult<Self::VerifiedResponseType>,
cx: &SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
let DownloadResult {
value,
block_root,
seen_timestamp,
..
} = download_result;
cx.send_custody_columns_for_processing(id, block_root, value, seen_timestamp)
.map_err(LookupRequestError::SendFailedProcessor)
}

fn response_type() -> ResponseType {
ResponseType::CustodyColumn
}
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> &mut Self {
&mut request.custody_request_state
}
fn get_state(&self) -> &SingleLookupRequestState<Self::VerifiedResponseType> {
&self.state
}
fn get_state_mut(&mut self) -> &mut SingleLookupRequestState<Self::VerifiedResponseType> {
&mut self.state
}
}
5 changes: 3 additions & 2 deletions beacon_node/network/src/sync/block_lookups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use fnv::FnvHashMap;
use lighthouse_network::service::api_types::SingleLookupReqId;
use lighthouse_network::{PeerAction, PeerId};
use lru_cache::LRUTimeCache;
pub use single_block_lookup::{BlobRequestState, BlockRequestState};
pub use single_block_lookup::{BlobRequestState, BlockRequestState, CustodyRequestState};
use slog::{debug, error, warn, Logger};
use std::collections::hash_map::Entry;
use std::sync::Arc;
Expand Down Expand Up @@ -527,7 +527,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// if both components have been processed.
request_state.on_processing_success()?;

if lookup.both_components_processed() {
if lookup.all_components_processed() {
// We don't request for other block components until being sure that the block has
// data. If we request blobs / columns to a peer we are sure those must exist.
// Therefore if all components are processed and we still receive `MissingComponents`
Expand Down Expand Up @@ -599,6 +599,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
match R::response_type() {
ResponseType::Block => "lookup_block_processing_failure",
ResponseType::Blob => "lookup_blobs_processing_failure",
ResponseType::CustodyColumn => "lookup_custody_processing_failure",
},
);

Expand Down
49 changes: 35 additions & 14 deletions beacon_node/network/src/sync/block_lookups/single_block_lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::time::{Duration, Instant};
use store::Hash256;
use strum::IntoStaticStr;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{EthSpec, SignedBeaconBlock};
use types::{DataColumnSidecarList, EthSpec, SignedBeaconBlock};

// Dedicated enum for LookupResult to force its usage
#[must_use = "LookupResult must be handled with on_lookup_result"]
Expand Down Expand Up @@ -63,6 +63,7 @@ pub struct SingleBlockLookup<T: BeaconChainTypes> {
pub id: Id,
pub block_request_state: BlockRequestState<T::EthSpec>,
pub blob_request_state: BlobRequestState<T::EthSpec>,
pub custody_request_state: CustodyRequestState<T::EthSpec>,
/// Peers that claim to have imported this set of block components
#[derivative(Debug(format_with = "fmt_peer_set_as_len"))]
peers: HashSet<PeerId>,
Expand All @@ -82,6 +83,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
id,
block_request_state: BlockRequestState::new(requested_block_root),
blob_request_state: BlobRequestState::new(requested_block_root),
custody_request_state: CustodyRequestState::new(requested_block_root),
peers: HashSet::from_iter(peers.iter().copied()),
block_root: requested_block_root,
awaiting_parent,
Expand Down Expand Up @@ -138,16 +140,18 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
}

/// Returns true if the block has already been downloaded.
pub fn both_components_processed(&self) -> bool {
pub fn all_components_processed(&self) -> bool {
self.block_request_state.state.is_processed()
&& self.blob_request_state.state.is_processed()
&& self.custody_request_state.state.is_processed()
}

/// Returns true if this request is expecting some event to make progress
pub fn is_awaiting_event(&self) -> bool {
self.awaiting_parent.is_some()
|| self.block_request_state.state.is_awaiting_event()
|| self.blob_request_state.state.is_awaiting_event()
|| self.custody_request_state.state.is_awaiting_event()
}

/// Makes progress on all requests of this lookup. Any error is not recoverable and must result
Expand All @@ -159,13 +163,12 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
// TODO: Check what's necessary to download, specially for blobs
self.continue_request::<BlockRequestState<T::EthSpec>>(cx)?;
self.continue_request::<BlobRequestState<T::EthSpec>>(cx)?;
self.continue_request::<CustodyRequestState<T::EthSpec>>(cx)?;

// If all components of this lookup are already processed, there will be no future events
// that can make progress so it must be dropped. Consider the lookup completed.
// This case can happen if we receive the components from gossip during a retry.
if self.block_request_state.state.is_processed()
&& self.blob_request_state.state.is_processed()
{
if self.all_components_processed() {
Ok(LookupResult::Completed)
} else {
Ok(LookupResult::Pending)
Expand All @@ -179,11 +182,11 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
) -> Result<(), LookupRequestError> {
let id = self.id;
let awaiting_parent = self.awaiting_parent.is_some();
let downloaded_block_expected_blobs = self
let downloaded_block = self
.block_request_state
.state
.peek_downloaded_data()
.map(|block| block.num_expected_blobs());
.cloned();
let block_is_processed = self.block_request_state.state.is_processed();
let request = R::request_state_mut(self);

Expand All @@ -210,7 +213,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
};

let request = R::request_state_mut(self);
match request.make_request(id, peer_id, downloaded_block_expected_blobs, cx)? {
match request.make_request(id, peer_id, downloaded_block, cx)? {
LookupRequestResult::RequestSent(req_id) => {
// Lookup sync event safety: If make_request returns `RequestSent`, we are
// guaranteed that `BlockLookups::on_download_response` will be called exactly
Expand Down Expand Up @@ -289,6 +292,24 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
}
}

/// The state of the block request component of a `SingleBlockLookup`.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct BlockRequestState<E: EthSpec> {
#[derivative(Debug = "ignore")]
pub requested_block_root: Hash256,
pub state: SingleLookupRequestState<Arc<SignedBeaconBlock<E>>>,
}

impl<E: EthSpec> BlockRequestState<E> {
pub fn new(block_root: Hash256) -> Self {
Self {
requested_block_root: block_root,
state: SingleLookupRequestState::new(),
}
}
}

/// The state of the blob request component of a `SingleBlockLookup`.
#[derive(Derivative)]
#[derivative(Debug)]
Expand All @@ -307,19 +328,19 @@ impl<E: EthSpec> BlobRequestState<E> {
}
}

/// The state of the block request component of a `SingleBlockLookup`.
/// The state of the custody request component of a `SingleBlockLookup`.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct BlockRequestState<E: EthSpec> {
pub struct CustodyRequestState<E: EthSpec> {
#[derivative(Debug = "ignore")]
pub requested_block_root: Hash256,
pub state: SingleLookupRequestState<Arc<SignedBeaconBlock<E>>>,
pub block_root: Hash256,
pub state: SingleLookupRequestState<DataColumnSidecarList<E>>,
}

impl<E: EthSpec> BlockRequestState<E> {
impl<E: EthSpec> CustodyRequestState<E> {
pub fn new(block_root: Hash256) -> Self {
Self {
requested_block_root: block_root,
block_root,
state: SingleLookupRequestState::new(),
}
}
Expand Down
2 changes: 2 additions & 0 deletions beacon_node/network/src/sync/block_lookups/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,8 @@ impl TestRig {
(ev.work_type() == beacon_processor::RPC_BLOBS).then_some(())
})
.unwrap_or_else(|e| panic!("Expected blobs work event: {e}")),
// TODO(das): remove todo when adding tests for custody sync lookup
ResponseType::CustodyColumn => todo!(),
}
}

Expand Down
Loading

0 comments on commit 6f75369

Please sign in to comment.