diff --git a/sn_client/src/audit/dag_crawling.rs b/sn_client/src/audit/dag_crawling.rs index 3a6ffc726a..ddc1ab7aa9 100644 --- a/sn_client/src/audit/dag_crawling.rs +++ b/sn_client/src/audit/dag_crawling.rs @@ -24,7 +24,7 @@ const SPENDS_PROCESSING_BUFFER_SIZE: usize = 4096; enum InternalGetNetworkSpend { Spend(Box), - DoubleSpend(Box, Box), + DoubleSpend(Vec), NotFound, Error(Error), } @@ -33,17 +33,18 @@ impl Client { pub async fn new_dag_with_genesis_only(&self) -> WalletResult { let genesis_addr = SpendAddress::from_unique_pubkey(&GENESIS_SPEND_UNIQUE_KEY); let mut dag = SpendDag::new(genesis_addr); - let genesis_spend = match self.get_spend_from_network(genesis_addr).await { - Ok(s) => s, - Err(Error::Network(NetworkError::DoubleSpendAttempt(spend1, spend2))) => { - let addr = spend1.address(); - println!("Double spend detected at Genesis: {addr:?}"); - dag.insert(genesis_addr, *spend2); - *spend1 + match self.get_spend_from_network(genesis_addr).await { + Ok(spend) => { + dag.insert(genesis_addr, spend); + } + Err(Error::Network(NetworkError::DoubleSpendAttempt(spends))) => { + println!("Double spend detected at Genesis: {genesis_addr:?}"); + for spend in spends.into_iter() { + dag.insert(genesis_addr, spend); + } } Err(e) => return Err(WalletError::FailedToGetSpend(e.to_string())), }; - dag.insert(genesis_addr, genesis_spend); Ok(dag) } @@ -163,7 +164,7 @@ impl Client { .map_err(|e| WalletError::SpendProcessing(e.to_string())); addrs_to_get.extend(for_further_track); } - InternalGetNetworkSpend::DoubleSpend(_s1, _s2) => { + InternalGetNetworkSpend::DoubleSpend(_spends) => { warn!("Detected double spend regarding {address:?}"); } InternalGetNetworkSpend::NotFound => { @@ -193,14 +194,27 @@ impl Client { let mut utxos = BTreeSet::new(); // get first spend - let first_spend = match self.crawl_spend(spend_addr).await { - InternalGetNetworkSpend::Spend(s) => *s, - InternalGetNetworkSpend::DoubleSpend(s1, s2) => { + let mut txs_to_follow = match self.crawl_spend(spend_addr).await { + InternalGetNetworkSpend::Spend(spend) => { + let spend = *spend; + let txs = BTreeSet::from_iter([spend.spend.spent_tx.clone()]); + spend_processing - .send(*s2) + .send(spend) .await .map_err(|e| WalletError::SpendProcessing(e.to_string()))?; - *s1 + txs + } + InternalGetNetworkSpend::DoubleSpend(spends) => { + let mut txs = BTreeSet::new(); + for spend in spends.into_iter() { + txs.insert(spend.spend.spent_tx.clone()); + spend_processing + .send(spend) + .await + .map_err(|e| WalletError::SpendProcessing(e.to_string()))?; + } + txs } InternalGetNetworkSpend::NotFound => { // the cashnote was not spent yet, so it's an UTXO @@ -212,13 +226,8 @@ impl Client { return Err(WalletError::FailedToGetSpend(e.to_string())); } }; - spend_processing - .send(first_spend.clone()) - .await - .map_err(|e| WalletError::SpendProcessing(e.to_string()))?; // use iteration instead of recursion to avoid stack overflow - let mut txs_to_follow = BTreeSet::from_iter([first_spend.spend.spent_tx]); let mut known_tx = BTreeSet::new(); let mut gen: u32 = 0; let start = std::time::Instant::now(); @@ -260,18 +269,15 @@ impl Client { .await .map_err(|e| WalletError::SpendProcessing(e.to_string()))?; } - InternalGetNetworkSpend::DoubleSpend(s1, s2) => { - info!("Fetched double spend at {addr:?} from network, following both..."); - next_gen_tx.insert(s1.spend.spent_tx.clone()); - next_gen_tx.insert(s2.spend.spent_tx.clone()); - spend_processing - .send(*s1.clone()) - .await - .map_err(|e| WalletError::SpendProcessing(e.to_string()))?; - spend_processing - .send(*s2.clone()) - .await - .map_err(|e| WalletError::SpendProcessing(e.to_string()))?; + InternalGetNetworkSpend::DoubleSpend(spends) => { + info!("Fetched double spend(s) of len {} at {addr:?} from network, following all of them.", spends.len()); + for s in spends.into_iter() { + next_gen_tx.insert(s.spend.spent_tx.clone()); + spend_processing + .send(s.clone()) + .await + .map_err(|e| WalletError::SpendProcessing(e.to_string()))?; + } } InternalGetNetworkSpend::NotFound => { info!("Reached UTXO at {addr:?}"); @@ -360,8 +366,8 @@ impl Client { InternalGetNetworkSpend::Spend(s) => { spends.insert(*s); } - InternalGetNetworkSpend::DoubleSpend(s1, s2) => { - spends.extend([*s1, *s2]); + InternalGetNetworkSpend::DoubleSpend(s) => { + spends.extend(s.into_iter()); } InternalGetNetworkSpend::NotFound => { return Err(WalletError::FailedToGetSpend(format!( @@ -498,9 +504,9 @@ impl Client { debug!("DAG crawling: spend at {spend_addr:?} not found on the network"); InternalGetNetworkSpend::NotFound } - Err(Error::Network(NetworkError::DoubleSpendAttempt(s1, s2))) => { - debug!("DAG crawling: got a double spend at {spend_addr:?} on the network"); - InternalGetNetworkSpend::DoubleSpend(s1, s2) + Err(Error::Network(NetworkError::DoubleSpendAttempt(spends))) => { + debug!("DAG crawling: got a double spend(s) of len {} at {spend_addr:?} on the network", spends.len()); + InternalGetNetworkSpend::DoubleSpend(spends) } Err(e) => { debug!( diff --git a/sn_networking/src/driver.rs b/sn_networking/src/driver.rs index 71ea2c68fc..5237c78fd8 100644 --- a/sn_networking/src/driver.rs +++ b/sn_networking/src/driver.rs @@ -102,7 +102,7 @@ pub(crate) type BadNodes = BTreeMap, bool)>; /// What is the largest packet to send over the network. /// Records larger than this will be rejected. // TODO: revisit once cashnote_redemption is in -const MAX_PACKET_SIZE: usize = 1024 * 1024 * 5; // the chunk size is 1mb, so should be higher than that to prevent failures, 5mb here to allow for CashNote storage +pub const MAX_PACKET_SIZE: usize = 1024 * 1024 * 5; // the chunk size is 1mb, so should be higher than that to prevent failures, 5mb here to allow for CashNote storage // Timeout for requests sent/received through the request_response behaviour. const REQUEST_TIMEOUT_DEFAULT_S: Duration = Duration::from_secs(30); diff --git a/sn_networking/src/error.rs b/sn_networking/src/error.rs index cbaa4db731..103a79a9a3 100644 --- a/sn_networking/src/error.rs +++ b/sn_networking/src/error.rs @@ -138,8 +138,8 @@ pub enum NetworkError { // ---------- Spend Errors #[error("Spend not found: {0:?}")] NoSpendFoundInsideRecord(SpendAddress), - #[error("A double spend was detected. Two diverging signed spends: {0:?}, {1:?}")] - DoubleSpendAttempt(Box, Box), + #[error("Double spend(s) was detected. The signed spends are: {0:?}")] + DoubleSpendAttempt(Vec), // ---------- Store Error #[error("No Store Cost Responses")] diff --git a/sn_networking/src/event/kad.rs b/sn_networking/src/event/kad.rs index ec941a933e..6060eafe81 100644 --- a/sn_networking/src/event/kad.rs +++ b/sn_networking/src/event/kad.rs @@ -412,10 +412,8 @@ impl SwarmDriver { } if !accumulated_spends.is_empty() { info!("For record {pretty_key:?} task {query_id:?}, found split record for a spend, accumulated and sending them as a single record"); - let accumulated_spends = accumulated_spends - .into_iter() - .take(2) - .collect::>(); + let accumulated_spends = + accumulated_spends.into_iter().collect::>(); let bytes = try_serialize_record(&accumulated_spends, RecordKind::Spend)?; diff --git a/sn_networking/src/lib.rs b/sn_networking/src/lib.rs index 4e7246f95f..848c8210a8 100644 --- a/sn_networking/src/lib.rs +++ b/sn_networking/src/lib.rs @@ -36,7 +36,9 @@ pub use target_arch::{interval, sleep, spawn, Instant, Interval}; pub use self::{ cmd::{NodeIssue, SwarmLocalState}, - driver::{GetRecordCfg, NetworkBuilder, PutRecordCfg, SwarmDriver, VerificationKind}, + driver::{ + GetRecordCfg, NetworkBuilder, PutRecordCfg, SwarmDriver, VerificationKind, MAX_PACKET_SIZE, + }, error::{GetRecordError, NetworkError}, event::{MsgResponder, NetworkEvent}, record_store::{calculate_cost_for_records, NodeRecordStore}, diff --git a/sn_networking/src/record_store.rs b/sn_networking/src/record_store.rs index 96eacbb3ea..fd397b6af7 100644 --- a/sn_networking/src/record_store.rs +++ b/sn_networking/src/record_store.rs @@ -7,6 +7,7 @@ // permissions and limitations relating to use of the SAFE Network Software. #![allow(clippy::mutable_key_type)] // for the Bytes in NetworkAddress +use crate::driver::MAX_PACKET_SIZE; use crate::target_arch::{spawn, Instant}; use crate::CLOSE_GROUP_SIZE; use crate::{cmd::SwarmCmd, event::NetworkEvent, log_markers::Marker, send_swarm_cmd}; @@ -109,7 +110,7 @@ impl Default for NodeRecordStoreConfig { storage_dir: historic_quote_dir.clone(), historic_quote_dir, max_records: MAX_RECORDS_COUNT, - max_value_bytes: 65 * 1024, + max_value_bytes: MAX_PACKET_SIZE, } } } diff --git a/sn_networking/src/transfers.rs b/sn_networking/src/transfers.rs index 15e739a595..e03dcff456 100644 --- a/sn_networking/src/transfers.rs +++ b/sn_networking/src/transfers.rs @@ -236,21 +236,22 @@ pub fn get_signed_spend_from_record( address: &SpendAddress, record: &Record, ) -> Result { - match get_raw_signed_spends_from_record(record)?.as_slice() { - [one, two, ..] => { - warn!("Found double spend for {address:?}"); - Err(NetworkError::DoubleSpendAttempt( - Box::new(one.to_owned()), - Box::new(two.to_owned()), - )) + let spends = get_raw_signed_spends_from_record(record)?; + match spends.as_slice() { + [] => { + error!("Found no spend for {address:?}"); + Err(NetworkError::NoSpendFoundInsideRecord(*address)) } [one] => { trace!("Spend get for address: {address:?} successful"); Ok(one.clone()) } - _ => { - trace!("Found no spend for {address:?}"); - Err(NetworkError::NoSpendFoundInsideRecord(*address)) + _double_spends => { + warn!( + "Found double spend(s) of len {} for {address:?}", + spends.len() + ); + Err(NetworkError::DoubleSpendAttempt(spends)) } } } diff --git a/sn_node/src/put_validation.rs b/sn_node/src/put_validation.rs index 3bbea878d2..e817f457e6 100644 --- a/sn_node/src/put_validation.rs +++ b/sn_node/src/put_validation.rs @@ -8,7 +8,9 @@ use crate::{node::Node, quote::verify_quote_for_storecost, Error, Marker, Result}; use libp2p::kad::{Record, RecordKey}; -use sn_networking::{get_raw_signed_spends_from_record, GetRecordError, NetworkError}; +use sn_networking::{ + get_raw_signed_spends_from_record, GetRecordError, NetworkError, MAX_PACKET_SIZE, +}; use sn_protocol::{ storage::{ try_deserialize_record, try_serialize_record, Chunk, RecordHeader, RecordKind, RecordType, @@ -25,6 +27,12 @@ use std::collections::BTreeSet; use tokio::task::JoinSet; use xor_name::XorName; +/// The maximum number of double spend attempts to store that we got from PUTs +const MAX_DOUBLE_SPEND_ATTEMPTS_TO_KEEP_FROM_PUTS: usize = 15; + +/// The maximum number of double spend attempts to store inside a record +const MAX_DOUBLE_SPEND_ATTEMPTS_TO_KEEP_PER_RECORD: usize = 30; + impl Node { /// Validate a record and it's payment, and store the record to the RecordStore pub(crate) async fn validate_and_store_record(&self, record: Record) -> Result<()> { @@ -90,7 +98,7 @@ impl Node { let value_to_hash = record.value.clone(); let spends = try_deserialize_record::>(&record)?; let result = self - .validate_merge_and_store_spends(spends, &record_key) + .validate_merge_and_store_spends(spends, &record_key, true) .await; if result.is_ok() { Marker::ValidSpendPutFromClient(&PrettyPrintRecordKey::from(&record_key)).log(); @@ -198,7 +206,7 @@ impl Node { RecordKind::Spend => { let record_key = record.key.clone(); let spends = try_deserialize_record::>(&record)?; - self.validate_merge_and_store_spends(spends, &record_key) + self.validate_merge_and_store_spends(spends, &record_key, false) .await } RecordKind::Register => { @@ -333,6 +341,7 @@ impl Node { &self, signed_spends: Vec, record_key: &RecordKey, + from_put: bool, ) -> Result<()> { let pretty_key = PrettyPrintRecordKey::from(record_key); debug!("Validating spends before storage at {pretty_key:?}"); @@ -370,8 +379,8 @@ impl Node { // validate the signed spends against the network and the local knowledge debug!("Validating spends for {pretty_key:?} with unique key: {unique_pubkey:?}"); - let (spend1, maybe_spend2) = match self - .signed_spends_to_keep(spends_for_key.clone(), *unique_pubkey) + let validated_spends = match self + .signed_spends_to_keep(spends_for_key.clone(), *unique_pubkey, from_put) .await { Ok(s) => s, @@ -380,12 +389,11 @@ impl Node { return Err(e); } }; - let validated_spends = maybe_spend2 - .clone() - .map(|spend2| vec![spend1.clone(), spend2]) - .unwrap_or_else(|| vec![spend1.clone()]); - let len = validated_spends.len(); - debug!("Got {len} validated spends with key: {unique_pubkey:?} at {pretty_key:?}"); + + debug!( + "Got {} validated spends with key: {unique_pubkey:?} at {pretty_key:?}", + validated_spends.len() + ); // store the record into the local storage let record = Record { @@ -401,8 +409,8 @@ impl Node { // Just log the double spend attempt. DoubleSpend error during PUT is not used and would just lead to // RecordRejected marker (which is incorrect, since we store double spends). - if maybe_spend2.is_some() { - warn!("Got a double spend for the Spend PUT with unique_pubkey {unique_pubkey}"); + if validated_spends.len() > 1 { + warn!("Got double spend(s) of len {} for the Spend PUT with unique_pubkey {unique_pubkey}", validated_spends.len()); } self.record_metrics(Marker::ValidSpendRecordPutFromNetwork(&pretty_key)); @@ -620,24 +628,41 @@ impl Node { } /// Determine which spends our node should keep and store - /// - checks if we already have local copies and trusts them to be valid - /// - downloads spends from the network as well - /// - verifies incoming spends before trusting them - /// - ignores received invalid spends - /// - returns the valid spends to store - /// - returns max 2 spends to store - /// - if we have more than 2 valid spends, returns the first 2 + /// - if our local copy has reached the len/size limits, we don't store anymore from kad::PUT and return the local copy + /// - else if the request is from replication OR if limit not reached during kad::PUT, then: + /// - trust local spends + /// - downloads spends from the network + /// - verifies incoming spend + network spends and ignores the invalid ones. + /// - orders all the verified spends from local + incoming + network + /// - returns a maximum of MAX_DOUBLE_SPEND_ATTEMPTS_TO_KEEP_PER_RECORD spends async fn signed_spends_to_keep( &self, signed_spends: Vec, unique_pubkey: UniquePubkey, - ) -> Result<(SignedSpend, Option)> { + from_put: bool, + ) -> Result> { let spend_addr = SpendAddress::from_unique_pubkey(&unique_pubkey); debug!( "Validating before storing spend at {spend_addr:?} with unique key: {unique_pubkey}" ); let local_spends = self.get_local_spends(spend_addr).await?; + let size_of_local_spends = try_serialize_record(&local_spends, RecordKind::Spend)? + .to_vec() + .len(); + let max_spend_len_reached = + local_spends.len() >= MAX_DOUBLE_SPEND_ATTEMPTS_TO_KEEP_FROM_PUTS; + let max_spend_size_reached = { + // todo: limit size of a single signed spend to < max_packet_size/2 + let size_limit = size_of_local_spends >= MAX_PACKET_SIZE / 2; + // just so that we can store the double spend + size_limit && local_spends.len() > 1 + }; + + if (max_spend_len_reached || max_spend_size_reached) && from_put { + info!("We already have {MAX_DOUBLE_SPEND_ATTEMPTS_TO_KEEP_FROM_PUTS} spends locally or have maximum size of spends, skipping spends received via PUT for {unique_pubkey:?}"); + return Ok(local_spends); + } let mut all_verified_spends = BTreeSet::from_iter(local_spends.into_iter()); // get spends from the network at the address for that unique pubkey @@ -691,28 +716,24 @@ impl Node { } } - // return the single unique spend to store - match all_verified_spends + // todo: should we also check the size of spends here? Maybe just limit the size of a single + // SignedSpend to < max_packet_size/2 so that we can store atleast 2 of them. + let verified_spends = all_verified_spends .into_iter() - .collect::>() - .as_slice() - { - [a] => { - debug!("Got a single valid spend for {unique_pubkey:?}"); - Ok((a.to_owned(), None)) - } - [a, b] => { - warn!("Got a double spend for {unique_pubkey:?}"); - Ok((a.to_owned(), Some(b.to_owned()))) - } - _ => { - debug!( - "No valid spends found while validating Spend PUT. Who is sending us garbage?" - ); - Err(Error::InvalidRequest(format!( - "Found no valid spends while validating Spend PUT for {unique_pubkey:?}" - ))) - } + .take(MAX_DOUBLE_SPEND_ATTEMPTS_TO_KEEP_PER_RECORD) + .collect::>(); + + if verified_spends.is_empty() { + debug!("No valid spends found while validating Spend PUT. Who is sending us garbage?"); + Err(Error::InvalidRequest(format!( + "Found no valid spends while validating Spend PUT for {unique_pubkey:?}" + ))) + } else if verified_spends.len() > 1 { + warn!("Got a double spend for {unique_pubkey:?}"); + Ok(verified_spends) + } else { + debug!("Got a single valid spend for {unique_pubkey:?}"); + Ok(verified_spends) } } }