From f2abdd4b72fed38170d66b10f2a544381c9fa480 Mon Sep 17 00:00:00 2001
From: dapplion <35266934+dapplion@users.noreply.github.com>
Date: Fri, 16 Aug 2024 12:24:18 +0200
Subject: [PATCH] Add data columns by root sync request

---
 .../src/service/api_types.rs                  |   6 +
 beacon_node/network/src/sync/manager.rs       |  75 ++++++++---
 .../network/src/sync/network_context.rs       | 124 ++++++++++++++++--
 .../src/sync/network_context/requests.rs      |  10 +-
 .../requests/data_columns_by_root.rs          | 104 +++++++++++++++
 5 files changed, 287 insertions(+), 32 deletions(-)
 create mode 100644 beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs

diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs
index 11df591ae4c..756f4bd1326 100644
--- a/beacon_node/lighthouse_network/src/service/api_types.rs
+++ b/beacon_node/lighthouse_network/src/service/api_types.rs
@@ -212,3 +212,9 @@ impl slog::Value for RequestId {
         }
     }
 }
+
+impl std::fmt::Display for DataColumnsByRootRequestId {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.0)
+    }
+}
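
The `Display` impl above writes `self.0`, so `DataColumnsByRootRequestId` is assumed to be a thin newtype over the sync-wide request counter `Id`. A minimal sketch of that assumed shape, not part of this hunk:

    // Assumed shape: a newtype over the sync `Id` counter, matching `self.0` above.
    pub struct DataColumnsByRootRequestId(pub Id);

    // With Display in place the id can be logged with the `%` formatter, e.g.
    // debug!(log, "Sending DataColumnsByRoot Request"; "req_id" => %req_id);
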
diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs
index e8e6896cd69..e494f1f94fc 100644
--- a/beacon_node/network/src/sync/manager.rs
+++ b/beacon_node/network/src/sync/manager.rs
@@ -53,7 +53,9 @@ use beacon_chain::{
 };
 use futures::StreamExt;
 use lighthouse_network::rpc::RPCError;
-use lighthouse_network::service::api_types::{Id, SingleLookupReqId, SyncRequestId};
+use lighthouse_network::service::api_types::{
+    DataColumnsByRootRequestId, Id, SingleLookupReqId, SyncRequestId,
+};
 use lighthouse_network::types::{NetworkGlobals, SyncState};
 use lighthouse_network::SyncInfo;
 use lighthouse_network::{PeerAction, PeerId};
@@ -345,9 +347,13 @@ impl<T: BeaconChainTypes> SyncManager<T> {
             SyncRequestId::SingleBlob { id } => {
                 self.on_single_blob_response(id, peer_id, RpcEvent::RPCError(error))
             }
-            SyncRequestId::DataColumnsByRoot { .. } => {
-                // TODO(das)
-            }
+            SyncRequestId::DataColumnsByRoot(req_id, requester) => self
+                .on_data_columns_by_root_response(
+                    req_id,
+                    requester,
+                    peer_id,
+                    RpcEvent::RPCError(error),
+                ),
             SyncRequestId::RangeBlockAndBlobs { id } => {
                 if let Some(sender_id) = self.network.range_request_failed(id) {
                     match sender_id {
@@ -860,15 +866,12 @@ impl<T: BeaconChainTypes> SyncManager<T> {
                     None => RpcEvent::StreamTermination,
                 },
            ),
-            SyncRequestId::SingleBlob { .. } => {
-                crit!(self.log, "Block received during blob request"; "peer_id" => %peer_id );
-            }
-            SyncRequestId::DataColumnsByRoot { .. } => {
-                // TODO(das)
-            }
             SyncRequestId::RangeBlockAndBlobs { id } => {
                 self.range_block_and_blobs_response(id, peer_id, block.into())
             }
+            _ => {
+                crit!(self.log, "bad request id for block"; "peer_id" => %peer_id );
+            }
         }
     }

@@ -897,9 +900,6 @@ impl<T: BeaconChainTypes> SyncManager<T> {
         seen_timestamp: Duration,
     ) {
         match request_id {
-            SyncRequestId::SingleBlock { .. } => {
-                crit!(self.log, "Single blob received during block request"; "peer_id" => %peer_id );
-            }
             SyncRequestId::SingleBlob { id } => self.on_single_blob_response(
                 id,
                 peer_id,
                 match blob {
                     Some(blob) => RpcEvent::Response(blob, seen_timestamp),
                     None => RpcEvent::StreamTermination,
                 },
             ),
-            SyncRequestId::DataColumnsByRoot { .. } => {
-                // TODO(das)
-            }
             SyncRequestId::RangeBlockAndBlobs { id } => {
                 self.range_block_and_blobs_response(id, peer_id, blob.into())
             }
+            _ => {
+                crit!(self.log, "bad request id for blob"; "peer_id" => %peer_id);
+            }
         }
     }

     fn rpc_data_column_received(
         &mut self,
-        _request_id: SyncRequestId,
-        _peer_id: PeerId,
-        _data_column: Option<Arc<DataColumnSidecar<T::EthSpec>>>,
-        _seen_timestamp: Duration,
+        request_id: SyncRequestId,
+        peer_id: PeerId,
+        data_column: Option<Arc<DataColumnSidecar<T::EthSpec>>>,
+        seen_timestamp: Duration,
     ) {
-        // TODO(das): implement handler
+        match request_id {
+            SyncRequestId::DataColumnsByRoot(req_id, requester) => {
+                self.on_data_columns_by_root_response(
+                    req_id,
+                    requester,
+                    peer_id,
+                    match data_column {
+                        Some(data_column) => RpcEvent::Response(data_column, seen_timestamp),
+                        None => RpcEvent::StreamTermination,
+                    },
+                );
+            }
+            SyncRequestId::RangeBlockAndBlobs { id: _ } => {
+                // TODO(das): implement custody range sync
+            }
+            _ => {
+                crit!(self.log, "bad request id for data_column"; "peer_id" => %peer_id);
+            }
+        }
     }

     fn on_single_blob_response(
@@ -944,6 +962,21 @@ impl<T: BeaconChainTypes> SyncManager<T> {
         }
     }

+    fn on_data_columns_by_root_response(
+        &mut self,
+        req_id: DataColumnsByRootRequestId,
+        _requester: SingleLookupReqId,
+        peer_id: PeerId,
+        rpc_event: RpcEvent<Arc<DataColumnSidecar<T::EthSpec>>>,
+    ) {
+        if let Some(_resp) = self
+            .network
+            .on_data_columns_by_root_response(req_id, peer_id, rpc_event)
+        {
+            // TODO(das): pass data_columns_by_root result to consumer
+        }
+    }
+
     /// Handles receiving a response for a range sync request that should have both blocks and
     /// blobs.
     fn range_block_and_blobs_response(
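
Each handler above collapses the three possible network callbacks (response chunk, end-of-stream, RPC failure) into one `RpcEvent` before delegating to an `on_*_by_root_response` method. A sketch of the event type as consumed here, assuming the shape implied by the match arms (the real definition lives in network_context.rs):

    pub enum RpcEvent<T> {
        // One response chunk, with the timestamp at which it was seen.
        Response(T, Duration),
        // The peer closed the stream: no more chunks are coming.
        StreamTermination,
        // The request failed at the RPC layer.
        RPCError(RPCError),
    }
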
diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs
index 7bcc8ae9f27..fa9159f7f8e 100644
--- a/beacon_node/network/src/sync/network_context.rs
+++ b/beacon_node/network/src/sync/network_context.rs
@@ -16,16 +16,21 @@ use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineStat
 use fnv::FnvHashMap;
 use lighthouse_network::rpc::methods::BlobsByRangeRequest;
 use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError};
-use lighthouse_network::service::api_types::{AppRequestId, Id, SingleLookupReqId, SyncRequestId};
+use lighthouse_network::service::api_types::{
+    AppRequestId, DataColumnsByRootRequestId, Id, SingleLookupReqId, SyncRequestId,
+};
 use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request};
 pub use requests::LookupVerifyError;
+use requests::{ActiveDataColumnsByRootRequest, DataColumnsByRootSingleBlockRequest};
 use slog::{debug, error, trace, warn};
 use std::collections::hash_map::Entry;
 use std::sync::Arc;
 use std::time::Duration;
 use tokio::sync::mpsc;
 use types::blob_sidecar::FixedBlobSidecarList;
-use types::{BlobSidecar, DataColumnSidecarList, EthSpec, Hash256, SignedBeaconBlock};
+use types::{
+    BlobSidecar, DataColumnSidecar, DataColumnSidecarList, EthSpec, Hash256, SignedBeaconBlock,
+};

 mod requests;

@@ -96,10 +101,10 @@ impl From<LookupVerifyError> for RpcResponseError {
 /// Sequential ID that uniquely identifies ReqResp outgoing requests
 pub type ReqId = u32;

-pub enum LookupRequestResult {
+pub enum LookupRequestResult<I = ReqId> {
     /// A request is sent. Sync MUST receive an event from the network in the future for either:
     /// completed response or failed request
-    RequestSent(ReqId),
+    RequestSent(I),
     /// No request is sent, and no further action is necessary to consider this request completed
     NoRequestNeeded,
     /// No request is sent, but the request is not completed. Sync MUST receive some future event
@@ -123,6 +128,10 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
     /// A mapping of active BlobsByRoot requests, including both current slot and parent lookups.
     blobs_by_root_requests: FnvHashMap<SingleLookupReqId, ActiveBlobsByRootRequest<T::EthSpec>>,

+    /// A mapping of active DataColumnsByRoot requests
+    data_columns_by_root_requests:
+        FnvHashMap<DataColumnsByRootRequestId, ActiveDataColumnsByRootRequest<T::EthSpec>>,
+
     /// BlocksByRange requests paired with BlobsByRange
     range_blocks_and_blobs_requests:
         FnvHashMap<Id, (RangeRequestId, BlocksAndBlobsRequestInfo<T::EthSpec>)>,
@@ -171,6 +180,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
             request_id: 1,
             blocks_by_root_requests: <_>::default(),
             blobs_by_root_requests: <_>::default(),
+            data_columns_by_root_requests: <_>::default(),
             range_blocks_and_blobs_requests: FnvHashMap::default(),
             network_beacon_processor,
             chain,
@@ -211,10 +221,21 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
                     None
                 }
             });
+        let failed_data_column_by_root_ids =
+            self.data_columns_by_root_requests
+                .iter()
+                .filter_map(|(req_id, request)| {
+                    if request.peer_id == *peer_id {
+                        Some(SyncRequestId::DataColumnsByRoot(*req_id, request.requester))
+                    } else {
+                        None
+                    }
+                });

         failed_range_ids
             .chain(failed_block_ids)
             .chain(failed_blob_ids)
+            .chain(failed_data_column_by_root_ids)
             .collect()
     }

@@ -529,6 +550,43 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
         Ok(LookupRequestResult::RequestSent(req_id))
     }

+    /// Request to send a single `data_columns_by_root` request to the network.
+    pub fn data_column_lookup_request(
+        &mut self,
+        requester: SingleLookupReqId,
+        peer_id: PeerId,
+        request: DataColumnsByRootSingleBlockRequest,
+    ) -> Result<LookupRequestResult<DataColumnsByRootRequestId>, &'static str> {
+        let req_id = DataColumnsByRootRequestId(self.next_id());
+        debug!(
+            self.log,
+            "Sending DataColumnsByRoot Request";
+            "method" => "DataColumnsByRoot",
+            "block_root" => ?request.block_root,
+            "indices" => ?request.indices,
+            "peer" => %peer_id,
+            "requester" => ?requester,
+            "req_id" => %req_id,
+        );
+
+        self.send_network_msg(NetworkMessage::SendRequest {
+            peer_id,
+            request: Request::DataColumnsByRoot(request.clone().into_request(&self.chain.spec)),
+            request_id: AppRequestId::Sync(SyncRequestId::DataColumnsByRoot(req_id, requester)),
+        })?;
+
+        self.data_columns_by_root_requests.insert(
+            req_id,
+            ActiveDataColumnsByRootRequest::new(request, peer_id, requester),
+        );
+
+        Ok(LookupRequestResult::RequestSent(req_id))
+    }
+
+    /// Request to fetch all needed custody columns of a specific block. This function may not send
+    /// any request to the network if no columns have to be fetched based on the import state of the
+    /// node. A custody request is a "super request" that may trigger 0 or more `data_columns_by_root`
+    /// requests.
     pub fn custody_lookup_request(
         &mut self,
         lookup_id: SingleLookupId,
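
Since a custody request is a "super request", a caller would resolve which columns the node still needs, group them by serving peer, and issue one `data_columns_by_root` request per group. A hypothetical fan-out under that assumption (the `columns_to_request_by_peer` grouping is illustrative, not part of this patch):

    // Hypothetical: map of peer -> column indices that peer custodies.
    for (peer_id, indices) in columns_to_request_by_peer {
        self.data_column_lookup_request(
            requester,
            peer_id,
            DataColumnsByRootSingleBlockRequest { block_root, indices },
        )?;
    }
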
@@ -707,14 +765,14 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
         &mut self,
         request_id: SingleLookupReqId,
         peer_id: PeerId,
-        block: RpcEvent<Arc<SignedBeaconBlock<T::EthSpec>>>,
+        rpc_event: RpcEvent<Arc<SignedBeaconBlock<T::EthSpec>>>,
     ) -> Option<RpcResponseResult<Arc<SignedBeaconBlock<T::EthSpec>>>> {
         let Entry::Occupied(mut request) = self.blocks_by_root_requests.entry(request_id) else {
             metrics::inc_counter_vec(&metrics::SYNC_UNKNOWN_NETWORK_REQUESTS, &["blocks_by_root"]);
             return None;
         };

-        let resp = match block {
+        let resp = match rpc_event {
             RpcEvent::Response(block, seen_timestamp) => {
                 match request.get_mut().add_response(block) {
                     Ok(block) => Ok((block, seen_timestamp)),
@@ -745,14 +803,14 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
         &mut self,
         request_id: SingleLookupReqId,
         peer_id: PeerId,
-        blob: RpcEvent<Arc<BlobSidecar<T::EthSpec>>>,
+        rpc_event: RpcEvent<Arc<BlobSidecar<T::EthSpec>>>,
     ) -> Option<RpcResponseResult<FixedBlobSidecarList<T::EthSpec>>> {
         let Entry::Occupied(mut request) = self.blobs_by_root_requests.entry(request_id) else {
             metrics::inc_counter_vec(&metrics::SYNC_UNKNOWN_NETWORK_REQUESTS, &["blobs_by_root"]);
             return None;
         };

-        let resp = match blob {
+        let resp = match rpc_event {
             RpcEvent::Response(blob, seen_timestamp) => {
                 let request = request.get_mut();
                 match request.add_response(blob) {
@@ -791,6 +849,54 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
         }
     }

+    #[allow(clippy::type_complexity)]
+    pub fn on_data_columns_by_root_response(
+        &mut self,
+        id: DataColumnsByRootRequestId,
+        peer_id: PeerId,
+        rpc_event: RpcEvent<Arc<DataColumnSidecar<T::EthSpec>>>,
+    ) -> Option<RpcResponseResult<Vec<Arc<DataColumnSidecar<T::EthSpec>>>>> {
+        let Entry::Occupied(mut request) = self.data_columns_by_root_requests.entry(id) else {
+            return None;
+        };
+
+        let resp = match rpc_event {
+            RpcEvent::Response(data_column, seen_timestamp) => {
+                let request = request.get_mut();
+                match request.add_response(data_column) {
+                    Ok(Some(data_columns)) => Ok((data_columns, seen_timestamp)),
+                    Ok(None) => return None,
+                    Err(e) => Err((e.into(), request.resolve())),
+                }
+            }
+            RpcEvent::StreamTermination => match request.remove().terminate() {
+                Ok(_) => return None,
+                // (err, false = not resolved) because terminate returns Ok() if resolved
+                Err(e) => Err((e.into(), false)),
+            },
+            RpcEvent::RPCError(e) => Err((e.into(), request.remove().resolve())),
+        };
+
+        match resp {
+            Ok(resp) => Some(Ok(resp)),
+            // Track if this request has already returned some value downstream. Ensure that
+            // downstream code only receives a single Result per request. If the serving peer does
+            // multiple penalizable actions per request, downscore and return None. This allows to
+            // catch if a peer is returning more columns than requested or if the excess columns
+            // are invalid.
+            Err((e, resolved)) => {
+                if let RpcResponseError::VerifyError(e) = &e {
+                    self.report_peer(peer_id, PeerAction::LowToleranceError, e.into());
+                }
+                if resolved {
+                    None
+                } else {
+                    Some(Err(e))
+                }
+            }
+        }
+    }
+
     pub fn send_block_for_processing(
         &self,
         id: Id,
@@ -900,7 +1006,7 @@ fn to_fixed_blob_sidecar_list<E: EthSpec>(
         let index = blob.index as usize;
         *fixed_list
             .get_mut(index)
-            .ok_or(LookupVerifyError::UnrequestedBlobIndex(index as u64))? = Some(blob)
+            .ok_or(LookupVerifyError::UnrequestedIndex(index as u64))? = Some(blob)
     }
     Ok(fixed_list)
 }
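
The return value of `on_data_columns_by_root_response` is deliberately tri-state, and the `resolved` flag guarantees at most one `Some(..)` per request. A consumption sketch under that contract (the caller-side names are illustrative):

    match ctx.on_data_columns_by_root_response(req_id, peer_id, rpc_event) {
        // All requested columns received: hand them off for processing.
        Some(Ok((data_columns, seen_timestamp))) => { /* ... */ }
        // First and only failure surfaced; the peer was already penalized above.
        Some(Err(e)) => { /* fail the lookup */ }
        // Request still in flight, or an error was already returned for it.
        None => {}
    }
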
diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs
index 8387e9b0e1a..94eecff42d3 100644
--- a/beacon_node/network/src/sync/network_context/requests.rs
+++ b/beacon_node/network/src/sync/network_context/requests.rs
@@ -9,13 +9,19 @@ use types::{
     blob_sidecar::BlobIdentifier, BlobSidecar, ChainSpec, EthSpec, Hash256, SignedBeaconBlock,
 };

+pub use data_columns_by_root::{
+    ActiveDataColumnsByRootRequest, DataColumnsByRootSingleBlockRequest,
+};
+
+mod data_columns_by_root;
+
 #[derive(Debug, PartialEq, Eq, IntoStaticStr)]
 pub enum LookupVerifyError {
     NoResponseReturned,
     NotEnoughResponsesReturned { expected: usize, actual: usize },
     TooManyResponses,
     UnrequestedBlockRoot(Hash256),
-    UnrequestedBlobIndex(u64),
+    UnrequestedIndex(u64),
     InvalidInclusionProof,
     DuplicateData,
 }
@@ -131,7 +137,7 @@ impl<E: EthSpec> ActiveBlobsByRootRequest<E> {
             return Err(LookupVerifyError::InvalidInclusionProof);
         }
         if !self.request.indices.contains(&blob.index) {
-            return Err(LookupVerifyError::UnrequestedBlobIndex(blob.index));
+            return Err(LookupVerifyError::UnrequestedIndex(blob.index));
         }
         if self.blobs.iter().any(|b| b.index == blob.index) {
             return Err(LookupVerifyError::DuplicateData);
diff --git a/beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs b/beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs
new file mode 100644
index 00000000000..a45916905ce
--- /dev/null
+++ b/beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs
@@ -0,0 +1,104 @@
+use lighthouse_network::{
+    rpc::methods::DataColumnsByRootRequest, service::api_types::SingleLookupReqId, PeerId,
+};
+use std::sync::Arc;
+use types::{ChainSpec, DataColumnIdentifier, DataColumnSidecar, EthSpec, Hash256};
+
+use super::LookupVerifyError;
+
+#[derive(Debug, Clone)]
+pub struct DataColumnsByRootSingleBlockRequest {
+    pub block_root: Hash256,
+    pub indices: Vec<u64>,
+}
+
+impl DataColumnsByRootSingleBlockRequest {
+    pub fn into_request(self, spec: &ChainSpec) -> DataColumnsByRootRequest {
+        DataColumnsByRootRequest::new(
+            self.indices
+                .into_iter()
+                .map(|index| DataColumnIdentifier {
+                    block_root: self.block_root,
+                    index,
+                })
+                .collect(),
+            spec,
+        )
+    }
+}
+
+pub struct ActiveDataColumnsByRootRequest<E: EthSpec> {
+    request: DataColumnsByRootSingleBlockRequest,
+    items: Vec<Arc<DataColumnSidecar<E>>>,
+    resolved: bool,
+    pub(crate) peer_id: PeerId,
+    pub(crate) requester: SingleLookupReqId,
+}
+
+impl<E: EthSpec> ActiveDataColumnsByRootRequest<E> {
+    pub fn new(
+        request: DataColumnsByRootSingleBlockRequest,
+        peer_id: PeerId,
+        requester: SingleLookupReqId,
+    ) -> Self {
+        Self {
+            request,
+            items: vec![],
+            resolved: false,
+            peer_id,
+            requester,
+        }
+    }
+
+    /// Appends a chunk to this multi-item request. If all expected chunks are received, this
+    /// method returns `Some`, resolving the request before the stream terminator.
+    /// The active request SHOULD be dropped after `add_response` returns an error.
+    pub fn add_response(
+        &mut self,
+        data_column: Arc<DataColumnSidecar<E>>,
+    ) -> Result<Option<Vec<Arc<DataColumnSidecar<E>>>>, LookupVerifyError> {
+        if self.resolved {
+            return Err(LookupVerifyError::TooManyResponses);
+        }
+
+        let block_root = data_column.block_root();
+        if self.request.block_root != block_root {
+            return Err(LookupVerifyError::UnrequestedBlockRoot(block_root));
+        }
+        if !data_column.verify_inclusion_proof() {
+            return Err(LookupVerifyError::InvalidInclusionProof);
+        }
+        if !self.request.indices.contains(&data_column.index) {
+            return Err(LookupVerifyError::UnrequestedIndex(data_column.index));
+        }
+        if self.items.iter().any(|d| d.index == data_column.index) {
+            return Err(LookupVerifyError::DuplicateData);
+        }
+
+        self.items.push(data_column);
+        if self.items.len() >= self.request.indices.len() {
+            // All expected chunks received, return result early
+            self.resolved = true;
+            Ok(Some(std::mem::take(&mut self.items)))
+        } else {
+            Ok(None)
+        }
+    }
+
+    pub fn terminate(self) -> Result<(), LookupVerifyError> {
+        if self.resolved {
+            Ok(())
+        } else {
+            Err(LookupVerifyError::NotEnoughResponsesReturned {
+                expected: self.request.indices.len(),
+                actual: self.items.len(),
+            })
+        }
+    }
+
+    /// Mark the request as resolved (= it has returned something downstream) and report whether
+    /// it had already been resolved before this call.
+    pub fn resolve(&mut self) -> bool {
+        std::mem::replace(&mut self.resolved, true)
+    }
+}
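
A usage sketch of the new request type end to end, with hypothetical `block_root`, `peer_id`, `requester`, and sidecar values standing in for real inputs:

    let request = DataColumnsByRootSingleBlockRequest {
        block_root,
        indices: vec![0, 1],
    };
    // The same struct converts into the wire request given the chain spec:
    // let wire_request = request.clone().into_request(&spec);

    let mut active = ActiveDataColumnsByRootRequest::new(request, peer_id, requester);
    assert!(active.add_response(column_0)?.is_none()); // first of two chunks
    let columns = active.add_response(column_1)?.expect("resolved on last chunk");
    // A later stream terminator is accepted: terminate() returns Ok for a resolved
    // request and Err(NotEnoughResponsesReturned { .. }) otherwise.
    active.terminate()?;
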