Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix] Cache block requests on the gateway #3369

Merged
merged 2 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 56 additions & 40 deletions node/bft/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -546,30 +546,35 @@ impl<N: Network> Gateway<N> {
bail!("Dropping '{peer_ip}' for spamming events (num_events = {num_events})")
}
// Rate limit for duplicate requests.
if matches!(&event, &Event::CertificateRequest(_) | &Event::CertificateResponse(_)) {
// Retrieve the certificate ID.
let certificate_id = match &event {
Event::CertificateRequest(CertificateRequest { certificate_id }) => *certificate_id,
Event::CertificateResponse(CertificateResponse { certificate }) => certificate.id(),
_ => unreachable!(),
};
// Skip processing this certificate if the rate limit was exceed (i.e. someone is spamming a specific certificate).
let num_events = self.cache.insert_inbound_certificate(certificate_id, CACHE_REQUESTS_INTERVAL);
if num_events >= self.max_cache_duplicates() {
return Ok(());
match event {
Event::CertificateRequest(_) | Event::CertificateResponse(_) => {
// Retrieve the certificate ID.
let certificate_id = match &event {
Event::CertificateRequest(CertificateRequest { certificate_id }) => *certificate_id,
Event::CertificateResponse(CertificateResponse { certificate }) => certificate.id(),
_ => unreachable!(),
};
// Skip processing this certificate if the rate limit was exceed (i.e. someone is spamming a specific certificate).
let num_events = self.cache.insert_inbound_certificate(certificate_id, CACHE_REQUESTS_INTERVAL);
if num_events >= self.max_cache_duplicates() {
return Ok(());
}
}
Event::TransmissionRequest(TransmissionRequest { transmission_id })
| Event::TransmissionResponse(TransmissionResponse { transmission_id, .. }) => {
// Skip processing this certificate if the rate limit was exceeded (i.e. someone is spamming a specific certificate).
let num_events = self.cache.insert_inbound_transmission(transmission_id, CACHE_REQUESTS_INTERVAL);
if num_events >= self.max_cache_duplicates() {
return Ok(());
}
}
} else if matches!(&event, &Event::TransmissionRequest(_) | Event::TransmissionResponse(_)) {
// Retrieve the transmission ID.
let transmission_id = match &event {
Event::TransmissionRequest(TransmissionRequest { transmission_id }) => *transmission_id,
Event::TransmissionResponse(TransmissionResponse { transmission_id, .. }) => *transmission_id,
_ => unreachable!(),
};
// Skip processing this certificate if the rate limit was exceeded (i.e. someone is spamming a specific certificate).
let num_events = self.cache.insert_inbound_transmission(transmission_id, CACHE_REQUESTS_INTERVAL);
if num_events >= self.max_cache_duplicates() {
return Ok(());
Event::BlockRequest(_) => {
let num_events = self.cache.insert_inbound_block_request(peer_ip, CACHE_REQUESTS_INTERVAL);
if num_events >= self.max_cache_duplicates() {
return Ok(());
}
}
_ => {}
}
trace!("{CONTEXT} Received '{}' from '{peer_ip}'", event.name());

Expand Down Expand Up @@ -631,6 +636,10 @@ impl<N: Network> Gateway<N> {
if let Some(sync_sender) = self.sync_sender.get() {
// Retrieve the block response.
let BlockResponse { request, blocks } = block_response;
// Check the response corresponds to a request.
if !self.cache.remove_outbound_block_request(peer_ip, &request) {
bail!("Unsolicited block response from '{peer_ip}'")
}
// Perform the deferred non-blocking deserialization of the blocks.
let blocks = blocks.deserialize().await.map_err(|error| anyhow!("[BlockResponse] {error}"))?;
// Ensure the block response is well-formed.
Expand Down Expand Up @@ -979,24 +988,30 @@ impl<N: Network> Transport<N> for Gateway<N> {
}};
}

// If the event type is a certificate request, increment the cache.
if matches!(event, Event::CertificateRequest(_)) | matches!(event, Event::CertificateResponse(_)) {
// Update the outbound event cache. This is necessary to ensure we don't under count the outbound events.
self.cache.insert_outbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
// Send the event to the peer.
send!(self, insert_outbound_certificate, CACHE_REQUESTS_INTERVAL, max_cache_certificates)
}
// If the event type is a transmission request, increment the cache.
else if matches!(event, Event::TransmissionRequest(_)) | matches!(event, Event::TransmissionResponse(_)) {
// Update the outbound event cache. This is necessary to ensure we don't under count the outbound events.
self.cache.insert_outbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
// Send the event to the peer.
send!(self, insert_outbound_transmission, CACHE_REQUESTS_INTERVAL, max_cache_transmissions)
}
// Otherwise, employ a general rate limit.
else {
// Send the event to the peer.
send!(self, insert_outbound_event, CACHE_EVENTS_INTERVAL, max_cache_events)
// Increment the cache for certificate, transmission and block events.
match event {
Event::CertificateRequest(_) | Event::CertificateResponse(_) => {
// Update the outbound event cache. This is necessary to ensure we don't under count the outbound events.
self.cache.insert_outbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
// Send the event to the peer.
send!(self, insert_outbound_certificate, CACHE_REQUESTS_INTERVAL, max_cache_certificates)
}
Event::TransmissionRequest(_) | Event::TransmissionResponse(_) => {
// Update the outbound event cache. This is necessary to ensure we don't under count the outbound events.
self.cache.insert_outbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
// Send the event to the peer.
send!(self, insert_outbound_transmission, CACHE_REQUESTS_INTERVAL, max_cache_transmissions)
}
Event::BlockRequest(request) => {
// Insert the outbound request so we can match it to responses.
self.cache.insert_outbound_block_request(peer_ip, request);
// Send the event to the peer and updatet the outbound event cache, use the general rate limit.
send!(self, insert_outbound_event, CACHE_EVENTS_INTERVAL, max_cache_events)
}
_ => {
// Send the event to the peer, use the general rate limit.
send!(self, insert_outbound_event, CACHE_EVENTS_INTERVAL, max_cache_events)
}
}
}

Expand Down Expand Up @@ -1090,6 +1105,7 @@ impl<N: Network> Disconnect for Gateway<N> {
// This is sufficient to avoid infinite growth as the committee has a fixed number
// of members.
self.cache.clear_outbound_validators_requests(peer_ip);
self.cache.clear_outbound_block_requests(peer_ip);
}
}
}
Expand Down
33 changes: 32 additions & 1 deletion node/bft/src/helpers/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::events::BlockRequest;
use snarkvm::{console::types::Field, ledger::narwhal::TransmissionID, prelude::Network};

use core::hash::Hash;
use parking_lot::RwLock;
use std::{
collections::{BTreeMap, HashMap},
collections::{BTreeMap, HashMap, HashSet},
net::{IpAddr, SocketAddr},
};
use time::OffsetDateTime;
Expand All @@ -32,6 +33,8 @@ pub struct Cache<N: Network> {
seen_inbound_certificates: RwLock<BTreeMap<i64, HashMap<Field<N>, u32>>>,
/// The ordered timestamp map of transmission IDs and cache hits.
seen_inbound_transmissions: RwLock<BTreeMap<i64, HashMap<TransmissionID<N>, u32>>>,
/// The ordered timestamp map of inbound block requests and cache hits.
seen_inbound_block_requests: RwLock<BTreeMap<i64, HashMap<SocketAddr, u32>>>,
/// The ordered timestamp map of peer IPs and their cache hits on outbound events.
seen_outbound_events: RwLock<BTreeMap<i64, HashMap<SocketAddr, u32>>>,
/// The ordered timestamp map of peer IPs and their cache hits on certificate requests.
Expand All @@ -40,6 +43,8 @@ pub struct Cache<N: Network> {
seen_outbound_transmissions: RwLock<BTreeMap<i64, HashMap<SocketAddr, u32>>>,
/// The map of IPs to the number of validators requests.
seen_outbound_validators_requests: RwLock<HashMap<SocketAddr, u32>>,
/// The ordered timestamp map of outbound block requests and cache hits.
seen_outbound_block_requests: RwLock<HashMap<SocketAddr, HashSet<BlockRequest>>>,
}

impl<N: Network> Default for Cache<N> {
Expand All @@ -57,10 +62,12 @@ impl<N: Network> Cache<N> {
seen_inbound_events: Default::default(),
seen_inbound_certificates: Default::default(),
seen_inbound_transmissions: Default::default(),
seen_inbound_block_requests: Default::default(),
seen_outbound_events: Default::default(),
seen_outbound_certificates: Default::default(),
seen_outbound_transmissions: Default::default(),
seen_outbound_validators_requests: Default::default(),
seen_outbound_block_requests: Default::default(),
}
}
}
Expand All @@ -85,6 +92,11 @@ impl<N: Network> Cache<N> {
pub fn insert_inbound_transmission(&self, key: TransmissionID<N>, interval_in_secs: i64) -> usize {
Self::retain_and_insert(&self.seen_inbound_transmissions, key, interval_in_secs)
}

/// Inserts a block request into the cache, returning the number of recent events.
pub fn insert_inbound_block_request(&self, key: SocketAddr, interval_in_secs: i64) -> usize {
Self::retain_and_insert(&self.seen_inbound_block_requests, key, interval_in_secs)
}
}

impl<N: Network> Cache<N> {
Expand Down Expand Up @@ -124,6 +136,25 @@ impl<N: Network> Cache<N> {
pub fn clear_outbound_validators_requests(&self, peer_ip: SocketAddr) {
self.seen_outbound_validators_requests.write().remove(&peer_ip);
}

/// Inserts the block request for the given peer.
pub fn insert_outbound_block_request(&self, peer_ip: SocketAddr, request: BlockRequest) {
self.seen_outbound_block_requests.write().entry(peer_ip).or_default().insert(request);
}

/// Removes the block request for the given peer. Returns whether the request was present.
pub fn remove_outbound_block_request(&self, peer_ip: SocketAddr, request: &BlockRequest) -> bool {
self.seen_outbound_block_requests
.write()
.get_mut(&peer_ip)
.map(|requests| requests.remove(request))
.unwrap_or(false)
}

/// Clears the peer's number of outbound block requests.
pub fn clear_outbound_block_requests(&self, peer_ip: SocketAddr) {
self.seen_outbound_block_requests.write().remove(&peer_ip);
}
}

impl<N: Network> Cache<N> {
Expand Down