Skip to content

Commit

Permalink
Add individual by_range sync requests
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed Oct 18, 2024
1 parent 6ad2c18 commit ff9e2f4
Show file tree
Hide file tree
Showing 12 changed files with 732 additions and 426 deletions.
21 changes: 21 additions & 0 deletions beacon_node/lighthouse_network/src/rpc/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,27 @@ impl OldBlocksByRangeRequest {
}
}

impl From<BlocksByRangeRequest> for OldBlocksByRangeRequest {
fn from(req: BlocksByRangeRequest) -> Self {
match req {
BlocksByRangeRequest::V1(ref req) => {
OldBlocksByRangeRequest::V1(OldBlocksByRangeRequestV1 {
start_slot: req.start_slot,
count: req.count,
step: 1,
})
}
BlocksByRangeRequest::V2(ref req) => {
OldBlocksByRangeRequest::V2(OldBlocksByRangeRequestV2 {
start_slot: req.start_slot,
count: req.count,
step: 1,
})
}
}
}
}

/// Request a number of beacon block bodies from a peer.
#[superstruct(variants(V1, V2), variant_attributes(derive(Clone, Debug, PartialEq)))]
#[derive(Clone, Debug, PartialEq)]
Expand Down
22 changes: 13 additions & 9 deletions beacon_node/lighthouse_network/src/rpc/self_limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ mod tests {
use crate::rpc::rate_limiter::Quota;
use crate::rpc::self_limiter::SelfRateLimiter;
use crate::rpc::{Ping, Protocol, RequestType};
use crate::service::api_types::{AppRequestId, RequestId, SyncRequestId};
use crate::service::api_types::{AppRequestId, RequestId, SingleLookupReqId, SyncRequestId};
use libp2p::PeerId;
use std::time::Duration;
use types::MainnetEthSpec;
Expand All @@ -228,12 +228,16 @@ mod tests {
let mut limiter: SelfRateLimiter<RequestId, MainnetEthSpec> =
SelfRateLimiter::new(config, log).unwrap();
let peer_id = PeerId::random();
let lookup_id = 0;

for i in 1..=5u32 {
let _ = limiter.allows(
peer_id,
RequestId::Application(AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs {
id: i,
RequestId::Application(AppRequestId::Sync(SyncRequestId::SingleBlock {
id: SingleLookupReqId {
lookup_id,
req_id: i,
},
})),
RequestType::Ping(Ping { data: i as u64 }),
);
Expand All @@ -251,9 +255,9 @@ mod tests {
for i in 2..=5u32 {
assert!(matches!(
iter.next().unwrap().request_id,
RequestId::Application(AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs {
id,
})) if id == i
RequestId::Application(AppRequestId::Sync(SyncRequestId::SingleBlock {
id: SingleLookupReqId { req_id, .. },
})) if req_id == i,
));
}

Expand All @@ -276,9 +280,9 @@ mod tests {
for i in 3..=5 {
assert!(matches!(
iter.next().unwrap().request_id,
RequestId::Application(AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs {
id
})) if id == i
RequestId::Application(AppRequestId::Sync(SyncRequestId::SingleBlock {
id: SingleLookupReqId { req_id, .. },
})) if req_id == i,
));
}

Expand Down
48 changes: 45 additions & 3 deletions beacon_node/lighthouse_network/src/service/api_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;

use libp2p::swarm::ConnectionId;
use types::{
BlobSidecar, DataColumnSidecar, EthSpec, Hash256, LightClientBootstrap,
BlobSidecar, DataColumnSidecar, Epoch, EthSpec, Hash256, LightClientBootstrap,
LightClientFinalityUpdate, LightClientOptimisticUpdate, LightClientUpdate, SignedBeaconBlock,
};

Expand Down Expand Up @@ -31,8 +31,12 @@ pub enum SyncRequestId {
SingleBlob { id: SingleLookupReqId },
/// Request searching for a set of data columns given a hash and list of column indices.
DataColumnsByRoot(DataColumnsByRootRequestId),
/// Range request that is composed by both a block range request and a blob range request.
RangeBlockAndBlobs { id: Id },
/// Blocks by range request
BlocksByRange(BlocksByRangeRequestId),
/// Blobs by range request
BlobsByRange(BlobsByRangeRequestId),
/// Data columns by range request
DataColumnsByRange(DataColumnsByRangeRequestId),
}

/// Request ID for data_columns_by_root requests. Block lookups do not issue this request directly.
Expand All @@ -43,12 +47,50 @@ pub struct DataColumnsByRootRequestId {
pub requester: DataColumnsByRootRequester,
}

#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub struct BlocksByRangeRequestId {
pub id: Id,
pub requester: ComponentsByRangeRequestId,
}

#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub struct BlobsByRangeRequestId {
pub id: Id,
pub requester: ComponentsByRangeRequestId,
}

#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub struct DataColumnsByRangeRequestId {
pub id: Id,
pub requester: ComponentsByRangeRequestId,
}

/// Block components by range request for range sync. Includes an ID for downstream consumers to
/// handle retries and tie all their sub requests together.
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub struct ComponentsByRangeRequestId {
pub id: Id,
pub requester: RangeRequestId,
}

#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub enum RangeRequestId {
RangeSync { chain_id: u64, batch_id: Epoch },
BackfillSync { batch_id: Epoch },
}

#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub enum DataColumnsByRootRequester {
Sampling(SamplingId),
Custody(CustodyId),
}

#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub enum RangeRequester {
RangeSync { chain_id: u64, batch_id: Epoch },
BackfillSync { batch_id: Epoch },
}

#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub struct SamplingId {
pub id: SamplingRequester,
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/network/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ impl<T: BeaconChainTypes> Router<T> {
) {
let request_id = match request_id {
AppRequestId::Sync(sync_id) => match sync_id {
id @ SyncRequestId::RangeBlockAndBlobs { .. } => id,
id @ SyncRequestId::BlocksByRange { .. } => id,
other => {
crit!(self.log, "BlocksByRange response on incorrect request"; "request" => ?other);
return;
Expand Down
Loading

0 comments on commit ff9e2f4

Please sign in to comment.