Skip to content

Commit

Permalink
fix(network): store limited number of double spends from kad::put
Browse files Browse the repository at this point in the history
  • Loading branch information
RolandSherwin committed Jun 20, 2024
1 parent bc31827 commit d37ce9d
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 24 deletions.
2 changes: 1 addition & 1 deletion sn_networking/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ pub(crate) type BadNodes = BTreeMap<PeerId, (Vec<(NodeIssue, Instant)>, bool)>;
/// What is the largest packet to send over the network.
/// Records larger than this will be rejected.
// TODO: revisit once cashnote_redemption is in
const MAX_PACKET_SIZE: usize = 1024 * 1024 * 5; // the chunk size is 1mb, so should be higher than that to prevent failures, 5mb here to allow for CashNote storage
pub const MAX_PACKET_SIZE: usize = 1024 * 1024 * 5; // the chunk size is 1mb, so should be higher than that to prevent failures, 5mb here to allow for CashNote storage

// Timeout for requests sent/received through the request_response behaviour.
const REQUEST_TIMEOUT_DEFAULT_S: Duration = Duration::from_secs(30);
Expand Down
7 changes: 2 additions & 5 deletions sn_networking/src/event/kad.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
use crate::{
driver::PendingGetClosestType, get_quorum_value, get_raw_signed_spends_from_record,
GetRecordCfg, GetRecordError, NetworkError, Result, SwarmDriver, CLOSE_GROUP_SIZE,
MAX_DOUBLE_SPEND_ATTEMPTS_PER_RECORD,
};
use itertools::Itertools;
use libp2p::kad::{
Expand Down Expand Up @@ -413,10 +412,8 @@ impl SwarmDriver {
}
if !accumulated_spends.is_empty() {
info!("For record {pretty_key:?} task {query_id:?}, found split record for a spend, accumulated and sending them as a single record");
let accumulated_spends = accumulated_spends
.into_iter()
.take(MAX_DOUBLE_SPEND_ATTEMPTS_PER_RECORD)
.collect::<Vec<SignedSpend>>();
let accumulated_spends =
accumulated_spends.into_iter().collect::<Vec<SignedSpend>>();

let bytes = try_serialize_record(&accumulated_spends, RecordKind::Spend)?;

Expand Down
7 changes: 3 additions & 4 deletions sn_networking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ pub use target_arch::{interval, sleep, spawn, Instant, Interval};

pub use self::{
cmd::{NodeIssue, SwarmLocalState},
driver::{GetRecordCfg, NetworkBuilder, PutRecordCfg, SwarmDriver, VerificationKind},
driver::{
GetRecordCfg, NetworkBuilder, PutRecordCfg, SwarmDriver, VerificationKind, MAX_PACKET_SIZE,
},
error::{GetRecordError, NetworkError},
event::{MsgResponder, NetworkEvent},
record_store::{calculate_cost_for_records, NodeRecordStore},
Expand Down Expand Up @@ -87,9 +89,6 @@ pub const CLOSE_GROUP_SIZE: usize = 5;
/// that a replication of the record shall be sent/accepted to/by the peer.
pub const REPLICATION_PEERS_COUNT: usize = CLOSE_GROUP_SIZE + 2;

/// The maximum number of double spend attempts to store inside a record.
pub const MAX_DOUBLE_SPEND_ATTEMPTS_PER_RECORD: usize = 50;

/// Majority of a given group (i.e. > 1/2).
#[inline]
pub const fn close_group_majority() -> usize {
Expand Down
3 changes: 2 additions & 1 deletion sn_networking/src/record_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// permissions and limitations relating to use of the SAFE Network Software.
#![allow(clippy::mutable_key_type)] // for the Bytes in NetworkAddress

use crate::driver::MAX_PACKET_SIZE;
use crate::target_arch::{spawn, Instant};
use crate::CLOSE_GROUP_SIZE;
use crate::{cmd::SwarmCmd, event::NetworkEvent, log_markers::Marker, send_swarm_cmd};
Expand Down Expand Up @@ -109,7 +110,7 @@ impl Default for NodeRecordStoreConfig {
storage_dir: historic_quote_dir.clone(),
historic_quote_dir,
max_records: MAX_RECORDS_COUNT,
max_value_bytes: 65 * 1024,
max_value_bytes: MAX_PACKET_SIZE,
}
}
}
Expand Down
51 changes: 38 additions & 13 deletions sn_node/src/put_validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
use crate::{node::Node, quote::verify_quote_for_storecost, Error, Marker, Result};
use libp2p::kad::{Record, RecordKey};
use sn_networking::{
get_raw_signed_spends_from_record, GetRecordError, NetworkError,
MAX_DOUBLE_SPEND_ATTEMPTS_PER_RECORD,
get_raw_signed_spends_from_record, GetRecordError, NetworkError, MAX_PACKET_SIZE,
};
use sn_protocol::{
storage::{
Expand All @@ -28,6 +27,12 @@ use std::collections::BTreeSet;
use tokio::task::JoinSet;
use xor_name::XorName;

/// The maximum number of double spend attempts to store that we got from PUTs
const MAX_DOUBLE_SPEND_ATTEMPTS_TO_KEEP_FROM_PUTS: usize = 15;

/// The maximum number of double spend attempts to store inside a record
const MAX_DOUBLE_SPEND_ATTEMPTS_TO_KEEP_PER_RECORD: usize = 30;

impl Node {
/// Validate a record and it's payment, and store the record to the RecordStore
pub(crate) async fn validate_and_store_record(&self, record: Record) -> Result<()> {
Expand Down Expand Up @@ -93,7 +98,7 @@ impl Node {
let value_to_hash = record.value.clone();
let spends = try_deserialize_record::<Vec<SignedSpend>>(&record)?;
let result = self
.validate_merge_and_store_spends(spends, &record_key)
.validate_merge_and_store_spends(spends, &record_key, true)
.await;
if result.is_ok() {
Marker::ValidSpendPutFromClient(&PrettyPrintRecordKey::from(&record_key)).log();
Expand Down Expand Up @@ -201,7 +206,7 @@ impl Node {
RecordKind::Spend => {
let record_key = record.key.clone();
let spends = try_deserialize_record::<Vec<SignedSpend>>(&record)?;
self.validate_merge_and_store_spends(spends, &record_key)
self.validate_merge_and_store_spends(spends, &record_key, false)
.await
}
RecordKind::Register => {
Expand Down Expand Up @@ -336,6 +341,7 @@ impl Node {
&self,
signed_spends: Vec<SignedSpend>,
record_key: &RecordKey,
from_put: bool,
) -> Result<()> {
let pretty_key = PrettyPrintRecordKey::from(record_key);
debug!("Validating spends before storage at {pretty_key:?}");
Expand Down Expand Up @@ -374,7 +380,7 @@ impl Node {
// validate the signed spends against the network and the local knowledge
debug!("Validating spends for {pretty_key:?} with unique key: {unique_pubkey:?}");
let validated_spends = match self
.signed_spends_to_keep(spends_for_key.clone(), *unique_pubkey)
.signed_spends_to_keep(spends_for_key.clone(), *unique_pubkey, from_put)
.await
{
Ok(s) => s,
Expand Down Expand Up @@ -622,24 +628,41 @@ impl Node {
}

/// Determine which spends our node should keep and store
/// - checks if we already have local copies and trusts them to be valid
/// - downloads spends from the network as well
/// - verifies incoming spends before trusting them
/// - ignores received invalid spends
/// - returns the valid spends to store
/// - returns max 2 spends to store
/// - if we have more than 2 valid spends, returns the first 2
/// - if our local copy has reached the len/size limits, we don't store anymore from kad::PUT and return the local copy
/// - else if the request is from replication OR if limit not reached during kad::PUT, then:
/// - trust local spends
/// - downloads spends from the network
/// - verifies incoming spend + network spends and ignores the invalid ones.
/// - orders all the verified spends from local + incoming + network
/// - returns a maximum of MAX_DOUBLE_SPEND_ATTEMPTS_TO_KEEP_PER_RECORD spends
async fn signed_spends_to_keep(
&self,
signed_spends: Vec<SignedSpend>,
unique_pubkey: UniquePubkey,
from_put: bool,
) -> Result<Vec<SignedSpend>> {
let spend_addr = SpendAddress::from_unique_pubkey(&unique_pubkey);
debug!(
"Validating before storing spend at {spend_addr:?} with unique key: {unique_pubkey}"
);

let local_spends = self.get_local_spends(spend_addr).await?;
let size_of_local_spends = try_serialize_record(&local_spends, RecordKind::Spend)?
.to_vec()
.len();
let max_spend_len_reached =
local_spends.len() >= MAX_DOUBLE_SPEND_ATTEMPTS_TO_KEEP_FROM_PUTS;
let max_spend_size_reached = {
// todo: limit size of a single signed spend to < max_packet_size/2
let size_limit = size_of_local_spends >= MAX_PACKET_SIZE / 2;
// just so that we can store the double spend
size_limit && local_spends.len() > 1
};

if (max_spend_len_reached || max_spend_size_reached) && from_put {
info!("We already have {MAX_DOUBLE_SPEND_ATTEMPTS_TO_KEEP_FROM_PUTS} spends locally or have maximum size of spends, skipping spends received via PUT for {unique_pubkey:?}");
return Ok(local_spends);
}
let mut all_verified_spends = BTreeSet::from_iter(local_spends.into_iter());

// get spends from the network at the address for that unique pubkey
Expand Down Expand Up @@ -693,9 +716,11 @@ impl Node {
}
}

// todo: should we also check the size of spends here? Maybe just limit the size of a single
// SignedSpend to < max_packet_size/2 so that we can store atleast 2 of them.
let verified_spends = all_verified_spends
.into_iter()
.take(MAX_DOUBLE_SPEND_ATTEMPTS_PER_RECORD)
.take(MAX_DOUBLE_SPEND_ATTEMPTS_TO_KEEP_PER_RECORD)
.collect::<Vec<SignedSpend>>();

if verified_spends.is_empty() {
Expand Down

0 comments on commit d37ce9d

Please sign in to comment.