diff --git a/Cargo.lock b/Cargo.lock index 73678613d8..a30d9ff963 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6968,6 +6968,7 @@ dependencies = [ "tiny_http", "tokio", "tracing", + "urlencoding", ] [[package]] diff --git a/sn_auditor/Cargo.toml b/sn_auditor/Cargo.toml index 75448db46c..0ddd5cdae3 100644 --- a/sn_auditor/Cargo.toml +++ b/sn_auditor/Cargo.toml @@ -18,7 +18,8 @@ local-discovery = [ network-contacts = ["sn_peers_acquisition/network-contacts"] open-metrics = ["sn_client/open-metrics"] websockets = ["sn_client/websockets"] -svg-dag = ["graphviz-rust"] +svg-dag = ["graphviz-rust", "dag-collection"] +dag-collection = [] [dependencies] bls = { package = "blsttc", version = "8.0.1" } @@ -42,3 +43,4 @@ tokio = { version = "1.32.0", features = [ "time", "fs", ] } +urlencoding = "2.1.3" \ No newline at end of file diff --git a/sn_auditor/src/dag_db.rs b/sn_auditor/src/dag_db.rs index ea1edc70d2..897bdc9727 100644 --- a/sn_auditor/src/dag_db.rs +++ b/sn_auditor/src/dag_db.rs @@ -13,17 +13,14 @@ use color_eyre::eyre::{bail, eyre, Result}; #[cfg(feature = "svg-dag")] use graphviz_rust::{cmd::Format, exec, parse, printer::PrinterContext}; use serde::{Deserialize, Serialize}; -use sn_client::networking::NetworkError; -use sn_client::transfers::{Hash, NanoTokens, SignedSpend, SpendAddress, GENESIS_SPEND_UNIQUE_KEY}; -use sn_client::Error as ClientError; +use sn_client::transfers::{Hash, NanoTokens, SignedSpend, SpendAddress}; use sn_client::{Client, SpendDag, SpendDagGet}; use std::collections::{BTreeMap, BTreeSet}; use std::fmt::Write; -use std::time::{SystemTime, UNIX_EPOCH}; -use std::{ - path::PathBuf, - sync::{Arc, RwLock}, -}; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; +use tokio::sync::RwLock; pub const SPEND_DAG_FILENAME: &str = "spend_dag"; #[cfg(feature = "svg-dag")] @@ -31,6 +28,9 @@ pub const SPEND_DAG_SVG_FILENAME: &str = "spend_dag.svg"; /// Store a locally copy to restore on restart pub const BETA_PARTICIPANTS_FILENAME: &str = "beta_participants.txt"; +const REATTEMPT_INTERVAL: Duration = Duration::from_secs(3600); +const SPENDS_PROCESSING_BUFFER_SIZE: usize = 4096; + /// Abstraction for the Spend DAG database /// Currently in memory, with disk backup, but should probably be a real DB at scale #[derive(Clone)] @@ -39,7 +39,7 @@ pub struct SpendDagDb { pub(crate) path: PathBuf, dag: Arc>, forwarded_payments: Arc>, - pub(crate) beta_participants: Arc>>, + beta_participants: Arc>>, encryption_sk: Option, } @@ -72,7 +72,7 @@ impl SpendDagDb { } Err(_) => { println!("Found no local spend DAG file, starting from Genesis"); - new_dag_with_genesis_only(&client).await? + client.new_dag_with_genesis_only().await? } }; @@ -109,11 +109,9 @@ impl SpendDagDb { } /// Get info about a single spend in JSON format - pub fn spend_json(&self, address: SpendAddress) -> Result { + pub async fn spend_json(&self, address: SpendAddress) -> Result { let dag_ref = self.dag.clone(); - let r_handle = dag_ref - .read() - .map_err(|e| eyre!("Failed to get read lock: {e}"))?; + let r_handle = dag_ref.read().await; let spend = r_handle.get_spend(&address); let faults = r_handle.get_spend_faults(&address); let fault = if faults.is_empty() { @@ -144,13 +142,11 @@ impl SpendDagDb { } /// Dump DAG to disk - pub fn dump(&self) -> Result<()> { + pub async fn dump(&self) -> Result<()> { std::fs::create_dir_all(&self.path)?; let dag_path = self.path.join(SPEND_DAG_FILENAME); let dag_ref = self.dag.clone(); - let r_handle = dag_ref - .read() - .map_err(|e| eyre!("Failed to get read lock: {e}"))?; + let r_handle = dag_ref.read().await; r_handle.dump_to_file(dag_path)?; Ok(()) } @@ -166,89 +162,146 @@ impl SpendDagDb { /// Dump current DAG as svg to disk #[cfg(feature = "svg-dag")] - pub fn dump_dag_svg(&self) -> Result<()> { + pub async fn dump_dag_svg(&self) -> Result<()> { info!("Dumping DAG to svg..."); std::fs::create_dir_all(&self.path)?; let svg_path = self.path.join(SPEND_DAG_SVG_FILENAME); let dag_ref = self.dag.clone(); - let r_handle = dag_ref - .read() - .map_err(|e| eyre!("Failed to get read lock: {e}"))?; + let r_handle = dag_ref.read().await; let svg = dag_to_svg(&r_handle)?; std::fs::write(svg_path.clone(), svg)?; info!("Successfully dumped DAG to {svg_path:?}..."); Ok(()) } - /// Update DAG from Network - pub async fn update(&mut self) -> Result<()> { - // read current DAG - let mut dag = { - self.dag - .clone() - .read() - .map_err(|e| eyre!("Failed to get read lock: {e}"))? - .clone() + /// Update DAG from Network continuously + pub async fn continuous_background_update(self) -> Result<()> { + let client = if let Some(client) = &self.client { + client.clone() + } else { + bail!("Cannot update DAG in offline mode") }; - // update that copy 10 generations further - const NEXT_10_GEN: u32 = 10; - self.client - .clone() - .ok_or(eyre!("Cannot update in offline mode"))? - .spend_dag_continue_from_utxos(&mut dag, Some(NEXT_10_GEN), true) - .await?; - - // write update to DAG - let mut dag_w_handle = self - .dag - .write() - .map_err(|e| eyre!("Failed to get write lock: {e}"))?; - *dag_w_handle = dag; - std::mem::drop(dag_w_handle); - - #[cfg(feature = "svg-dag")] - { - // update and save svg to file in a background thread so we don't block - // - let self_clone = self.clone(); + // init utxos to fetch + let start_dag = { self.dag.clone().read().await.clone() }; + let mut utxo_addresses: BTreeMap = start_dag + .get_utxos() + .into_iter() + .map(|a| (a, Instant::now())) + .collect(); + + // beta rewards processing + let self_clone = self.clone(); + let spend_processing = if let Some(sk) = self.encryption_sk.clone() { + let (tx, mut rx) = tokio::sync::mpsc::channel(SPENDS_PROCESSING_BUFFER_SIZE); tokio::spawn(async move { - if let Err(e) = self_clone.dump_dag_svg() { - error!("Failed to dump DAG svg: {e}"); + while let Some(spend) = rx.recv().await { + self_clone.beta_background_process_spend(spend, &sk).await; } }); + Some(tx) + } else { + eprintln!("Foundation secret key not set! Beta rewards will not be processed."); + None + }; + + loop { + // get current utxos to fetch + let now = Instant::now(); + let utxos_to_fetch; + (utxo_addresses, utxos_to_fetch) = utxo_addresses + .into_iter() + .partition(|(_address, time_stamp)| *time_stamp > now); + let addrs_to_get = utxos_to_fetch.keys().cloned().collect::>(); + + if addrs_to_get.is_empty() { + debug!("Sleeping for {REATTEMPT_INTERVAL:?} until next re-attempt..."); + tokio::time::sleep(REATTEMPT_INTERVAL).await; + continue; + } + + // get a copy of the current DAG + let mut dag = { self.dag.clone().read().await.clone() }; + + // update it + client + .spend_dag_continue_from(&mut dag, addrs_to_get, spend_processing.clone(), true) + .await; + + // update utxos + let new_utxos = dag.get_utxos(); + utxo_addresses.extend( + new_utxos + .into_iter() + .map(|a| (a, Instant::now() + REATTEMPT_INTERVAL)), + ); + + // write updates to local DAG and save to disk + let mut dag_w_handle = self.dag.write().await; + *dag_w_handle = dag; + std::mem::drop(dag_w_handle); + if let Err(e) = self.dump().await { + error!("Failed to dump DAG: {e}"); + } + + // update and save svg to file in a background thread so we don't block + #[cfg(feature = "svg-dag")] + { + let self_clone = self.clone(); + tokio::spawn(async move { + if let Err(e) = self_clone.dump_dag_svg().await { + error!("Failed to dump DAG svg: {e}"); + } + }); + } } + } - // gather forwarded payments in a background thread so we don't block - let self_clone = self.clone(); - tokio::spawn(async move { - if let Err(e) = self_clone.gather_forwarded_payments().await { - error!("Failed to gather forwarded payments: {e}"); + /// Process each spend and update beta rewards data + pub async fn beta_background_process_spend(&self, spend: SignedSpend, sk: &SecretKey) { + // check for beta rewards reason + let user_name_hash = match spend.reason().get_sender_hash(sk) { + Some(n) => n, + None => { + return; } - }); + }; - Ok(()) + // add to local rewards + let addr = spend.address(); + let amount = spend.spend.amount; + let beta_participants_read = self.beta_participants.read().await; + let mut self_payments = self.forwarded_payments.write().await; + + if let Some(user_name) = beta_participants_read.get(&user_name_hash) { + trace!("Got forwarded reward from {user_name} of {amount} at {addr:?}"); + self_payments + .entry(user_name.to_owned()) + .or_default() + .insert((addr, amount)); + } else { + warn!("Found a forwarded reward for an unknown participant at {addr:?}: {user_name_hash:?}"); + eprintln!("Found a forwarded reward for an unknown participant at {addr:?}: {user_name_hash:?}"); + self_payments + .entry(format!("unknown participant: {user_name_hash:?}")) + .or_default() + .insert((addr, amount)); + } } /// Merge a SpendDag into the current DAG /// This can be used to enrich our DAG with a DAG from another node to avoid costly computations /// Make sure to verify the other DAG is trustworthy before calling this function to merge it in - pub fn merge(&mut self, other: SpendDag) -> Result<()> { - let mut w_handle = self - .dag - .write() - .map_err(|e| eyre!("Failed to get write lock: {e}"))?; + pub async fn merge(&mut self, other: SpendDag) -> Result<()> { + let mut w_handle = self.dag.write().await; w_handle.merge(other, true)?; Ok(()) } /// Returns the current state of the beta program in JSON format, including total rewards for each participant - pub(crate) fn beta_program_json(&self) -> Result { + pub(crate) async fn beta_program_json(&self) -> Result { let r_handle = self.forwarded_payments.clone(); - let beta_rewards = r_handle.read(); - - let participants = - beta_rewards.map_err(|e| eyre!("Failed to get beta rewards read lock: {e}"))?; + let participants = r_handle.read().await; let mut rewards_output = vec![]; for (participant, rewards) in participants.iter() { let total_rewards = rewards @@ -263,13 +316,13 @@ impl SpendDagDb { } /// Track new beta participants. This just add the participants to the list of tracked participants. - pub(crate) fn track_new_beta_participants(&self, participants: Vec) -> Result<()> { + pub(crate) async fn track_new_beta_participants( + &self, + participants: BTreeSet, + ) -> Result<()> { // track new participants { - let mut beta_participants = self - .beta_participants - .write() - .map_err(|e| eyre!("Failed to get beta participants write lock: {e}"))?; + let mut beta_participants = self.beta_participants.write().await; beta_participants.extend( participants .iter() @@ -278,95 +331,28 @@ impl SpendDagDb { } // initialize forwarded payments { - let mut fwd_payments = self - .forwarded_payments - .write() - .map_err(|e| eyre!("Failed to get forwarded payments write lock: {e}"))?; + let mut fwd_payments = self.forwarded_payments.write().await; fwd_payments.extend(participants.into_iter().map(|p| (p, BTreeSet::new()))); } Ok(()) } /// Check if a participant is being tracked - pub(crate) fn is_participant_tracked(&self, discord_id: &str) -> Result { - let beta_participants = self - .beta_participants - .read() - .map_err(|e| eyre!("Failed to get beta participants read lock: {e}"))?; + pub(crate) async fn is_participant_tracked(&self, discord_id: &str) -> Result { + let beta_participants = self.beta_participants.read().await; + debug!("Existing beta participants: {beta_participants:?}"); + + debug!( + "Adding new beta participants: {discord_id}, {:?}", + Hash::hash(discord_id.as_bytes()) + ); Ok(beta_participants.contains_key(&Hash::hash(discord_id.as_bytes()))) } - /// Initialize reward forward tracking, gathers current rewards from the DAG - pub(crate) async fn init_reward_forward_tracking( - &self, - participants: Vec, - ) -> Result<()> { - self.track_new_beta_participants(participants)?; - self.gather_forwarded_payments().await?; - Ok(()) - } - - // Gather forwarded payments from the DAG - pub(crate) async fn gather_forwarded_payments(&self) -> Result<()> { - info!("Gathering forwarded payments..."); - - // make sure we have the foundation secret key - let sk = self - .encryption_sk - .clone() - .ok_or_else(|| eyre!("Foundation secret key not set"))?; - - // get spends from current DAG - let dag = self.dag.read().map_err(|e| { - eyre!("Failed to get dag read lock for gathering forwarded payments: {e}") - })?; - let all_spends = dag.all_spends(); - - // find spends with payments - let mut payments: ForwardedPayments = BTreeMap::new(); - for spend in all_spends { - let user_name_hash = match spend.reason().get_sender_hash(&sk) { - Some(n) => n, - None => continue, - }; - let addr = spend.address(); - let amount = spend.spend.amount; - let beta_participants_read = self - .beta_participants - .read() - .map_err(|e| eyre!("Failed to get payments write lock: {e}"))?; - if let Some(user_name) = beta_participants_read.get(&user_name_hash) { - debug!("Got forwarded reward from {user_name} of {amount} at {addr:?}"); - payments - .entry(user_name.to_owned()) - .or_default() - .insert((addr, amount)); - } else { - info!( - "Found a forwarded reward for an unknown participant at {:?}: {user_name_hash:?}", - spend.address() - ); - payments - .entry(format!("unknown participant: {user_name_hash:?}")) - .or_default() - .insert((addr, amount)); - } - } - - // save new payments - let mut self_payments = self - .forwarded_payments - .write() - .map_err(|e| eyre!("Failed to get payments write lock: {e}"))?; - self_payments.extend(payments); - info!("Done gathering forwarded payments"); - Ok(()) - } - /// Backup beta rewards to a timestamped json file - pub(crate) fn backup_rewards(&self) -> Result<()> { + pub(crate) async fn backup_rewards(&self) -> Result<()> { info!("Beta rewards backup requested"); - let json = match self.beta_program_json() { + let json = match self.beta_program_json().await { Ok(j) => j, Err(e) => bail!("Failed to get beta rewards json: {e}"), }; @@ -384,25 +370,6 @@ impl SpendDagDb { } } -pub async fn new_dag_with_genesis_only(client: &Client) -> Result { - let genesis_addr = SpendAddress::from_unique_pubkey(&GENESIS_SPEND_UNIQUE_KEY); - let mut dag = SpendDag::new(genesis_addr); - let genesis_spend = match client.get_spend_from_network(genesis_addr).await { - Ok(s) => s, - Err(ClientError::Network(NetworkError::DoubleSpendAttempt(spend1, spend2))) => { - let addr = spend1.address(); - println!("Double spend detected at Genesis: {addr:?}"); - dag.insert(genesis_addr, *spend2); - dag.record_faults(&dag.source())?; - *spend1 - } - Err(e) => return Err(eyre!("Failed to get genesis spend: {e}")), - }; - dag.insert(genesis_addr, genesis_spend); - - Ok(dag) -} - #[cfg(feature = "svg-dag")] fn dag_to_svg(dag: &SpendDag) -> Result> { let dot = dag.dump_dot_format(); diff --git a/sn_auditor/src/main.rs b/sn_auditor/src/main.rs index f8c7da2cbd..0f7976ff0a 100644 --- a/sn_auditor/src/main.rs +++ b/sn_auditor/src/main.rs @@ -20,14 +20,12 @@ use sn_client::Client; use sn_logging::{Level, LogBuilder, LogFormat, LogOutputDest}; use sn_peers_acquisition::get_peers_from_args; use sn_peers_acquisition::PeersArgs; +use std::collections::BTreeSet; use std::path::PathBuf; use tiny_http::{Response, Server}; -/// Interval in seconds to update the DAG, save to disk, and update beta participants -const DAG_UPDATE_INTERVAL_SECS: u64 = 5 * 60; - /// Backup the beta rewards in a timestamped json file -const BETA_REWARDS_BACKOUP_INTERVAL_SECS: u64 = 3 * 60 * 60; +const BETA_REWARDS_BACKOUP_INTERVAL_SECS: u64 = 20 * 60; #[derive(Parser)] #[command(author, version, about, long_about = None)] @@ -97,7 +95,7 @@ async fn main() -> Result<()> { if let Some(dag_to_view) = opt.offline_viewer { let dag = SpendDagDb::offline(dag_to_view, maybe_sk)?; #[cfg(feature = "svg-dag")] - dag.dump_dag_svg()?; + dag.dump_dag_svg().await?; start_server(dag).await?; return Ok(()); @@ -173,7 +171,7 @@ fn initialize_background_rewards_backup(dag: SpendDagDb) { .await; println!("Backing up beta rewards..."); - if let Err(e) = dag.backup_rewards() { + if let Err(e) = dag.backup_rewards().await { eprintln!("Failed to backup beta rewards: {e}"); } } @@ -187,7 +185,7 @@ async fn initialize_background_spend_dag_collection( client: Client, force_from_genesis: bool, clean: bool, - beta_participants: Vec, + beta_participants: BTreeSet, foundation_sk: Option, ) -> Result { println!("Initialize spend dag..."); @@ -209,23 +207,24 @@ async fn initialize_background_spend_dag_collection( if force_from_genesis { println!("Forcing DAG to be updated from genesis..."); let mut d = dag.clone(); - let mut genesis_dag = dag_db::new_dag_with_genesis_only(&client) + let mut genesis_dag = client + .new_dag_with_genesis_only() .await .map_err(|e| eyre!("Could not create new DAG from genesis: {e}"))?; tokio::spawn(async move { - let _ = client + client .spend_dag_continue_from_utxos(&mut genesis_dag, None, true) - .await - .map_err(|e| eprintln!("Could not update DAG from genesis: {e}")); + .await; let _ = d .merge(genesis_dag) + .await .map_err(|e| eprintln!("Failed to merge from genesis DAG into our DAG: {e}")); }); } // initialize svg #[cfg(feature = "svg-dag")] - dag.dump_dag_svg()?; + dag.dump_dag_svg().await?; // initialize beta rewards program tracking if !beta_participants.is_empty() { @@ -234,7 +233,7 @@ async fn initialize_background_spend_dag_collection( }; println!("Initializing beta rewards program tracking..."); - if let Err(e) = dag.init_reward_forward_tracking(beta_participants).await { + if let Err(e) = dag.track_new_beta_participants(beta_participants).await { eprintln!("Could not initialize beta rewards: {e}"); return Err(e); } @@ -242,20 +241,12 @@ async fn initialize_background_spend_dag_collection( // background thread to update DAG println!("Starting background DAG collection thread..."); - let mut d = dag.clone(); + let d = dag.clone(); tokio::spawn(async move { - loop { - println!("Updating DAG..."); - let _ = d - .update() - .await - .map_err(|e| eprintln!("Could not update DAG: {e}")); - let _ = d - .dump() - .map_err(|e| eprintln!("Could not dump DAG to disk: {e}")); - println!("Sleeping for {DAG_UPDATE_INTERVAL_SECS} seconds..."); - tokio::time::sleep(tokio::time::Duration::from_secs(DAG_UPDATE_INTERVAL_SECS)).await; - } + let _ = d + .continuous_background_update() + .await + .map_err(|e| eprintln!("Failed to update DAG in background thread: {e}")); }); Ok(dag) @@ -274,9 +265,11 @@ async fn start_server(dag: SpendDagDb) -> Result<()> { // Dispatch the request to the appropriate handler let response = match request.url() { "/" => routes::spend_dag_svg(&dag), - s if s.starts_with("/spend/") => routes::spend(&dag, &request), - s if s.starts_with("/add-participant/") => routes::add_participant(&dag, &request), - "/beta-rewards" => routes::beta_rewards(&dag), + s if s.starts_with("/spend/") => routes::spend(&dag, &request).await, + s if s.starts_with("/add-participant/") => { + routes::add_participant(&dag, &request).await + } + "/beta-rewards" => routes::beta_rewards(&dag).await, _ => routes::not_found(), }; @@ -311,7 +304,7 @@ fn get_auditor_data_dir_path() -> Result { fn load_and_update_beta_participants( provided_participants_file: Option, -) -> Result> { +) -> Result> { let mut beta_participants = if let Some(participants_file) = provided_participants_file { let raw_data = std::fs::read_to_string(&participants_file)?; // instead of serde_json, just use a line separated file @@ -348,5 +341,5 @@ fn load_and_update_beta_participants( let _ = std::fs::write(local_participants_file, beta_participants.join("\n")) .map_err(|e| eprintln!("Failed to write beta participants to disk: {e}")); - Ok(beta_participants) + Ok(beta_participants.into_iter().collect()) } diff --git a/sn_auditor/src/routes.rs b/sn_auditor/src/routes.rs index 9c217ea612..a98d04e63c 100644 --- a/sn_auditor/src/routes.rs +++ b/sn_auditor/src/routes.rs @@ -10,6 +10,7 @@ use crate::dag_db::{self, SpendDagDb}; use color_eyre::eyre::{eyre, Result}; use sn_client::transfers::SpendAddress; use std::{ + collections::BTreeSet, fs::{File, OpenOptions}, io::{Cursor, Write}, str::FromStr, @@ -33,7 +34,10 @@ pub(crate) fn spend_dag_svg(_dag: &SpendDagDb) -> Result } } -pub(crate) fn spend(dag: &SpendDagDb, request: &Request) -> Result>>> { +pub(crate) async fn spend( + dag: &SpendDagDb, + request: &Request, +) -> Result>>> { let addr = match request.url().split('/').last() { Some(addr) => addr, None => { @@ -54,6 +58,7 @@ pub(crate) fn spend(dag: &SpendDagDb, request: &Request) -> Result Result>>> { Ok(response) } -pub(crate) fn beta_rewards(dag: &SpendDagDb) -> Result>>> { +pub(crate) async fn beta_rewards(dag: &SpendDagDb) -> Result>>> { let json = dag .beta_program_json() + .await .map_err(|e| eyre!("Failed to get beta rewards JSON: {e}"))?; let response = Response::from_data(json); Ok(response) } -pub(crate) fn add_participant( +pub(crate) async fn add_participant( dag: &SpendDagDb, request: &Request, ) -> Result>>> { let discord_id = match request.url().split('/').last() { - Some(discord_id) => discord_id, + Some(discord_id) => { + // TODO: When we simply accept POST we can remove this decoding + // For now we need it to decode #fragments in urls + let discord_id = urlencoding::decode(discord_id)?; + discord_id.to_string() + } None => { return Ok(Response::from_string( "No discord_id provided. Should be /add-participant/[your_discord_id_here]", @@ -94,7 +105,7 @@ pub(crate) fn add_participant( return Ok(Response::from_string("discord_id cannot be empty").with_status_code(400)); } - if let Err(err) = track_new_participant(dag, discord_id.to_owned()) { + if let Err(err) = track_new_participant(dag, discord_id.to_owned()).await { return Ok( Response::from_string(format!("Failed to track new participant: {err}")) .with_status_code(400), @@ -104,11 +115,12 @@ pub(crate) fn add_participant( Ok(Response::from_string("Successfully added participant ")) } -fn track_new_participant(dag: &SpendDagDb, discord_id: String) -> Result<()> { - dag.track_new_beta_participants(vec![discord_id.to_owned()])?; +async fn track_new_participant(dag: &SpendDagDb, discord_id: String) -> Result<()> { + dag.track_new_beta_participants(BTreeSet::from_iter([discord_id.to_owned()])) + .await?; // only append new ids - if dag.is_participant_tracked(&discord_id)? { + if dag.is_participant_tracked(&discord_id).await? { return Ok(()); } diff --git a/sn_cli/src/bin/subcommands/wallet/audit.rs b/sn_cli/src/bin/subcommands/wallet/audit.rs index f7bb1aab1e..29423fd8c6 100644 --- a/sn_cli/src/bin/subcommands/wallet/audit.rs +++ b/sn_cli/src/bin/subcommands/wallet/audit.rs @@ -6,7 +6,6 @@ // KIND, either express or implied. Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. -use std::collections::BTreeSet; use std::path::Path; use std::str::FromStr; @@ -18,36 +17,27 @@ use sn_client::transfers::{CashNoteRedemption, SpendAddress, Transfer, GENESIS_S use sn_client::{Client, SpendDag}; const SPEND_DAG_FILENAME: &str = "spend_dag"; +const SPENDS_PROCESSING_BUFFER_SIZE: usize = 4096; async fn step_by_step_spend_dag_gathering(client: &Client, mut dag: SpendDag) -> Result { - let verify_after = false; let start_time = std::time::Instant::now(); - let mut depth_exponential = 1; - let mut current_utxos = dag.get_utxos(); - let mut last_utxos = BTreeSet::new(); - println!("Gathering the Spend DAG, note that this might take a very long time..."); - while last_utxos != current_utxos { - let unexplored_utxos = current_utxos.difference(&last_utxos).cloned().collect(); - last_utxos = std::mem::take(&mut current_utxos); - - client - .spend_dag_continue_from( - &mut dag, - unexplored_utxos, - Some(depth_exponential), - verify_after, - ) - .await?; - - depth_exponential += depth_exponential; - current_utxos = dag.get_utxos(); - let dag_size = dag.all_spends().len(); - println!( - "Depth {depth_exponential}: the DAG now has {dag_size} spends and {} UTXOs", - current_utxos.len() - ); - } + let (tx, mut rx) = tokio::sync::mpsc::channel(SPENDS_PROCESSING_BUFFER_SIZE); + tokio::spawn(async move { + let mut spend_count = 0; + let mut exponential = 64; + while let Some(_spend) = rx.recv().await { + spend_count += 1; + if spend_count % exponential == 0 { + println!("Collected {spend_count} spends..."); + exponential *= 2; + } + } + }); + + client + .spend_dag_continue_from_utxos(&mut dag, Some(tx), false) + .await; println!("Done gathering the Spend DAG in {:?}", start_time.elapsed()); // verify the DAG @@ -74,8 +64,8 @@ async fn gather_spend_dag(client: &Client, root_dir: &Path, fast_mode: bool) -> println!("Found a local spend dag on disk, continuing from it..."); if fast_mode { client - .spend_dag_continue_from_utxos(&mut dag, None, false) - .await?; + .spend_dag_continue_from_utxos(&mut dag, Default::default(), false) + .await; } dag } @@ -83,10 +73,13 @@ async fn gather_spend_dag(client: &Client, root_dir: &Path, fast_mode: bool) -> println!("Starting from Genesis as found no local spend dag on disk..."); info!("Starting from Genesis as failed to load spend dag from disk: {err}"); let genesis_addr = SpendAddress::from_unique_pubkey(&GENESIS_SPEND_UNIQUE_KEY); - let stop_after = if fast_mode { None } else { Some(1) }; - client - .spend_dag_build_from(genesis_addr, stop_after, true) - .await? + if fast_mode { + client + .spend_dag_build_from(genesis_addr, Default::default(), true) + .await? + } else { + client.new_dag_with_genesis_only().await? + } } }; diff --git a/sn_client/src/audit.rs b/sn_client/src/audit.rs index b10941af24..0d9bb8daec 100644 --- a/sn_client/src/audit.rs +++ b/sn_client/src/audit.rs @@ -6,9 +6,9 @@ // KIND, either express or implied. Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. +mod dag_crawling; mod dag_error; mod spend_dag; -mod spend_dag_building; #[cfg(test)] mod tests; diff --git a/sn_client/src/audit/dag_crawling.rs b/sn_client/src/audit/dag_crawling.rs index 66825846e3..324cb18565 100644 --- a/sn_client/src/audit/dag_crawling.rs +++ b/sn_client/src/audit/dag_crawling.rs @@ -10,8 +10,11 @@ use crate::{Client, Error, SpendDag}; use futures::{future::join_all, StreamExt}; use sn_networking::{GetRecordError, NetworkError}; -use sn_transfers::{SignedSpend, SpendAddress, WalletError, WalletResult}; +use sn_transfers::{ + SignedSpend, SpendAddress, WalletError, WalletResult, GENESIS_SPEND_UNIQUE_KEY, +}; use std::collections::BTreeSet; +use tokio::sync::mpsc::Sender; enum InternalGetNetworkSpend { Spend(Box), @@ -21,6 +24,24 @@ enum InternalGetNetworkSpend { } impl Client { + pub async fn new_dag_with_genesis_only(&self) -> WalletResult { + let genesis_addr = SpendAddress::from_unique_pubkey(&GENESIS_SPEND_UNIQUE_KEY); + let mut dag = SpendDag::new(genesis_addr); + let genesis_spend = match self.get_spend_from_network(genesis_addr).await { + Ok(s) => s, + Err(Error::Network(NetworkError::DoubleSpendAttempt(spend1, spend2))) => { + let addr = spend1.address(); + println!("Double spend detected at Genesis: {addr:?}"); + dag.insert(genesis_addr, *spend2); + *spend1 + } + Err(e) => return Err(WalletError::FailedToGetSpend(e.to_string())), + }; + dag.insert(genesis_addr, genesis_spend); + + Ok(dag) + } + /// Builds a SpendDag from a given SpendAddress recursively following descendants all the way to UTxOs /// Started from Genesis this gives the entire SpendDag of the Network at a certain point in time /// Once the DAG collected, optionally verifies and records errors in the DAG @@ -38,7 +59,7 @@ impl Client { pub async fn spend_dag_build_from( &self, spend_addr: SpendAddress, - max_depth: Option, + spend_processing: Option>, verify: bool, ) -> WalletResult { info!("Building spend DAG from {spend_addr:?}"); @@ -100,12 +121,25 @@ impl Client { match get_spend { InternalGetNetworkSpend::Spend(spend) => { next_gen_tx.insert(spend.spend.spent_tx.clone()); + if let Some(sender) = &spend_processing { + let _ = sender.send(*spend.clone()).await.map_err(|e| { + error!("Failed to send spend {addr:?} to processing: {e}") + }); + } dag.insert(addr, *spend); } InternalGetNetworkSpend::DoubleSpend(s1, s2) => { info!("Fetched double spend at {addr:?} from network, following both..."); next_gen_tx.insert(s1.spend.spent_tx.clone()); next_gen_tx.insert(s2.spend.spent_tx.clone()); + if let Some(sender) = &spend_processing { + let _ = sender.send(*s1.clone()).await.map_err(|e| { + error!("Failed to send spend {addr:?} to processing: {e}") + }); + let _ = sender.send(*s2.clone()).await.map_err(|e| { + error!("Failed to send spend {addr:?} to processing: {e}") + }); + } dag.insert(addr, *s1); dag.insert(addr, *s2); } @@ -125,10 +159,6 @@ impl Client { // go on to next gen gen += 1; - if gen >= max_depth.unwrap_or(u32::MAX) { - info!("Reached generation {gen}, stopping DAG collection from {spend_addr:?}"); - break; - } } let elapsed = start.elapsed(); @@ -288,31 +318,25 @@ impl Client { /// Extends an existing SpendDag starting from the given utxos /// If verify is true, records faults in the DAG - /// Stops gathering after max_depth generations pub async fn spend_dag_continue_from( &self, dag: &mut SpendDag, utxos: BTreeSet, - max_depth: Option, + spend_processing: Option>, verify: bool, - ) -> WalletResult<()> { + ) { let main_dag_src = dag.source(); info!( "Expanding spend DAG with source: {main_dag_src:?} from {} utxos", utxos.len() ); - let mut stream = futures::stream::iter(utxos.into_iter()) - .map(|utxo| async move { - debug!("Queuing task to gather DAG from utxo: {:?}", utxo); - ( - self.spend_dag_build_from(utxo, max_depth, false).await, - utxo, - ) - }) - .buffer_unordered(crate::MAX_CONCURRENT_TASKS); - - while let Some((res, addr)) = stream.next().await { + let sender = spend_processing.clone(); + let tasks = utxos + .iter() + .map(|utxo| self.spend_dag_build_from(*utxo, sender.clone(), false)); + let sub_dags = join_all(tasks).await; + for (res, addr) in sub_dags.into_iter().zip(utxos.into_iter()) { match res { Ok(sub_dag) => { debug!("Gathered sub DAG from: {addr:?}"); @@ -325,7 +349,6 @@ impl Client { } info!("Done gathering spend DAG from utxos"); - Ok(()) } /// Extends an existing SpendDag starting from the utxos in this DAG @@ -335,11 +358,11 @@ impl Client { pub async fn spend_dag_continue_from_utxos( &self, dag: &mut SpendDag, - max_depth: Option, + spend_processing: Option>, verify: bool, - ) -> WalletResult<()> { + ) { let utxos = dag.get_utxos(); - self.spend_dag_continue_from(dag, utxos, max_depth, verify) + self.spend_dag_continue_from(dag, utxos, spend_processing, verify) .await } diff --git a/sn_client/src/lib.rs b/sn_client/src/lib.rs index 10bced5cff..87d6204a6d 100644 --- a/sn_client/src/lib.rs +++ b/sn_client/src/lib.rs @@ -34,7 +34,7 @@ pub use sn_protocol as protocol; pub use sn_registers as registers; pub use sn_transfers as transfers; -const MAX_CONCURRENT_TASKS: usize = 32; +const MAX_CONCURRENT_TASKS: usize = 4096; pub use self::{ audit::{DagError, SpendDag, SpendDagGet, SpendFault},