diff --git a/sn_auditor/src/dag_db.rs b/sn_auditor/src/dag_db.rs index 54a1a31108..bcebd007a3 100644 --- a/sn_auditor/src/dag_db.rs +++ b/sn_auditor/src/dag_db.rs @@ -222,8 +222,10 @@ impl SpendDagDb { let spend_processing = if let Some(sk) = self.encryption_sk.clone() { let (tx, mut rx) = tokio::sync::mpsc::channel(SPENDS_PROCESSING_BUFFER_SIZE); tokio::spawn(async move { - while let Some(spend) = rx.recv().await { - self_clone.beta_background_process_spend(spend, &sk).await; + while let Some((spend, utxos_for_further_track)) = rx.recv().await { + self_clone + .beta_background_process_spend(spend, &sk, utxos_for_further_track) + .await; } }); Some(tx) @@ -232,14 +234,20 @@ impl SpendDagDb { None }; + let mut addrs_to_get = BTreeSet::new(); + loop { // get current utxos to fetch let now = Instant::now(); - let utxos_to_fetch; - (utxo_addresses, utxos_to_fetch) = utxo_addresses - .into_iter() - .partition(|(_address, time_stamp)| *time_stamp > now); - let addrs_to_get = utxos_to_fetch.keys().cloned().collect::>(); + + // Always track new outputs first + if addrs_to_get.is_empty() { + let utxos_to_fetch; + (utxo_addresses, utxos_to_fetch) = utxo_addresses + .into_iter() + .partition(|(_address, time_stamp)| *time_stamp > now); + addrs_to_get.extend(utxos_to_fetch.keys().cloned().collect::>()); + } if addrs_to_get.is_empty() { debug!( @@ -250,31 +258,40 @@ impl SpendDagDb { continue; } - let new_utxos = if cfg!(feature = "dag-collection") { - self.crawl_and_generate_local_dag( - addrs_to_get, - spend_processing.clone(), - client.clone(), - ) - .await + if cfg!(feature = "dag-collection") { + let new_utxos = self + .crawl_and_generate_local_dag( + addrs_to_get.clone(), + spend_processing.clone(), + client.clone(), + ) + .await; + addrs_to_get.clear(); + utxo_addresses.extend( + new_utxos + .into_iter() + .map(|a| (a, Instant::now() + *UTXO_REATTEMPT_INTERVAL)), + ); } else if let Some(sender) = spend_processing.clone() { - client.crawl_to_next_utxos(addrs_to_get, sender).await? + let (reattempt_addrs, new_utxos) = + client.crawl_to_next_utxos(&addrs_to_get, sender).await?; + utxo_addresses.extend( + reattempt_addrs + .into_iter() + .map(|a| (a, Instant::now() + *UTXO_REATTEMPT_INTERVAL)), + ); + addrs_to_get.clear(); + addrs_to_get.extend(new_utxos); } else { panic!("There is no point in running the auditor if we are not collecting the DAG or collecting data through crawling. Please enable the `dag-collection` feature or provide beta program related arguments."); }; - - utxo_addresses.extend( - new_utxos - .into_iter() - .map(|a| (a, Instant::now() + *UTXO_REATTEMPT_INTERVAL)), - ); } } async fn crawl_and_generate_local_dag( &self, from: BTreeSet, - spend_processing: Option>, + spend_processing: Option>, client: Client, ) -> BTreeSet { // get a copy of the current DAG @@ -309,13 +326,16 @@ impl SpendDagDb { } /// Process each spend and update beta rewards data - pub async fn beta_background_process_spend(&self, spend: SignedSpend, sk: &SecretKey) { + pub async fn beta_background_process_spend( + &self, + spend: SignedSpend, + sk: &SecretKey, + utxos_for_further_track: u64, + ) { let mut beta_tracking = self.beta_tracking.write().await; beta_tracking.processed_spends += 1; beta_tracking.total_accumulated_utxo += spend.spend.spent_tx.outputs.len() as u64; - // TODO: currently all royalty and payment_forward output will be tracked - // correct this metrics once this got optimized - beta_tracking.total_on_track_utxo += spend.spend.spent_tx.outputs.len() as u64; + beta_tracking.total_on_track_utxo += utxos_for_further_track; // check for beta rewards reason let user_name_hash = match spend.reason().get_sender_hash(sk) { @@ -331,15 +351,15 @@ impl SpendDagDb { let beta_participants_read = self.beta_participants.read().await; if let Some(user_name) = beta_participants_read.get(&user_name_hash) { - trace!("Got forwarded reward from {user_name} of {amount} at {addr:?}"); + trace!("Got forwarded reward {amount} from {user_name} of {amount} at {addr:?}"); beta_tracking .forwarded_payments .entry(user_name.to_owned()) .or_default() .insert((addr, amount)); } else { - warn!("Found a forwarded reward for an unknown participant at {addr:?}: {user_name_hash:?}"); - eprintln!("Found a forwarded reward for an unknown participant at {addr:?}: {user_name_hash:?}"); + warn!("Found a forwarded reward {amount} for an unknown participant at {addr:?}: {user_name_hash:?}"); + eprintln!("Found a forwarded reward {amount} for an unknown participant at {addr:?}: {user_name_hash:?}"); beta_tracking .forwarded_payments .entry(format!("unknown participant: {user_name_hash:?}")) diff --git a/sn_auditor/src/main.rs b/sn_auditor/src/main.rs index 4398a17c14..301be702ab 100644 --- a/sn_auditor/src/main.rs +++ b/sn_auditor/src/main.rs @@ -204,8 +204,10 @@ async fn initialize_background_spend_dag_collection( .map_err(|e| eyre!("Could not create SpendDag Db: {e}"))?; // optional force restart from genesis and merge into our current DAG - if force_from_genesis { + // feature guard to prevent a mis-use of opt + if force_from_genesis && cfg!(feature = "dag-collection") { println!("Forcing DAG to be updated from genesis..."); + warn!("Forcing DAG to be updated from genesis..."); let mut d = dag.clone(); let mut genesis_dag = client .new_dag_with_genesis_only() diff --git a/sn_client/src/audit/dag_crawling.rs b/sn_client/src/audit/dag_crawling.rs index c159576777..2a3dd6b7f4 100644 --- a/sn_client/src/audit/dag_crawling.rs +++ b/sn_client/src/audit/dag_crawling.rs @@ -11,7 +11,8 @@ use crate::{Client, Error, SpendDag}; use futures::{future::join_all, StreamExt}; use sn_networking::{GetRecordError, NetworkError}; use sn_transfers::{ - SignedSpend, SpendAddress, WalletError, WalletResult, GENESIS_SPEND_UNIQUE_KEY, + SignedSpend, SpendAddress, SpendReason, WalletError, WalletResult, GENESIS_SPEND_UNIQUE_KEY, + NETWORK_ROYALTIES_PK, }; use std::collections::BTreeSet; use tokio::sync::mpsc::Sender; @@ -61,7 +62,7 @@ impl Client { pub async fn spend_dag_build_from( &self, spend_addr: SpendAddress, - spend_processing: Option>, + spend_processing: Option>, verify: bool, ) -> WalletResult { let (tx, mut rx) = tokio::sync::mpsc::channel(SPENDS_PROCESSING_BUFFER_SIZE); @@ -86,8 +87,9 @@ impl Client { ); dag.insert(addr, spend.clone()); if let Some(sender) = &spend_processing { + let outputs = spend.spend.spent_tx.outputs.len() as u64; sender - .send(spend) + .send((spend, outputs)) .await .map_err(|e| WalletError::SpendProcessing(e.to_string()))?; } @@ -127,28 +129,51 @@ impl Client { Ok(dag) } - /// Crawls the Spend Dag from a set of given SpendAddresses recursively - /// following descendants all the way to UTXOs - /// Returns all the UTXOs reached + /// Get spends for a set of given SpendAddresses + /// Notifies the UTXOs that need to be further tracked down. + /// returns: (addresses_for_reattempt, new_utxos_for_furthertracking) pub async fn crawl_to_next_utxos( &self, - from: BTreeSet, - spend_processing: Sender, - ) -> WalletResult> { - let tasks: Vec<_> = from - .iter() - .map(|a| self.spend_dag_crawl_from(*a, spend_processing.clone())) - .collect(); - let res = futures::future::join_all(tasks).await; + from: &BTreeSet, + spend_processing: Sender<(SignedSpend, u64)>, + ) -> WalletResult<(BTreeSet, BTreeSet)> { + let spends = join_all(from.iter().map(|&address| { + let client_clone = self.clone(); + async move { (client_clone.crawl_spend(address).await, address) } + })) + .await; + + let mut failed_utxos = BTreeSet::new(); let mut new_utxos = BTreeSet::new(); - for r in res.into_iter() { - match r { - Ok(utxos) => new_utxos.extend(utxos), - Err(e) => return Err(e), - } + + for (result, address) in spends { + let spend = match result { + InternalGetNetworkSpend::Spend(s) => *s, + InternalGetNetworkSpend::DoubleSpend(_s1, _s2) => { + warn!("Detected double spend regarding {address:?}"); + continue; + } + InternalGetNetworkSpend::NotFound => { + let _ = failed_utxos.insert(address); + continue; + } + InternalGetNetworkSpend::Error(e) => { + warn!("Got a fetching error {e:?}"); + continue; + } + }; + + let for_further_track = beta_track_analyze_spend(&spend); + + spend_processing + .send((spend, for_further_track.len() as u64)) + .await + .map_err(|e| WalletError::SpendProcessing(e.to_string()))?; + + new_utxos.extend(for_further_track); } - Ok(new_utxos) + Ok((failed_utxos, new_utxos)) } /// Crawls the Spend Dag from a given SpendAddress recursively @@ -410,7 +435,7 @@ impl Client { &self, dag: &mut SpendDag, utxos: BTreeSet, - spend_processing: Option>, + spend_processing: Option>, verify: bool, ) { let main_dag_src = dag.source(); @@ -446,7 +471,7 @@ impl Client { pub async fn spend_dag_continue_from_utxos( &self, dag: &mut SpendDag, - spend_processing: Option>, + spend_processing: Option>, verify: bool, ) { let utxos = dag.get_utxos(); @@ -481,3 +506,41 @@ impl Client { } } } + +/// Helper function to analyze spend for beta_tracking optimization. +/// returns the new_utxos that needs to be further tracked. +fn beta_track_analyze_spend(spend: &SignedSpend) -> BTreeSet { + // Filter out royalty outputs + let royalty_pubkeys: BTreeSet<_> = spend + .spend + .network_royalties + .iter() + .map(|derivation_idx| NETWORK_ROYALTIES_PK.new_unique_pubkey(derivation_idx)) + .collect(); + + let new_utxos: BTreeSet<_> = spend + .spend + .spent_tx + .outputs + .iter() + .filter_map(|output| { + if !royalty_pubkeys.contains(&output.unique_pubkey) { + Some(SpendAddress::from_unique_pubkey(&output.unique_pubkey)) + } else { + None + } + }) + .collect(); + + if let SpendReason::BetaRewardTracking(_) = spend.reason() { + // Do not track down forwarded payment further + Default::default() + } else { + trace!( + "Spend original has {} outputs, tracking {} of them.", + spend.spend.spent_tx.outputs.len(), + new_utxos.len() + ); + new_utxos + } +}