From d37ce9de03dd7c0c45b492772cd88db83e7631de Mon Sep 17 00:00:00 2001 From: Roland Sherwin Date: Thu, 20 Jun 2024 15:11:50 +0530 Subject: [PATCH] fix(network): store limited number of double spends from kad::put --- sn_networking/src/driver.rs | 2 +- sn_networking/src/event/kad.rs | 7 ++--- sn_networking/src/lib.rs | 7 ++--- sn_networking/src/record_store.rs | 3 +- sn_node/src/put_validation.rs | 51 +++++++++++++++++++++++-------- 5 files changed, 46 insertions(+), 24 deletions(-) diff --git a/sn_networking/src/driver.rs b/sn_networking/src/driver.rs index 71ea2c68fc..5237c78fd8 100644 --- a/sn_networking/src/driver.rs +++ b/sn_networking/src/driver.rs @@ -102,7 +102,7 @@ pub(crate) type BadNodes = BTreeMap, bool)>; /// What is the largest packet to send over the network. /// Records larger than this will be rejected. // TODO: revisit once cashnote_redemption is in -const MAX_PACKET_SIZE: usize = 1024 * 1024 * 5; // the chunk size is 1mb, so should be higher than that to prevent failures, 5mb here to allow for CashNote storage +pub const MAX_PACKET_SIZE: usize = 1024 * 1024 * 5; // the chunk size is 1mb, so should be higher than that to prevent failures, 5mb here to allow for CashNote storage // Timeout for requests sent/received through the request_response behaviour. const REQUEST_TIMEOUT_DEFAULT_S: Duration = Duration::from_secs(30); diff --git a/sn_networking/src/event/kad.rs b/sn_networking/src/event/kad.rs index 8f72b5f3f7..6060eafe81 100644 --- a/sn_networking/src/event/kad.rs +++ b/sn_networking/src/event/kad.rs @@ -9,7 +9,6 @@ use crate::{ driver::PendingGetClosestType, get_quorum_value, get_raw_signed_spends_from_record, GetRecordCfg, GetRecordError, NetworkError, Result, SwarmDriver, CLOSE_GROUP_SIZE, - MAX_DOUBLE_SPEND_ATTEMPTS_PER_RECORD, }; use itertools::Itertools; use libp2p::kad::{ @@ -413,10 +412,8 @@ impl SwarmDriver { } if !accumulated_spends.is_empty() { info!("For record {pretty_key:?} task {query_id:?}, found split record for a spend, accumulated and sending them as a single record"); - let accumulated_spends = accumulated_spends - .into_iter() - .take(MAX_DOUBLE_SPEND_ATTEMPTS_PER_RECORD) - .collect::>(); + let accumulated_spends = + accumulated_spends.into_iter().collect::>(); let bytes = try_serialize_record(&accumulated_spends, RecordKind::Spend)?; diff --git a/sn_networking/src/lib.rs b/sn_networking/src/lib.rs index 10b0aa0166..848c8210a8 100644 --- a/sn_networking/src/lib.rs +++ b/sn_networking/src/lib.rs @@ -36,7 +36,9 @@ pub use target_arch::{interval, sleep, spawn, Instant, Interval}; pub use self::{ cmd::{NodeIssue, SwarmLocalState}, - driver::{GetRecordCfg, NetworkBuilder, PutRecordCfg, SwarmDriver, VerificationKind}, + driver::{ + GetRecordCfg, NetworkBuilder, PutRecordCfg, SwarmDriver, VerificationKind, MAX_PACKET_SIZE, + }, error::{GetRecordError, NetworkError}, event::{MsgResponder, NetworkEvent}, record_store::{calculate_cost_for_records, NodeRecordStore}, @@ -87,9 +89,6 @@ pub const CLOSE_GROUP_SIZE: usize = 5; /// that a replication of the record shall be sent/accepted to/by the peer. pub const REPLICATION_PEERS_COUNT: usize = CLOSE_GROUP_SIZE + 2; -/// The maximum number of double spend attempts to store inside a record. -pub const MAX_DOUBLE_SPEND_ATTEMPTS_PER_RECORD: usize = 50; - /// Majority of a given group (i.e. > 1/2). #[inline] pub const fn close_group_majority() -> usize { diff --git a/sn_networking/src/record_store.rs b/sn_networking/src/record_store.rs index 96eacbb3ea..fd397b6af7 100644 --- a/sn_networking/src/record_store.rs +++ b/sn_networking/src/record_store.rs @@ -7,6 +7,7 @@ // permissions and limitations relating to use of the SAFE Network Software. #![allow(clippy::mutable_key_type)] // for the Bytes in NetworkAddress +use crate::driver::MAX_PACKET_SIZE; use crate::target_arch::{spawn, Instant}; use crate::CLOSE_GROUP_SIZE; use crate::{cmd::SwarmCmd, event::NetworkEvent, log_markers::Marker, send_swarm_cmd}; @@ -109,7 +110,7 @@ impl Default for NodeRecordStoreConfig { storage_dir: historic_quote_dir.clone(), historic_quote_dir, max_records: MAX_RECORDS_COUNT, - max_value_bytes: 65 * 1024, + max_value_bytes: MAX_PACKET_SIZE, } } } diff --git a/sn_node/src/put_validation.rs b/sn_node/src/put_validation.rs index 4eff2475c8..e817f457e6 100644 --- a/sn_node/src/put_validation.rs +++ b/sn_node/src/put_validation.rs @@ -9,8 +9,7 @@ use crate::{node::Node, quote::verify_quote_for_storecost, Error, Marker, Result}; use libp2p::kad::{Record, RecordKey}; use sn_networking::{ - get_raw_signed_spends_from_record, GetRecordError, NetworkError, - MAX_DOUBLE_SPEND_ATTEMPTS_PER_RECORD, + get_raw_signed_spends_from_record, GetRecordError, NetworkError, MAX_PACKET_SIZE, }; use sn_protocol::{ storage::{ @@ -28,6 +27,12 @@ use std::collections::BTreeSet; use tokio::task::JoinSet; use xor_name::XorName; +/// The maximum number of double spend attempts to store that we got from PUTs +const MAX_DOUBLE_SPEND_ATTEMPTS_TO_KEEP_FROM_PUTS: usize = 15; + +/// The maximum number of double spend attempts to store inside a record +const MAX_DOUBLE_SPEND_ATTEMPTS_TO_KEEP_PER_RECORD: usize = 30; + impl Node { /// Validate a record and it's payment, and store the record to the RecordStore pub(crate) async fn validate_and_store_record(&self, record: Record) -> Result<()> { @@ -93,7 +98,7 @@ impl Node { let value_to_hash = record.value.clone(); let spends = try_deserialize_record::>(&record)?; let result = self - .validate_merge_and_store_spends(spends, &record_key) + .validate_merge_and_store_spends(spends, &record_key, true) .await; if result.is_ok() { Marker::ValidSpendPutFromClient(&PrettyPrintRecordKey::from(&record_key)).log(); @@ -201,7 +206,7 @@ impl Node { RecordKind::Spend => { let record_key = record.key.clone(); let spends = try_deserialize_record::>(&record)?; - self.validate_merge_and_store_spends(spends, &record_key) + self.validate_merge_and_store_spends(spends, &record_key, false) .await } RecordKind::Register => { @@ -336,6 +341,7 @@ impl Node { &self, signed_spends: Vec, record_key: &RecordKey, + from_put: bool, ) -> Result<()> { let pretty_key = PrettyPrintRecordKey::from(record_key); debug!("Validating spends before storage at {pretty_key:?}"); @@ -374,7 +380,7 @@ impl Node { // validate the signed spends against the network and the local knowledge debug!("Validating spends for {pretty_key:?} with unique key: {unique_pubkey:?}"); let validated_spends = match self - .signed_spends_to_keep(spends_for_key.clone(), *unique_pubkey) + .signed_spends_to_keep(spends_for_key.clone(), *unique_pubkey, from_put) .await { Ok(s) => s, @@ -622,17 +628,18 @@ impl Node { } /// Determine which spends our node should keep and store - /// - checks if we already have local copies and trusts them to be valid - /// - downloads spends from the network as well - /// - verifies incoming spends before trusting them - /// - ignores received invalid spends - /// - returns the valid spends to store - /// - returns max 2 spends to store - /// - if we have more than 2 valid spends, returns the first 2 + /// - if our local copy has reached the len/size limits, we don't store anymore from kad::PUT and return the local copy + /// - else if the request is from replication OR if limit not reached during kad::PUT, then: + /// - trust local spends + /// - downloads spends from the network + /// - verifies incoming spend + network spends and ignores the invalid ones. + /// - orders all the verified spends from local + incoming + network + /// - returns a maximum of MAX_DOUBLE_SPEND_ATTEMPTS_TO_KEEP_PER_RECORD spends async fn signed_spends_to_keep( &self, signed_spends: Vec, unique_pubkey: UniquePubkey, + from_put: bool, ) -> Result> { let spend_addr = SpendAddress::from_unique_pubkey(&unique_pubkey); debug!( @@ -640,6 +647,22 @@ impl Node { ); let local_spends = self.get_local_spends(spend_addr).await?; + let size_of_local_spends = try_serialize_record(&local_spends, RecordKind::Spend)? + .to_vec() + .len(); + let max_spend_len_reached = + local_spends.len() >= MAX_DOUBLE_SPEND_ATTEMPTS_TO_KEEP_FROM_PUTS; + let max_spend_size_reached = { + // todo: limit size of a single signed spend to < max_packet_size/2 + let size_limit = size_of_local_spends >= MAX_PACKET_SIZE / 2; + // just so that we can store the double spend + size_limit && local_spends.len() > 1 + }; + + if (max_spend_len_reached || max_spend_size_reached) && from_put { + info!("We already have {MAX_DOUBLE_SPEND_ATTEMPTS_TO_KEEP_FROM_PUTS} spends locally or have maximum size of spends, skipping spends received via PUT for {unique_pubkey:?}"); + return Ok(local_spends); + } let mut all_verified_spends = BTreeSet::from_iter(local_spends.into_iter()); // get spends from the network at the address for that unique pubkey @@ -693,9 +716,11 @@ impl Node { } } + // todo: should we also check the size of spends here? Maybe just limit the size of a single + // SignedSpend to < max_packet_size/2 so that we can store atleast 2 of them. let verified_spends = all_verified_spends .into_iter() - .take(MAX_DOUBLE_SPEND_ATTEMPTS_PER_RECORD) + .take(MAX_DOUBLE_SPEND_ATTEMPTS_TO_KEEP_PER_RECORD) .collect::>(); if verified_spends.is_empty() {