Skip to content

Commit

Permalink
Address review
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed Oct 18, 2024
1 parent ff9e2f4 commit 5fa2912
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 37 deletions.
20 changes: 14 additions & 6 deletions beacon_node/network/src/sync/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart};
use super::block_lookups::BlockLookups;
use super::network_context::{
BlockOrBlob, CustodyByRootResult, RangeRequestId, RpcEvent, SyncNetworkContext,
CustodyByRootResult, RangeBlockComponent, RangeRequestId, RpcEvent, SyncNetworkContext,
};
use super::peer_sampling::{Sampling, SamplingConfig, SamplingResult};
use super::peer_sync_info::{remote_sync_type, PeerSyncType};
Expand Down Expand Up @@ -1150,7 +1150,11 @@ impl<T: BeaconChainTypes> SyncManager<T> {
block: RpcEvent<Arc<SignedBeaconBlock<T::EthSpec>>>,
) {
if let Some(resp) = self.network.on_blocks_by_range_response(id, peer_id, block) {
self.on_range_components_response(id.requester, peer_id, BlockOrBlob::Block(resp));
self.on_range_components_response(
id.requester,
peer_id,
RangeBlockComponent::Block(resp),
);
}
}

Expand All @@ -1161,7 +1165,11 @@ impl<T: BeaconChainTypes> SyncManager<T> {
blob: RpcEvent<Arc<BlobSidecar<T::EthSpec>>>,
) {
if let Some(resp) = self.network.on_blobs_by_range_response(id, peer_id, blob) {
self.on_range_components_response(id.requester, peer_id, BlockOrBlob::Blob(resp));
self.on_range_components_response(
id.requester,
peer_id,
RangeBlockComponent::Blob(resp),
);
}
}

Expand All @@ -1178,7 +1186,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
self.on_range_components_response(
id.requester,
peer_id,
BlockOrBlob::CustodyColumns(resp),
RangeBlockComponent::CustodyColumns(resp),
);
}
}
Expand Down Expand Up @@ -1237,11 +1245,11 @@ impl<T: BeaconChainTypes> SyncManager<T> {
&mut self,
id: ComponentsByRangeRequestId,
peer_id: PeerId,
block_or_blob: BlockOrBlob<T::EthSpec>,
block_or_blob: RangeBlockComponent<T::EthSpec>,
) {
if let Some(resp) = self
.network
.range_block_and_blob_response(id, block_or_blob)
.range_block_component_response(id, block_or_blob)
{
match resp {
Ok(blocks) => {
Expand Down
36 changes: 12 additions & 24 deletions beacon_node/network/src/sync/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
}

/// Small enumeration to make dealing with block and blob requests easier.
pub enum BlockOrBlob<E: EthSpec> {
pub enum RangeBlockComponent<E: EthSpec> {
Block(RpcResponseResult<Vec<Arc<SignedBeaconBlock<E>>>>),
Blob(RpcResponseResult<Vec<Arc<BlobSidecar<E>>>>),
CustodyColumns(RpcResponseResult<Vec<Arc<DataColumnSidecar<E>>>>),
Expand Down Expand Up @@ -449,10 +449,10 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {

/// Received a blocks by range or blobs by range response for a request that couples blocks '
/// and blobs.
pub fn range_block_and_blob_response(
pub fn range_block_component_response(
&mut self,
id: ComponentsByRangeRequestId,
block_or_blob: BlockOrBlob<T::EthSpec>,
block_or_blob: RangeBlockComponent<T::EthSpec>,
) -> Option<Result<Vec<RpcBlock<T::EthSpec>>, RpcResponseError>> {
let Entry::Occupied(mut entry) = self.components_by_range_requests.entry(id) else {
metrics::inc_counter_vec(&metrics::SYNC_UNKNOWN_NETWORK_REQUESTS, &["range_blocks"]);
Expand All @@ -462,27 +462,15 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
if let Err(e) = {
let request = entry.get_mut();
match block_or_blob {
BlockOrBlob::Block(resp) => match resp {
Ok((blocks, _)) => {
request.add_blocks(blocks);
Ok(())
}
Err(e) => Err(e),
},
BlockOrBlob::Blob(resp) => match resp {
Ok((blobs, _)) => {
request.add_blobs(blobs);
Ok(())
}
Err(e) => Err(e),
},
BlockOrBlob::CustodyColumns(resp) => match resp {
Ok((custody_columns, _)) => {
request.add_custody_columns(custody_columns);
Ok(())
}
Err(e) => Err(e),
},
RangeBlockComponent::Block(resp) => resp.map(|(blocks, _)| {
request.add_blocks(blocks);
}),
RangeBlockComponent::Blob(resp) => resp.map(|(blobs, _)| {
request.add_blobs(blobs);
}),
RangeBlockComponent::CustodyColumns(resp) => resp.map(|(custody_columns, _)| {
request.add_custody_columns(custody_columns);
}),
}
} {
entry.remove();
Expand Down
14 changes: 7 additions & 7 deletions beacon_node/network/src/sync/range_sync/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ mod tests {
use crate::NetworkMessage;

use super::*;
use crate::sync::network_context::{BlockOrBlob, RangeRequestId, RpcResponseResult};
use crate::sync::network_context::{RangeBlockComponent, RangeRequestId, RpcResponseResult};
use beacon_chain::builder::Witness;
use beacon_chain::eth1_chain::CachingEth1Backend;
use beacon_chain::parking_lot::RwLock;
Expand Down Expand Up @@ -569,17 +569,17 @@ mod tests {
if blob_req_opt.is_some() {
match block_req {
AppRequestId::Sync(SyncRequestId::BlocksByRange(id)) => {
let result = self.cx.range_block_and_blob_response(
let result = self.cx.range_block_component_response(
id.requester,
BlockOrBlob::Block(empty_response()),
RangeBlockComponent::Block(empty_response()),
);
if result.is_some() {
panic!("range components should not complete yet");
}
self.cx
.range_block_and_blob_response(
.range_block_component_response(
id.requester,
BlockOrBlob::Blob(empty_response()),
RangeBlockComponent::Blob(empty_response()),
)
.unwrap()
.unwrap();
Expand All @@ -593,9 +593,9 @@ mod tests {
match block_req {
AppRequestId::Sync(SyncRequestId::BlocksByRange(id)) => {
self.cx
.range_block_and_blob_response(
.range_block_component_response(
id.requester,
BlockOrBlob::Block(empty_response()),
RangeBlockComponent::Block(empty_response()),
)
.unwrap()
.unwrap();
Expand Down

0 comments on commit 5fa2912

Please sign in to comment.