diff --git a/sn_auditor/src/dag_db.rs b/sn_auditor/src/dag_db.rs index bcebd007a3..f263c3b2dc 100644 --- a/sn_auditor/src/dag_db.rs +++ b/sn_auditor/src/dag_db.rs @@ -34,7 +34,7 @@ lazy_static! { /// time in seconds UTXOs are refetched in DAG crawl static ref UTXO_REATTEMPT_INTERVAL: Duration = Duration::from_secs( std::env::var("UTXO_REATTEMPT_INTERVAL") - .unwrap_or("3600".to_string()) + .unwrap_or("1800".to_string()) .parse::() .unwrap_or(300) ); @@ -237,17 +237,14 @@ impl SpendDagDb { let mut addrs_to_get = BTreeSet::new(); loop { - // get current utxos to fetch + // `addrs_to_get` is always empty when reaching this point + // get expired utxos for the further fetch + let utxos_to_fetch; let now = Instant::now(); - - // Always track new outputs first - if addrs_to_get.is_empty() { - let utxos_to_fetch; - (utxo_addresses, utxos_to_fetch) = utxo_addresses - .into_iter() - .partition(|(_address, time_stamp)| *time_stamp > now); - addrs_to_get.extend(utxos_to_fetch.keys().cloned().collect::>()); - } + (utxo_addresses, utxos_to_fetch) = utxo_addresses + .into_iter() + .partition(|(_address, time_stamp)| *time_stamp > now); + addrs_to_get.extend(utxos_to_fetch.keys().cloned().collect::>()); if addrs_to_get.is_empty() { debug!( @@ -273,15 +270,16 @@ impl SpendDagDb { .map(|a| (a, Instant::now() + *UTXO_REATTEMPT_INTERVAL)), ); } else if let Some(sender) = spend_processing.clone() { - let (reattempt_addrs, new_utxos) = - client.crawl_to_next_utxos(&addrs_to_get, sender).await?; - utxo_addresses.extend( - reattempt_addrs - .into_iter() - .map(|a| (a, Instant::now() + *UTXO_REATTEMPT_INTERVAL)), - ); - addrs_to_get.clear(); - addrs_to_get.extend(new_utxos); + if let Ok(reattempt_addrs) = client + .crawl_to_next_utxos( + &mut addrs_to_get, + sender.clone(), + *UTXO_REATTEMPT_INTERVAL, + ) + .await + { + utxo_addresses.extend(reattempt_addrs); + } } else { panic!("There is no point in running the auditor if we are not collecting the DAG or collecting data through crawling. 
Please enable the `dag-collection` feature or provide beta program related arguments."); }; diff --git a/sn_client/src/audit/dag_crawling.rs b/sn_client/src/audit/dag_crawling.rs index 2a3dd6b7f4..ce9609380a 100644 --- a/sn_client/src/audit/dag_crawling.rs +++ b/sn_client/src/audit/dag_crawling.rs @@ -14,8 +14,11 @@ use sn_transfers::{ SignedSpend, SpendAddress, SpendReason, WalletError, WalletResult, GENESIS_SPEND_UNIQUE_KEY, NETWORK_ROYALTIES_PK, }; -use std::collections::BTreeSet; -use tokio::sync::mpsc::Sender; +use std::{ + collections::{BTreeMap, BTreeSet}, + time::{Duration, Instant}, +}; +use tokio::{sync::mpsc::Sender, task::JoinSet}; const SPENDS_PROCESSING_BUFFER_SIZE: usize = 4096; @@ -129,51 +132,51 @@ impl Client { Ok(dag) } - /// Get spends for a set of given SpendAddresses - /// Notifies the UTXOs that need to be further tracked down. - /// returns: (addresses_for_reattempt, new_utxos_for_furthertracking) + /// Get spends from a set of given SpendAddresses + /// Recursively fetches until it reaches the frontline of the DAG tree. 
+ /// Return with UTXOs for re-attempt (with insertion time stamp) pub async fn crawl_to_next_utxos( &self, - from: &BTreeSet, - spend_processing: Sender<(SignedSpend, u64)>, - ) -> WalletResult<(BTreeSet, BTreeSet)> { - let spends = join_all(from.iter().map(|&address| { - let client_clone = self.clone(); - async move { (client_clone.crawl_spend(address).await, address) } - })) - .await; - - let mut failed_utxos = BTreeSet::new(); - let mut new_utxos = BTreeSet::new(); - - for (result, address) in spends { - let spend = match result { - InternalGetNetworkSpend::Spend(s) => *s, - InternalGetNetworkSpend::DoubleSpend(_s1, _s2) => { - warn!("Detected double spend regarding {address:?}"); - continue; - } - InternalGetNetworkSpend::NotFound => { - let _ = failed_utxos.insert(address); - continue; - } - InternalGetNetworkSpend::Error(e) => { - warn!("Got a fetching error {e:?}"); - continue; + addrs_to_get: &mut BTreeSet, + sender: Sender<(SignedSpend, u64)>, + reattempt_interval: Duration, + ) -> WalletResult> { + let mut failed_utxos = BTreeMap::new(); + let mut tasks = JoinSet::new(); + + while !addrs_to_get.is_empty() || !tasks.is_empty() { + while tasks.len() < 32 && !addrs_to_get.is_empty() { + if let Some(addr) = addrs_to_get.pop_first() { + let client_clone = self.clone(); + let _ = + tasks.spawn(async move { (client_clone.crawl_spend(addr).await, addr) }); } - }; - - let for_further_track = beta_track_analyze_spend(&spend); - - spend_processing - .send((spend, for_further_track.len() as u64)) - .await - .map_err(|e| WalletError::SpendProcessing(e.to_string()))?; + } - new_utxos.extend(for_further_track); + if let Some(Ok((result, address))) = tasks.join_next().await { + match result { + InternalGetNetworkSpend::Spend(spend) => { + let for_further_track = beta_track_analyze_spend(&spend); + let _ = sender + .send((*spend, for_further_track.len() as u64)) + .await + .map_err(|e| WalletError::SpendProcessing(e.to_string())); + 
addrs_to_get.extend(for_further_track); + } + InternalGetNetworkSpend::DoubleSpend(_s1, _s2) => { + warn!("Detected double spend regarding {address:?}"); + } + InternalGetNetworkSpend::NotFound => { + let _ = failed_utxos.insert(address, Instant::now() + reattempt_interval); + } + InternalGetNetworkSpend::Error(e) => { + warn!("Got a fetching error {e:?}"); + } + } + } } - Ok((failed_utxos, new_utxos)) + Ok(failed_utxos) } /// Crawls the Spend Dag from a given SpendAddress recursively