From f37b4d3ce5e29bd223b3addf2156d6144d2963d4 Mon Sep 17 00:00:00 2001 From: Anselme Date: Mon, 28 Oct 2024 09:15:30 +0100 Subject: [PATCH] Revert "feat(autonomi): allow the uploader to work with the new apis" --- .github/workflows/merge.yml | 6 +- Cargo.lock | 17 - autonomi-cli/src/utils.rs | 41 +- autonomi/Cargo.toml | 5 - autonomi/src/client/data.rs | 78 +- autonomi/src/client/data_private.rs | 70 +- autonomi/src/client/external_signer.rs | 25 +- autonomi/src/client/fs.rs | 12 +- autonomi/src/client/fs_private.rs | 10 +- autonomi/src/client/mod.rs | 14 +- autonomi/src/client/registers.rs | 226 ++--- autonomi/src/client/utils.rs | 96 +- autonomi/src/lib.rs | 2 - autonomi/src/uploader/mod.rs | 590 ----------- autonomi/src/uploader/tests/mod.rs | 557 ---------- autonomi/src/uploader/tests/setup.rs | 480 --------- autonomi/src/uploader/upload.rs | 1232 ----------------------- autonomi/src/utils.rs | 7 +- autonomi/tests/fs.rs | 2 +- sn_evm/src/lib.rs | 1 - sn_networking/Cargo.toml | 1 - sn_networking/src/target_arch.rs | 25 +- sn_protocol/src/messages/chunk_proof.rs | 9 - 23 files changed, 318 insertions(+), 3188 deletions(-) delete mode 100644 autonomi/src/uploader/mod.rs delete mode 100644 autonomi/src/uploader/tests/mod.rs delete mode 100644 autonomi/src/uploader/tests/setup.rs delete mode 100644 autonomi/src/uploader/upload.rs diff --git a/.github/workflows/merge.yml b/.github/workflows/merge.yml index 720a2f7e25..d639924585 100644 --- a/.github/workflows/merge.yml +++ b/.github/workflows/merge.yml @@ -110,10 +110,6 @@ jobs: - uses: Swatinem/rust-cache@v2 - - name: Run autonomi tests - timeout-minutes: 25 - run: cargo test --release --package autonomi --lib --features="full,fs" - - name: Run node tests timeout-minutes: 25 run: cargo test --release --package sn_node --lib @@ -192,7 +188,7 @@ jobs: # only these unit tests require a network, the rest are run above in unit test section - name: Run autonomi --tests - run: cargo test --package autonomi --features="full,fs" --tests -- --nocapture + run: cargo test --package autonomi --tests -- --nocapture env: SN_LOG: "v" # only set the target dir for windows to bypass the linker issue. diff --git a/Cargo.lock b/Cargo.lock index a9e73bd939..d274255dbc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -950,18 +950,6 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" -[[package]] -name = "async-channel" -version = "2.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a" -dependencies = [ - "concurrent-queue", - "event-listener-strategy", - "futures-core", - "pin-project-lite", -] - [[package]] name = "async-io" version = "2.3.4" @@ -1091,7 +1079,6 @@ name = "autonomi" version = "0.2.1" dependencies = [ "alloy", - "assert_matches", "bip39", "blst", "blstrs 0.7.1", @@ -1099,13 +1086,11 @@ dependencies = [ "bytes", "console_error_panic_hook", "const-hex", - "custom_debug", "evmlib", "eyre", "futures", "hex 0.4.3", "instant", - "itertools 0.12.1", "js-sys", "libp2p 0.54.1", "rand 0.8.5", @@ -1122,7 +1107,6 @@ dependencies = [ "sn_peers_acquisition", "sn_protocol", "sn_registers", - "tempfile", "test_utils", "thiserror", "tiny_http", @@ -8278,7 +8262,6 @@ name = "sn_networking" version = "0.19.0" dependencies = [ "aes-gcm-siv", - "async-channel", "async-trait", "backoff", "blsttc", diff --git a/autonomi-cli/src/utils.rs b/autonomi-cli/src/utils.rs index 80c46150ad..5f031a3c24 100644 --- a/autonomi-cli/src/utils.rs +++ b/autonomi-cli/src/utils.rs @@ -6,40 +6,28 @@ // KIND, either express or implied. Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. -use autonomi::client::{Amount, ClientEvent}; - -/// Summary of the upload operation. -#[derive(Debug, Clone)] -pub struct CliUploadSummary { - /// Total tokens spent during the upload. - pub tokens_spent: Amount, - /// Total number of records uploaded. - pub record_count: usize, -} +use autonomi::client::{Amount, ClientEvent, UploadSummary}; /// Collects upload summary from the event receiver. /// Send a signal to the returned sender to stop collecting and to return the result via the join handle. pub fn collect_upload_summary( mut event_receiver: tokio::sync::mpsc::Receiver, ) -> ( - tokio::task::JoinHandle, + tokio::task::JoinHandle, tokio::sync::oneshot::Sender<()>, ) { let (upload_completed_tx, mut upload_completed_rx) = tokio::sync::oneshot::channel::<()>(); let stats_thread = tokio::spawn(async move { - let mut tokens: Amount = Amount::from(0); - let mut records = 0; + let mut tokens_spent: Amount = Amount::from(0); + let mut record_count = 0; loop { tokio::select! { event = event_receiver.recv() => { match event { - Some(ClientEvent::UploadComplete { - tokens_spent, - record_count - }) => { - tokens += tokens_spent; - records += record_count; + Some(ClientEvent::UploadComplete(upload_summary)) => { + tokens_spent += upload_summary.tokens_spent; + record_count += upload_summary.record_count; } None => break, } @@ -51,19 +39,16 @@ pub fn collect_upload_summary( // try to drain the event receiver in case there are any more events while let Ok(event) = event_receiver.try_recv() { match event { - ClientEvent::UploadComplete { - tokens_spent, - record_count, - } => { - tokens += tokens_spent; - records += record_count; + ClientEvent::UploadComplete(upload_summary) => { + tokens_spent += upload_summary.tokens_spent; + record_count += upload_summary.record_count; } } } - CliUploadSummary { - tokens_spent: tokens, - record_count: records, + UploadSummary { + tokens_spent, + record_count, } }); diff --git a/autonomi/Cargo.toml b/autonomi/Cargo.toml index 2213775d78..3da273183e 100644 --- a/autonomi/Cargo.toml +++ b/autonomi/Cargo.toml @@ -30,11 +30,9 @@ bytes = { version = "1.0.1", features = ["serde"] } curv = { version = "0.10.1", package = "sn_curv", default-features = false, features = [ "num-bigint", ] } -custom_debug = "~0.6.1" eip2333 = { version = "0.2.1", package = "sn_bls_ckd" } const-hex = "1.12.0" hex = "~0.4.3" -itertools = "~0.12.1" libp2p = "0.54.1" rand = "0.8.5" rmp-serde = "1.1.1" @@ -60,13 +58,10 @@ blstrs = "0.7.1" [dev-dependencies] alloy = { version = "0.5.3", default-features = false, features = ["std", "reqwest-rustls-tls", "provider-anvil-node", "sol-types", "json", "signers", "contract", "signer-local", "network"] } -assert_matches = "1.5.0" eyre = "0.6.5" sha2 = "0.10.6" sn_logging = { path = "../sn_logging", version = "0.2.37" } sn_peers_acquisition = { path = "../sn_peers_acquisition", version = "0.5.4" } -sn_registers = { path = "../sn_registers", version = "0.4.0", features = ["test-utils"] } -tempfile = "3.6.0" # Do not specify the version field. Release process expects even the local dev deps to be published. # Removing the version field is a workaround. test_utils = { path = "../test_utils" } diff --git a/autonomi/src/client/data.rs b/autonomi/src/client/data.rs index 542dea1f0b..869022cd37 100644 --- a/autonomi/src/client/data.rs +++ b/autonomi/src/client/data.rs @@ -6,11 +6,15 @@ // KIND, either express or implied. Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. -use crate::client::ClientEvent; -use crate::uploader::{UploadError, Uploader}; -use crate::{self_encryption::encrypt, Client}; use bytes::Bytes; use libp2p::kad::Quorum; +use tokio::task::{JoinError, JoinSet}; + +use std::collections::HashSet; +use xor_name::XorName; + +use crate::client::{ClientEvent, UploadSummary}; +use crate::{self_encryption::encrypt, Client}; use sn_evm::{Amount, AttoTokens}; use sn_evm::{EvmWallet, EvmWalletError}; use sn_networking::{GetRecordCfg, NetworkError}; @@ -18,8 +22,6 @@ use sn_protocol::{ storage::{try_deserialize_record, Chunk, ChunkAddress, RecordHeader, RecordKind}, NetworkAddress, }; -use std::collections::HashSet; -use xor_name::XorName; /// Raw Data Address (points to a DataMap) pub type DataAddr = XorName; @@ -39,14 +41,14 @@ pub enum PutError { PayError(#[from] PayError), #[error("Serialization error: {0}")] Serialization(String), - #[error("Upload Error")] - Upload(#[from] UploadError), #[error("A wallet error occurred.")] Wallet(#[from] sn_evm::EvmError), #[error("The vault owner key does not match the client's public key")] VaultBadOwner, #[error("Payment unexpectedly invalid for {0:?}")] PaymentUnexpectedlyInvalid(NetworkAddress), + #[error("Could not simultaneously upload chunks: {0:?}")] + JoinError(tokio::task::JoinError), } /// Errors that can occur during the pay operation. @@ -78,6 +80,8 @@ pub enum GetError { /// Errors that can occur during the cost calculation. #[derive(Debug, thiserror::Error)] pub enum CostError { + #[error("Could not simultaneously fetch store costs: {0:?}")] + JoinError(JoinError), #[error("Failed to self-encrypt data.")] SelfEncryption(#[from] crate::self_encryption::Error), #[error("Could not get store quote for: {0:?} after several retries")] @@ -114,24 +118,62 @@ impl Client { debug!("Encryption took: {:.2?}", now.elapsed()); let map_xor_name = *data_map_chunk.address().xorname(); + let mut xor_names = vec![map_xor_name]; - let mut uploader = Uploader::new(self.clone(), wallet.clone()); - uploader.insert_chunks(chunks); - uploader.insert_chunks(vec![data_map_chunk]); + for chunk in &chunks { + xor_names.push(*chunk.name()); + } - let summary = uploader.start_upload().await?; + // Pay for all chunks + data map chunk + info!("Paying for {} addresses", xor_names.len()); + let (payment_proofs, _free_chunks) = self + .pay(xor_names.into_iter(), wallet) + .await + .inspect_err(|err| error!("Error paying for data: {err:?}"))?; + + let mut record_count = 0; + + // Upload all the chunks in parallel including the data map chunk + debug!("Uploading {} chunks", chunks.len()); + let mut tasks = JoinSet::new(); + for chunk in chunks.into_iter().chain(std::iter::once(data_map_chunk)) { + let self_clone = self.clone(); + let address = *chunk.address(); + if let Some(proof) = payment_proofs.get(chunk.name()) { + let proof_clone = proof.clone(); + tasks.spawn(async move { + self_clone + .chunk_upload_with_payment(chunk, proof_clone) + .await + .inspect_err(|err| error!("Error uploading chunk {address:?} :{err:?}")) + }); + } else { + debug!("Chunk at {address:?} was already paid for so skipping"); + } + } + while let Some(result) = tasks.join_next().await { + result + .inspect_err(|err| error!("Join error uploading chunk: {err:?}")) + .map_err(PutError::JoinError)? + .inspect_err(|err| error!("Error uploading chunk: {err:?}"))?; + record_count += 1; + } if let Some(channel) = self.client_event_sender.as_ref() { - if let Err(err) = channel - .send(ClientEvent::UploadComplete { - record_count: summary.uploaded_count, - tokens_spent: summary.storage_cost, - }) - .await - { + let tokens_spent = payment_proofs + .values() + .map(|proof| proof.quote.cost.as_atto()) + .sum::(); + + let summary = UploadSummary { + record_count, + tokens_spent, + }; + if let Err(err) = channel.send(ClientEvent::UploadComplete(summary)).await { error!("Failed to send client event: {err:?}"); } } + Ok(map_xor_name) } diff --git a/autonomi/src/client/data_private.rs b/autonomi/src/client/data_private.rs index b1cda1b3f7..b6d0bfa8a3 100644 --- a/autonomi/src/client/data_private.rs +++ b/autonomi/src/client/data_private.rs @@ -6,15 +6,17 @@ // KIND, either express or implied. Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. -use super::data::{GetError, PutError}; -use crate::client::ClientEvent; -use crate::uploader::Uploader; -use crate::{self_encryption::encrypt, Client}; +use std::hash::{DefaultHasher, Hash, Hasher}; + use bytes::Bytes; use serde::{Deserialize, Serialize}; -use sn_evm::EvmWallet; +use sn_evm::{Amount, EvmWallet}; use sn_protocol::storage::Chunk; -use std::hash::{DefaultHasher, Hash, Hasher}; +use tokio::task::JoinSet; + +use super::data::{GetError, PutError}; +use crate::client::{ClientEvent, UploadSummary}; +use crate::{self_encryption::encrypt, Client}; /// Private data on the network can be accessed with this #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, PartialOrd, Ord)] @@ -67,21 +69,53 @@ impl Client { let (data_map_chunk, chunks) = encrypt(data)?; debug!("Encryption took: {:.2?}", now.elapsed()); - // Upload the chunks with the payments - let mut uploader = Uploader::new(self.clone(), wallet.clone()); - uploader.insert_chunks(chunks); - uploader.insert_chunks(vec![data_map_chunk.clone()]); + // Pay for all chunks + let xor_names: Vec<_> = chunks.iter().map(|chunk| *chunk.name()).collect(); + info!("Paying for {} addresses", xor_names.len()); + let (payment_proofs, _free_chunks) = self + .pay(xor_names.into_iter(), wallet) + .await + .inspect_err(|err| error!("Error paying for data: {err:?}"))?; - let summary = uploader.start_upload().await?; + // Upload the chunks with the payments + let mut record_count = 0; + debug!("Uploading {} chunks", chunks.len()); + let mut tasks = JoinSet::new(); + for chunk in chunks { + let self_clone = self.clone(); + let address = *chunk.address(); + if let Some(proof) = payment_proofs.get(chunk.name()) { + let proof_clone = proof.clone(); + tasks.spawn(async move { + self_clone + .chunk_upload_with_payment(chunk, proof_clone) + .await + .inspect_err(|err| error!("Error uploading chunk {address:?} :{err:?}")) + }); + } else { + debug!("Chunk at {address:?} was already paid for so skipping"); + } + } + while let Some(result) = tasks.join_next().await { + result + .inspect_err(|err| error!("Join error uploading chunk: {err:?}")) + .map_err(PutError::JoinError)? + .inspect_err(|err| error!("Error uploading chunk: {err:?}"))?; + record_count += 1; + } + // Reporting if let Some(channel) = self.client_event_sender.as_ref() { - if let Err(err) = channel - .send(ClientEvent::UploadComplete { - record_count: summary.uploaded_count, - tokens_spent: summary.storage_cost, - }) - .await - { + let tokens_spent = payment_proofs + .values() + .map(|proof| proof.quote.cost.as_atto()) + .sum::(); + + let summary = UploadSummary { + record_count, + tokens_spent, + }; + if let Err(err) = channel.send(ClientEvent::UploadComplete(summary)).await { error!("Failed to send client event: {err:?}"); } } diff --git a/autonomi/src/client/external_signer.rs b/autonomi/src/client/external_signer.rs index 5057bc3b28..b17002bd9c 100644 --- a/autonomi/src/client/external_signer.rs +++ b/autonomi/src/client/external_signer.rs @@ -3,8 +3,7 @@ use crate::client::utils::extract_quote_payments; use crate::self_encryption::encrypt; use crate::Client; use bytes::Bytes; -use sn_evm::{ProofOfPayment, QuotePayment}; -use sn_networking::PayeeQuote; +use sn_evm::{PaymentQuote, ProofOfPayment, QuotePayment}; use sn_protocol::storage::Chunk; use std::collections::HashMap; use xor_name::XorName; @@ -34,7 +33,7 @@ impl Client { data: Bytes, ) -> Result< ( - HashMap, + HashMap, Vec, Vec, ), @@ -42,9 +41,15 @@ impl Client { > { // Encrypt the data as chunks let (_data_map_chunk, _chunks, xor_names) = encrypt_data(data)?; - let cost_map = self.get_store_quotes(xor_names.into_iter()).await?; - let (quote_payments, free_chunks) = extract_quote_payments(&cost_map); + let cost_map: HashMap = self + .get_store_quotes(xor_names.into_iter()) + .await? + .into_iter() + .map(|(name, (_, _, q))| (name, q)) + .collect(); + + let (quote_payments, free_chunks) = extract_quote_payments(&cost_map); Ok((cost_map, quote_payments, free_chunks)) } @@ -57,12 +62,12 @@ impl Client { if let Some(proof) = payment_proofs.get(map_xor_name) { debug!("Uploading data map chunk: {map_xor_name:?}"); - self.chunk_upload_with_payment(data_map_chunk.clone(), proof.clone(), None) + self.chunk_upload_with_payment(data_map_chunk.clone(), proof.clone()) .await - .inspect_err(|err| error!("Error uploading data map chunk: {err:?}"))?; + .inspect_err(|err| error!("Error uploading data map chunk: {err:?}")) + } else { + Ok(()) } - - Ok(()) } async fn upload_chunks( @@ -74,7 +79,7 @@ impl Client { for chunk in chunks { if let Some(proof) = payment_proofs.get(chunk.name()) { let address = *chunk.address(); - self.chunk_upload_with_payment(chunk.clone(), proof.clone(), None) + self.chunk_upload_with_payment(chunk.clone(), proof.clone()) .await .inspect_err(|err| error!("Error uploading chunk {address:?} :{err:?}"))?; } diff --git a/autonomi/src/client/fs.rs b/autonomi/src/client/fs.rs index 43ab87f504..d7f243df68 100644 --- a/autonomi/src/client/fs.rs +++ b/autonomi/src/client/fs.rs @@ -20,7 +20,7 @@ use super::data::{DataAddr, GetError, PutError}; /// Errors that can occur during the file upload operation. #[cfg(feature = "fs")] #[derive(Debug, thiserror::Error)] -pub enum FileUploadError { +pub enum UploadError { #[error("Failed to recursively traverse directory")] WalkDir(#[from] walkdir::Error), #[error("Input/output failure")] @@ -38,7 +38,7 @@ pub enum FileUploadError { #[cfg(feature = "fs")] /// Errors that can occur during the download operation. #[derive(Debug, thiserror::Error)] -pub enum FileDownloadError { +pub enum DownloadError { #[error("Failed to download file")] GetError(#[from] GetError), #[error("IO failure")] @@ -67,7 +67,7 @@ impl Client { &self, data_addr: DataAddr, to_dest: PathBuf, - ) -> Result<(), FileDownloadError> { + ) -> Result<(), DownloadError> { let data = self.data_get(data_addr).await?; if let Some(parent) = to_dest.parent() { tokio::fs::create_dir_all(parent).await?; @@ -81,7 +81,7 @@ impl Client { &self, archive_addr: ArchiveAddr, to_dest: PathBuf, - ) -> Result<(), FileDownloadError> { + ) -> Result<(), DownloadError> { let archive = self.archive_get(archive_addr).await?; for (path, addr, _meta) in archive.iter() { self.file_download(*addr, to_dest.join(path)).await?; @@ -95,7 +95,7 @@ impl Client { &self, dir_path: PathBuf, wallet: &EvmWallet, - ) -> Result { + ) -> Result { let mut archive = Archive::new(); for entry in walkdir::WalkDir::new(dir_path) { @@ -129,7 +129,7 @@ impl Client { &self, path: PathBuf, wallet: &EvmWallet, - ) -> Result { + ) -> Result { let data = tokio::fs::read(path).await?; let data = Bytes::from(data); let addr = self.data_put(data, wallet).await?; diff --git a/autonomi/src/client/fs_private.rs b/autonomi/src/client/fs_private.rs index 31f7857ec6..0d9b819d70 100644 --- a/autonomi/src/client/fs_private.rs +++ b/autonomi/src/client/fs_private.rs @@ -21,7 +21,7 @@ use std::path::PathBuf; use super::archive_private::{PrivateArchive, PrivateArchiveAccess}; use super::data_private::PrivateDataAccess; -use super::fs::{FileDownloadError, FileUploadError}; +use super::fs::{DownloadError, UploadError}; impl Client { /// Download a private file from network to local file system @@ -29,7 +29,7 @@ impl Client { &self, data_access: PrivateDataAccess, to_dest: PathBuf, - ) -> Result<(), FileDownloadError> { + ) -> Result<(), DownloadError> { let data = self.private_data_get(data_access).await?; if let Some(parent) = to_dest.parent() { tokio::fs::create_dir_all(parent).await?; @@ -43,7 +43,7 @@ impl Client { &self, archive_access: PrivateArchiveAccess, to_dest: PathBuf, - ) -> Result<(), FileDownloadError> { + ) -> Result<(), DownloadError> { let archive = self.private_archive_get(archive_access).await?; for (path, addr, _meta) in archive.iter() { self.private_file_download(addr.clone(), to_dest.join(path)) @@ -58,7 +58,7 @@ impl Client { &self, dir_path: PathBuf, wallet: &EvmWallet, - ) -> Result { + ) -> Result { let mut archive = PrivateArchive::new(); for entry in walkdir::WalkDir::new(dir_path) { @@ -92,7 +92,7 @@ impl Client { &self, path: PathBuf, wallet: &EvmWallet, - ) -> Result { + ) -> Result { let data = tokio::fs::read(path).await?; let data = Bytes::from(data); let addr = self.private_data_put(data, wallet).await?; diff --git a/autonomi/src/client/mod.rs b/autonomi/src/client/mod.rs index 172fb9ba4f..d530f210f2 100644 --- a/autonomi/src/client/mod.rs +++ b/autonomi/src/client/mod.rs @@ -31,7 +31,7 @@ pub mod vault; pub mod wasm; // private module with utility functions -pub(crate) mod utils; +mod utils; pub use sn_evm::Amount; @@ -207,8 +207,12 @@ async fn handle_event_receiver( /// Events that can be broadcasted by the client. #[derive(Debug, Clone)] pub enum ClientEvent { - UploadComplete { - record_count: usize, - tokens_spent: Amount, - }, + UploadComplete(UploadSummary), +} + +/// Summary of an upload operation. +#[derive(Debug, Clone)] +pub struct UploadSummary { + pub record_count: usize, + pub tokens_spent: Amount, } diff --git a/autonomi/src/client/registers.rs b/autonomi/src/client/registers.rs index a17bffb147..52f8944e1e 100644 --- a/autonomi/src/client/registers.rs +++ b/autonomi/src/client/registers.rs @@ -6,30 +6,34 @@ // KIND, either express or implied. Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. -use super::data::CostError; -use crate::client::{Client, ClientEvent}; -use crate::uploader::{UploadError, Uploader}; /// Register Secret Key pub use bls::SecretKey as RegisterSecretKey; -use bytes::Bytes; -use libp2p::kad::{Quorum, Record}; use sn_evm::Amount; use sn_evm::AttoTokens; -use sn_evm::EvmWallet; -use sn_evm::ProofOfPayment; +use sn_evm::EvmWalletError; use sn_networking::VerificationKind; +use sn_protocol::storage::RetryStrategy; +pub use sn_registers::{Permissions as RegisterPermissions, RegisterAddress}; + +use crate::client::data::PayError; +use crate::client::Client; +use crate::client::ClientEvent; +use crate::client::UploadSummary; +use bytes::Bytes; +use libp2p::kad::{Quorum, Record}; +use sn_evm::EvmWallet; use sn_networking::{GetRecordCfg, GetRecordError, NetworkError, PutRecordCfg}; use sn_protocol::storage::try_deserialize_record; use sn_protocol::storage::try_serialize_record; use sn_protocol::storage::RecordKind; -use sn_protocol::storage::RetryStrategy; use sn_protocol::NetworkAddress; use sn_registers::Register as BaseRegister; -pub use sn_registers::{Permissions as RegisterPermissions, RegisterAddress}; use sn_registers::{Permissions, RegisterCrdt, RegisterOp, SignedRegister}; use std::collections::BTreeSet; use xor_name::XorName; +use super::data::CostError; + #[derive(Debug, thiserror::Error)] pub enum RegisterError { #[error("Cost error: {0}")] @@ -40,12 +44,16 @@ pub enum RegisterError { Serialization, #[error("Register could not be verified (corrupt)")] FailedVerification, - #[error("Upload Error")] - Upload(#[from] UploadError), + #[error("Payment failure occurred during register creation.")] + Pay(#[from] PayError), + #[error("Failed to retrieve wallet payment")] + Wallet(#[from] EvmWalletError), #[error("Failed to write to low-level register")] Write(#[source] sn_registers::Error), #[error("Failed to sign register")] CouldNotSign(#[source] sn_registers::Error), + #[error("Received invalid quote from node, this node is possibly malfunctioning, try another node by trying another register name")] + InvalidQuote, } #[derive(Clone, Debug)] @@ -111,45 +119,6 @@ impl Register { Ok(()) } - - /// Merge two registers together. - pub(crate) fn merge(&mut self, other: &Self) -> Result<(), RegisterError> { - debug!("Merging Register of: {:?}", self.address()); - - other.signed_reg.verify().map_err(|_| { - error!( - "Failed to verify register at address: {:?}", - other.address() - ); - RegisterError::FailedVerification - })?; - - self.signed_reg.merge(&other.signed_reg).map_err(|err| { - error!("Failed to merge registers {}: {err}", self.address()); - RegisterError::Write(err) - })?; - - for op in other.signed_reg.ops() { - if let Err(err) = self.crdt_reg.apply_op(op.clone()) { - error!( - "Failed to apply {op:?} to Register {}: {err}", - self.address() - ); - return Err(RegisterError::Write(err)); - } - } - - Ok(()) - } - - #[cfg(test)] - pub(crate) fn test_new_from_register(signed_reg: SignedRegister) -> Register { - let crdt_reg = RegisterCrdt::new(*signed_reg.address()); - Register { - signed_reg, - crdt_reg, - } - } } impl Client { @@ -191,18 +160,13 @@ impl Client { }; // Make sure the fetched record contains valid CRDT operations - signed_reg.verify().map_err(|_| { - error!("Failed to verify register at address: {address}"); - RegisterError::FailedVerification - })?; + signed_reg + .verify() + .map_err(|_| RegisterError::FailedVerification)?; let mut crdt_reg = RegisterCrdt::new(*signed_reg.address()); for op in signed_reg.ops() { if let Err(err) = crdt_reg.apply_op(op.clone()) { - error!( - "Failed to apply {op:?} to Register {address}: {err}", - address = signed_reg.address() - ); return Err(RegisterError::Write(err)); } } @@ -222,6 +186,18 @@ impl Client { ) -> Result<(), RegisterError> { register.write_atop(&new_value, &owner)?; + let signed_register = register.signed_reg.clone(); + + // Prepare the record for network storage + let record = Record { + key: NetworkAddress::from_register_address(*register.address()).to_record_key(), + value: try_serialize_record(&signed_register, RecordKind::Register) + .map_err(|_| RegisterError::Serialization)? + .to_vec(), + publisher: None, + expires: None, + }; + let get_cfg = GetRecordCfg { get_quorum: Quorum::Majority, retry_strategy: Some(RetryStrategy::default()), @@ -236,7 +212,16 @@ impl Client { verification: Some((VerificationKind::Network, get_cfg)), }; - self.register_upload(®ister, None, &put_cfg).await?; + // Store the updated register on the network + self.network + .put_record(record, &put_cfg) + .await + .inspect_err(|err| { + error!( + "Failed to put record - register {:?} to the network: {err}", + register.address() + ) + })?; Ok(()) } @@ -308,79 +293,74 @@ impl Client { // Owner can write to the register. let register = Register::new(Some(value), name, owner, permissions)?; - let address = *register.address(); - - let mut uploader = Uploader::new(self.clone(), wallet.clone()); - uploader.insert_register(vec![register]); - uploader.collect_registers(true); + let address = register.address(); - let summary = uploader.start_upload().await?; - - let register = summary - .uploaded_registers - .get(&address) - .ok_or_else(|| { - error!("Failed to get register with name: {name}"); - RegisterError::Upload(UploadError::InternalError) - })? - .clone(); - - if let Some(channel) = self.client_event_sender.as_ref() { - if let Err(err) = channel - .send(ClientEvent::UploadComplete { - record_count: summary.uploaded_count, - tokens_spent: summary.storage_cost, - }) - .await - { - error!("Failed to send client event: {err:?}"); - } - } + let reg_xor = address.xorname(); + debug!("Paying for register at address: {address}"); + let (payment_proofs, _skipped) = self + .pay(std::iter::once(reg_xor), wallet) + .await + .inspect_err(|err| { + error!("Failed to pay for register at address: {address} : {err}") + })?; + let proof = if let Some(proof) = payment_proofs.get(®_xor) { + proof + } else { + // register was skipped, meaning it was already paid for + error!("Register at address: {address} was already paid for"); + return Err(RegisterError::Network(NetworkError::RegisterAlreadyExists)); + }; - Ok(register) - } + let payee = proof + .to_peer_id_payee() + .ok_or(RegisterError::InvalidQuote) + .inspect_err(|err| error!("Failed to get payee from payment proof: {err}"))?; + let signed_register = register.signed_reg.clone(); + + let record = Record { + key: NetworkAddress::from_register_address(*address).to_record_key(), + value: try_serialize_record( + &(proof, &signed_register), + RecordKind::RegisterWithPayment, + ) + .map_err(|_| RegisterError::Serialization)? + .to_vec(), + publisher: None, + expires: None, + }; - // Used by the uploader. - pub(crate) async fn register_upload( - &self, - register: &Register, - payment: Option<&ProofOfPayment>, - put_cfg: &PutRecordCfg, - ) -> Result<(), RegisterError> { - let signed_register = ®ister.signed_reg; - let record = if let Some(proof) = payment { - Record { - key: NetworkAddress::from_register_address(*register.address()).to_record_key(), - value: try_serialize_record( - &(proof, signed_register), - RecordKind::RegisterWithPayment, - ) - .map_err(|_| RegisterError::Serialization)? - .to_vec(), - publisher: None, - expires: None, - } - } else { - Record { - key: NetworkAddress::from_register_address(*register.address()).to_record_key(), - value: try_serialize_record(signed_register, RecordKind::Register) - .map_err(|_| RegisterError::Serialization)? - .to_vec(), - publisher: None, - expires: None, - } + let get_cfg = GetRecordCfg { + get_quorum: Quorum::Majority, + retry_strategy: Some(RetryStrategy::default()), + target_record: None, + expected_holders: Default::default(), + is_register: true, + }; + let put_cfg = PutRecordCfg { + put_quorum: Quorum::All, + retry_strategy: None, + use_put_record_to: Some(vec![payee]), + verification: Some((VerificationKind::Network, get_cfg)), }; + debug!("Storing register at address {address} to the network"); self.network - .put_record(record, put_cfg) + .put_record(record, &put_cfg) .await .inspect_err(|err| { - error!( - "Failed to put record - register {:?} to the network: {err}", - register.address() - ) + error!("Failed to put record - register {address} to the network: {err}") })?; - Ok(()) + if let Some(channel) = self.client_event_sender.as_ref() { + let summary = UploadSummary { + record_count: 1, + tokens_spent: proof.quote.cost.as_atto(), + }; + if let Err(err) = channel.send(ClientEvent::UploadComplete(summary)).await { + error!("Failed to send client event: {err}"); + } + } + + Ok(register) } } diff --git a/autonomi/src/client/utils.rs b/autonomi/src/client/utils.rs index ec6515a78b..68ae70f2f7 100644 --- a/autonomi/src/client/utils.rs +++ b/autonomi/src/client/utils.rs @@ -6,28 +6,30 @@ // KIND, either express or implied. Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. -use super::{ - data::{CostError, GetError, PayError}, - Client, -}; -use crate::utils::payment_proof_from_quotes_and_payments; -use crate::{self_encryption::DataMapLevel, uploader::UploadError}; +use std::{collections::HashMap, num::NonZero}; + use bytes::Bytes; use libp2p::kad::{Quorum, Record}; use rand::{thread_rng, Rng}; use self_encryption::{decrypt_full_set, DataMap, EncryptedChunk}; -use sn_evm::{EvmWallet, ProofOfPayment, QuotePayment}; +use sn_evm::{EvmWallet, PaymentQuote, ProofOfPayment, QuotePayment}; use sn_networking::{ GetRecordCfg, Network, NetworkError, PayeeQuote, PutRecordCfg, VerificationKind, }; use sn_protocol::{ messages::ChunkProof, storage::{try_serialize_record, Chunk, ChunkAddress, RecordKind, RetryStrategy}, - NetworkAddress, CLOSE_GROUP_SIZE, + NetworkAddress, }; -use std::{collections::HashMap, num::NonZero}; use xor_name::XorName; +use super::{ + data::{CostError, GetError, PayError, PutError}, + Client, +}; +use crate::self_encryption::DataMapLevel; +use crate::utils::payment_proof_from_quotes_and_payments; + impl Client { /// Fetch and decrypt all chunks in the data map. pub(crate) async fn fetch_from_data_map(&self, data_map: &DataMap) -> Result { @@ -87,8 +89,7 @@ impl Client { &self, chunk: Chunk, payment: ProofOfPayment, - cfg: Option, - ) -> Result<(), UploadError> { + ) -> Result<(), PutError> { let storing_node = payment.to_peer_id_payee().expect("Missing node Peer ID"); debug!("Storing chunk: {chunk:?} to {:?}", storing_node); @@ -100,7 +101,7 @@ impl Client { key: key.clone(), value: try_serialize_record(&(payment, chunk.clone()), record_kind) .map_err(|e| { - UploadError::Serialization(format!( + PutError::Serialization(format!( "Failed to serialize chunk with payment: {e:?}" )) })? @@ -109,41 +110,35 @@ impl Client { expires: None, }; - let put_cfg = if let Some(cfg) = cfg { - cfg - } else { - let verification = { - let verification_cfg = GetRecordCfg { - get_quorum: Quorum::N( - NonZero::new(CLOSE_GROUP_SIZE / 2).expect("2 is non-zero"), - ), - retry_strategy: Some(RetryStrategy::Quick), - target_record: None, - expected_holders: Default::default(), - is_register: false, - }; - - let random_nonce = thread_rng().gen::(); - let expected_proof = - ChunkProof::from_chunk(&chunk, random_nonce).map_err(|err| { - UploadError::Serialization(format!("Failed to obtain chunk proof: {err:?}")) - })?; - - Some(( - VerificationKind::ChunkProof { - expected_proof, - nonce: random_nonce, - }, - verification_cfg, - )) + let verification = { + let verification_cfg = GetRecordCfg { + get_quorum: Quorum::N(NonZero::new(2).expect("2 is non-zero")), + retry_strategy: Some(RetryStrategy::Quick), + target_record: None, + expected_holders: Default::default(), + is_register: false, }; - PutRecordCfg { - put_quorum: Quorum::One, - retry_strategy: Some(RetryStrategy::Balanced), - use_put_record_to: Some(vec![storing_node]), - verification, - } + let stored_on_node = try_serialize_record(&chunk, RecordKind::Chunk) + .map_err(|e| PutError::Serialization(format!("Failed to serialize chunk: {e:?}")))? + .to_vec(); + let random_nonce = thread_rng().gen::(); + let expected_proof = ChunkProof::new(&stored_on_node, random_nonce); + + Some(( + VerificationKind::ChunkProof { + expected_proof, + nonce: random_nonce, + }, + verification_cfg, + )) + }; + + let put_cfg = PutRecordCfg { + put_quorum: Quorum::One, + retry_strategy: Some(RetryStrategy::Balanced), + use_put_record_to: Some(vec![storing_node]), + verification, }; Ok(self.network.put_record(record, &put_cfg).await?) } @@ -154,7 +149,12 @@ impl Client { content_addrs: impl Iterator, wallet: &EvmWallet, ) -> Result<(HashMap, Vec), PayError> { - let cost_map = self.get_store_quotes(content_addrs).await?; + let cost_map = self + .get_store_quotes(content_addrs) + .await? + .into_iter() + .map(|(name, (_, _, q))| (name, q)) + .collect(); let (quote_payments, skipped_chunks) = extract_quote_payments(&cost_map); @@ -233,12 +233,12 @@ async fn fetch_store_quote( /// Form to be executed payments and already executed payments from a cost map. pub(crate) fn extract_quote_payments( - cost_map: &HashMap, + cost_map: &HashMap, ) -> (Vec, Vec) { let mut to_be_paid = vec![]; let mut already_paid = vec![]; - for (chunk_address, (_, _, quote)) in cost_map.iter() { + for (chunk_address, quote) in cost_map.iter() { if quote.cost.is_zero() { already_paid.push(*chunk_address); } else { diff --git a/autonomi/src/lib.rs b/autonomi/src/lib.rs index 6a1476d900..c73bef1378 100644 --- a/autonomi/src/lib.rs +++ b/autonomi/src/lib.rs @@ -33,8 +33,6 @@ extern crate tracing; pub mod client; -pub mod uploader; - #[cfg(feature = "data")] mod self_encryption; mod utils; diff --git a/autonomi/src/uploader/mod.rs b/autonomi/src/uploader/mod.rs deleted file mode 100644 index cd5f7e4fc2..0000000000 --- a/autonomi/src/uploader/mod.rs +++ /dev/null @@ -1,590 +0,0 @@ -// Copyright 2024 MaidSafe.net limited. -// -// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. -// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed -// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. Please review the Licences for the specific language governing -// permissions and limitations relating to use of the SAFE Network Software. - -//! The uploader module provides an interface to upload data to the network, with the ability to retry failed uploads, -//! make repayments for failed payments, and verify the data after it has been uploaded. -//! -//! # Example -//! let mut uploader = Uploader::new(self.clone(), wallet.clone()); -//! uploader.insert_chunks(chunks); -//! uploader.insert_chunks(vec![data_map_chunk]); -//! let summary = uploader.start_upload().await?; -//! -//! # Configuration -//! The `Uploader` can be configured using the `UploadCfg` struct. The most notable options are the `batch_size` and -//! `payment_batch_size` which determine the number of data that are processed in parallel and the number of payments -//! that are made in a single evm transaction, respectively. -//! Also the `max_repayments_for_failed_data` option determines the maximum number of repayments to make if the -//! initial payment fails. - -#[cfg(test)] -mod tests; -mod upload; - -#[cfg(feature = "registers")] -use crate::client::registers::{Register, RegisterError}; -use crate::Client; -use itertools::Either; -use sn_evm::EvmWallet; -use sn_evm::{Amount, EvmNetworkTokenError, ProofOfPayment}; -use sn_networking::target_arch::{mpsc, mpsc_channel}; -use sn_networking::{NetworkError, PayeeQuote}; -#[cfg(feature = "data")] -use sn_protocol::storage::{Chunk, ChunkAddress}; -use sn_protocol::{storage::RetryStrategy, NetworkAddress}; -#[cfg(feature = "registers")] -use sn_registers::RegisterAddress; -use std::{ - collections::{HashMap, HashSet}, - fmt::Debug, - path::PathBuf, -}; -use upload::InnerUploader; -use xor_name::XorName; - -/// The default batch size that determines the number of data that are processed in parallel. -/// This includes fetching the store cost, uploading and verifying the data. -/// Use PAYMENT_BATCH_SIZE to control the number of payments made in a single transaction. -pub const BATCH_SIZE: usize = 16; - -/// The number of payments to make in a single EVM transaction. -pub const PAYMENT_BATCH_SIZE: usize = 512; - -/// The number of repayments to attempt for a failed item before returning an error. -/// If value = 1, we do an initial payment & 1 repayment. Thus we make a max 2 payments per data item. -#[cfg(not(test))] -pub(super) const MAX_REPAYMENTS_PER_FAILED_ITEM: usize = 3; -#[cfg(test)] -pub(super) const MAX_REPAYMENTS_PER_FAILED_ITEM: usize = 1; - -#[derive(Debug, thiserror::Error)] -pub enum UploadError { - #[error("Network Token error: {0:?}")] - EvmNetworkTokenError(#[from] EvmNetworkTokenError), - #[error("Internal Error")] - InternalError, - #[error("Invalid cfg: {0:?}")] - InvalidCfg(String), - #[error("I/O error: {0:?}")] - Io(#[from] std::io::Error), - #[error("The upload failed with maximum repayments reached for multiple items: {items:?}")] - MaximumRepaymentsReached { items: Vec }, - #[error("Network error: {0:?}")] - Network(#[from] NetworkError), - #[cfg(feature = "registers")] - #[error("Register could not be verified (corrupt)")] - RegisterFailedVerification, - #[cfg(feature = "registers")] - #[error("Failed to write to low-level register")] - RegisterWrite(#[source] sn_registers::Error), - #[cfg(feature = "registers")] - #[error("Failed to sign register")] - RegisterCouldNotSign(#[source] sn_registers::Error), - #[error("Multiple consecutive network errors reported during upload")] - SequentialNetworkErrors, - #[error("Too many sequential payment errors reported during upload")] - SequentialUploadPaymentError, - #[error("Failed to serialize {0}")] - Serialization(String), -} - -// UploadError is used inside RegisterError, but the uploader emits RegisterError. So this is used to avoid -// recursive enum definition. -#[cfg(feature = "registers")] -impl From for UploadError { - fn from(err: RegisterError) -> Self { - match err { - RegisterError::Network(err) => Self::Network(err), - RegisterError::Write(err) => Self::RegisterWrite(err), - RegisterError::CouldNotSign(err) => Self::RegisterCouldNotSign(err), - RegisterError::Cost(_) => Self::InternalError, - RegisterError::Serialization => Self::Serialization("Register".to_string()), - RegisterError::FailedVerification => Self::RegisterFailedVerification, - RegisterError::Upload(err) => err, - } - } -} - -/// The set of options to pass into the `Uploader` -#[derive(Debug, Clone, Copy)] -pub struct UploadCfg { - pub batch_size: usize, - pub payment_batch_size: usize, - pub verify_store: bool, - pub show_holders: bool, - pub retry_strategy: RetryStrategy, - pub max_repayments_for_failed_data: usize, - #[cfg(feature = "registers")] - pub collect_registers: bool, -} - -impl Default for UploadCfg { - fn default() -> Self { - Self { - batch_size: BATCH_SIZE, - payment_batch_size: PAYMENT_BATCH_SIZE, - verify_store: true, - show_holders: false, - retry_strategy: RetryStrategy::Balanced, - max_repayments_for_failed_data: MAX_REPAYMENTS_PER_FAILED_ITEM, - #[cfg(feature = "registers")] - collect_registers: false, - } - } -} - -/// The result of a successful upload. -#[derive(Debug, Clone, Default)] -pub struct UploadSummary { - pub storage_cost: Amount, - pub final_balance: Amount, - pub uploaded_addresses: HashSet, - #[cfg(feature = "registers")] - pub uploaded_registers: HashMap, - /// The number of records that were paid for and uploaded to the network. - pub uploaded_count: usize, - /// The number of records that were skipped during because they were already present in the network. - pub skipped_count: usize, -} - -impl UploadSummary { - /// Merge two UploadSummary together. - pub fn merge(mut self, other: Self) -> Result> { - self.uploaded_addresses.extend(other.uploaded_addresses); - #[cfg(feature = "registers")] - self.uploaded_registers.extend(other.uploaded_registers); - - let summary = Self { - storage_cost: self - .storage_cost - .checked_add(other.storage_cost) - .ok_or_else(|| { - error!("Failed to merge UploadSummary: NumericOverflow"); - UploadError::InternalError - })?, - final_balance: self - .final_balance - .checked_add(other.storage_cost) - .ok_or_else(|| { - error!("Failed to merge UploadSummary: NumericOverflow"); - UploadError::InternalError - })?, - uploaded_addresses: self.uploaded_addresses, - #[cfg(feature = "registers")] - uploaded_registers: self.uploaded_registers, - uploaded_count: self.uploaded_count + other.uploaded_count, - skipped_count: self.skipped_count + other.skipped_count, - }; - Ok(summary) - } -} - -#[derive(Debug, Clone)] -/// The events emitted from the upload process. -pub enum UploadEvent { - /// Uploaded a record to the network. - #[cfg(feature = "data")] - ChunkUploaded(ChunkAddress), - /// Uploaded a Register to the network. - /// The returned register is just the passed in register. - #[cfg(feature = "registers")] - RegisterUploaded(Register), - /// The Chunk already exists in the network. No payments were made. - #[cfg(feature = "data")] - ChunkAlreadyExistsInNetwork(ChunkAddress), - /// The Register already exists in the network. The locally register changes were pushed to the network. - /// No payments were made. - /// The returned register contains the remote replica merged with the passed in register. - #[cfg(feature = "registers")] - RegisterUpdated(Register), - /// Payment for a batch of records has been made. - PaymentMade { tokens_spent: Amount }, - /// The upload process has terminated with an error. - // Note: We cannot send the Error enum as it does not implement Clone. So we cannot even do Result if - // we also want to return this error from the function. - Error, -} - -pub struct Uploader { - // Has to be stored as an Option as we have to take ownership of inner during the upload. - inner: Option, -} - -impl Uploader { - /// Start the upload process. - pub async fn start_upload(mut self) -> Result { - let event_sender = self - .inner - .as_mut() - .expect("Uploader::new makes sure inner is present") - .event_sender - .clone(); - match upload::start_upload(Box::new(self)).await { - Err(err) => { - if let Some(event_sender) = event_sender { - if let Err(err) = event_sender.send(UploadEvent::Error).await { - error!("Error while emitting event: {err:?}"); - } - } - Err(err) - } - Ok(summary) => Ok(summary), - } - } - - /// Creates a new instance of `Uploader` with the default configuration. - /// To modify the configuration, use the provided setter methods (`set_...` functions). - // NOTE: Self has to be constructed only using this method. We expect `Self::inner` is present everywhere. - pub fn new(client: Client, wallet: EvmWallet) -> Self { - Self { - inner: Some(InnerUploader::new(client, wallet)), - } - } - - /// Update all the configurations by passing the `UploadCfg` struct - pub fn set_upload_cfg(&mut self, cfg: UploadCfg) { - // Self can only be constructed with new(), which will set inner to InnerUploader always. - // So it is okay to call unwrap here. - self.inner - .as_mut() - .expect("Uploader::new makes sure inner is present") - .set_cfg(cfg); - } - - /// Sets the default batch size that determines the number of data that are processed in parallel. - /// - /// By default, this option is set to the constant `BATCH_SIZE: usize = 16`. - pub fn set_batch_size(&mut self, batch_size: usize) { - // Self can only be constructed with new(), which will set inner to InnerUploader always. - // So it is okay to call unwrap here. - self.inner - .as_mut() - .expect("Uploader::new makes sure inner is present") - .set_batch_size(batch_size); - } - - /// Sets the default payment batch size that determines the number of payments that are made in a single - /// transaction. The maximum number of payments that can be made in a single transaction is 512. - /// - /// By default, this option is set to the constant `PAYMENT_BATCH_SIZE: usize = 512`. - pub fn set_payment_batch_size(&mut self, payment_batch_size: usize) { - // Self can only be constructed with new(), which will set inner to InnerUploader always. - // So it is okay to call unwrap here. - self.inner - .as_mut() - .expect("Uploader::new makes sure inner is present") - .set_payment_batch_size(payment_batch_size); - } - - /// Sets the option to verify the data after they have been uploaded. - /// - /// By default, this option is set to `true`. - pub fn set_verify_store(&mut self, verify_store: bool) { - self.inner - .as_mut() - .expect("Uploader::new makes sure inner is present") - .set_verify_store(verify_store); - } - - /// Sets the option to display the holders that are expected to be holding the data during verification. - /// - /// By default, this option is set to false. - pub fn set_show_holders(&mut self, show_holders: bool) { - self.inner - .as_mut() - .expect("Uploader::new makes sure inner is present") - .set_show_holders(show_holders); - } - - /// Sets the RetryStrategy to increase the re-try during the GetStoreCost & Upload tasks. - /// This does not affect the retries during the Payment task. Use `set_max_repayments_for_failed_data` to - /// configure the re-payment attempts. - /// - /// By default, this option is set to `RetryStrategy::Quick` - pub fn set_retry_strategy(&mut self, retry_strategy: RetryStrategy) { - self.inner - .as_mut() - .expect("Uploader::new makes sure inner is present") - .set_retry_strategy(retry_strategy); - } - - /// Sets the maximum number of repayments to perform if the initial payment failed. - /// NOTE: This creates an extra Spend and uses the wallet funds. - /// - /// By default, this option is set to `3` repayments. - pub fn set_max_repayments_for_failed_data(&mut self, retries: usize) { - self.inner - .as_mut() - .expect("Uploader::new makes sure inner is present") - .set_max_repayments_for_failed_data(retries); - } - - /// Enables the uploader to return all the registers that were Uploaded or Updated. - /// The registers are emitted through the event channel whenever they're completed, but this returns them - /// through the UploadSummary when the whole upload process completes. - /// - /// By default, this option is set to `False` - #[cfg(feature = "registers")] - pub fn collect_registers(&mut self, collect_registers: bool) { - self.inner - .as_mut() - .expect("Uploader::new makes sure inner is present") - .set_collect_registers(collect_registers); - } - - /// Returns a receiver for UploadEvent. - /// This method is optional and the upload process can be performed without it. - pub fn get_event_receiver(&mut self) -> mpsc::Receiver { - self.inner - .as_mut() - .expect("Uploader::new makes sure inner is present") - .get_event_receiver() - } - - /// Insert a list of chunk paths into the uploader. - /// - /// Use `start_upload` to start the upload process. - #[cfg(feature = "fs")] - pub fn insert_chunk_paths(&mut self, chunks: impl IntoIterator) { - self.inner - .as_mut() - .expect("Uploader::new makes sure inner is present") - .insert_chunk_paths(chunks); - } - - /// Insert a list of chunks into the uploader. - /// - /// Use `start_upload` to start the upload process. - #[cfg(feature = "data")] - pub fn insert_chunks(&mut self, chunks: impl IntoIterator) { - self.inner - .as_mut() - .expect("Uploader::new makes sure inner is present") - .insert_chunks(chunks); - } - - /// Insert a list of registers into the uploader. To get the updated registers, set `collect_registers` to true. - /// - /// Use `start_upload` to start the upload process. - #[cfg(feature = "registers")] - pub fn insert_register(&mut self, registers: impl IntoIterator) { - self.inner - .as_mut() - .expect("Uploader::new makes sure inner is present") - .insert_register(registers); - } -} - -// ======= Private ======== - -/// An interface to make the testing easier by not interacting with the network. -trait UploaderInterface: Send + Sync { - fn take_inner_uploader(&mut self) -> InnerUploader; - - // Mutable reference is used in tests. - fn submit_get_register_task( - &mut self, - client: Client, - reg_addr: RegisterAddress, - task_result_sender: mpsc::Sender, - ); - - fn submit_push_register_task( - &mut self, - client: Client, - upload_item: UploadItem, - verify_store: bool, - task_result_sender: mpsc::Sender, - ); - - #[expect(clippy::too_many_arguments)] - fn submit_get_store_cost_task( - &mut self, - client: Client, - xorname: XorName, - address: NetworkAddress, - previous_payments: Option<&Vec>, - get_store_cost_strategy: GetStoreCostStrategy, - max_repayments_for_failed_data: usize, - task_result_sender: mpsc::Sender, - ); - - fn submit_make_payment_task( - &mut self, - to_send: Option<(UploadItem, Box)>, - make_payment_sender: mpsc::Sender)>>, - ); - - fn submit_upload_item_task( - &mut self, - upload_item: UploadItem, - client: Client, - previous_payments: Option<&Vec>, - verify_store: bool, - retry_strategy: RetryStrategy, - task_result_sender: mpsc::Sender, - ); -} - -// Configuration functions are used in tests. So these are defined here and re-used inside `Uploader` -impl InnerUploader { - pub(super) fn set_cfg(&mut self, cfg: UploadCfg) { - self.cfg = cfg; - } - - pub(super) fn set_batch_size(&mut self, batch_size: usize) { - self.cfg.batch_size = batch_size; - } - - pub(super) fn set_payment_batch_size(&mut self, payment_batch_size: usize) { - self.cfg.payment_batch_size = payment_batch_size; - } - - pub(super) fn set_verify_store(&mut self, verify_store: bool) { - self.cfg.verify_store = verify_store; - } - - pub(super) fn set_show_holders(&mut self, show_holders: bool) { - self.cfg.show_holders = show_holders; - } - - pub(super) fn set_retry_strategy(&mut self, retry_strategy: RetryStrategy) { - self.cfg.retry_strategy = retry_strategy; - } - - pub(super) fn set_max_repayments_for_failed_data(&mut self, retries: usize) { - self.cfg.max_repayments_for_failed_data = retries; - } - - #[cfg(feature = "registers")] - pub(super) fn set_collect_registers(&mut self, collect_registers: bool) { - self.cfg.collect_registers = collect_registers; - } - - pub(super) fn get_event_receiver(&mut self) -> mpsc::Receiver { - let (tx, rx) = mpsc_channel(100); - self.event_sender = Some(tx); - rx - } - - #[cfg(feature = "fs")] - pub(super) fn insert_chunk_paths( - &mut self, - chunks: impl IntoIterator, - ) { - self.all_upload_items - .extend(chunks.into_iter().map(|(xorname, path)| { - let item = UploadItem::Chunk { - address: ChunkAddress::new(xorname), - chunk: Either::Right(path), - }; - (xorname, item) - })); - } - - #[cfg(feature = "data")] - pub(super) fn insert_chunks(&mut self, chunks: impl IntoIterator) { - self.all_upload_items - .extend(chunks.into_iter().map(|chunk| { - let xorname = *chunk.name(); - let item = UploadItem::Chunk { - address: *chunk.address(), - chunk: Either::Left(chunk), - }; - (xorname, item) - })); - } - - #[cfg(feature = "registers")] - pub(super) fn insert_register(&mut self, registers: impl IntoIterator) { - self.all_upload_items - .extend(registers.into_iter().map(|reg| { - let address = *reg.address(); - let item = UploadItem::Register { address, reg }; - (address.xorname(), item) - })); - } -} - -#[derive(Debug, Clone)] -enum UploadItem { - #[cfg(feature = "data")] - Chunk { - address: ChunkAddress, - // Either the actual chunk or the path to the chunk. - chunk: Either, - }, - #[cfg(feature = "registers")] - Register { - address: RegisterAddress, - reg: Register, - }, -} - -impl UploadItem { - fn address(&self) -> NetworkAddress { - match self { - #[cfg(feature = "data")] - Self::Chunk { address, .. } => NetworkAddress::from_chunk_address(*address), - #[cfg(feature = "registers")] - Self::Register { address, .. } => NetworkAddress::from_register_address(*address), - } - } - - fn xorname(&self) -> XorName { - match self { - #[cfg(feature = "data")] - UploadItem::Chunk { address, .. } => *address.xorname(), - #[cfg(feature = "registers")] - UploadItem::Register { address, .. } => address.xorname(), - } - } -} - -#[derive(Debug)] -enum TaskResult { - #[cfg(feature = "registers")] - GetRegisterFromNetworkOk { - remote_register: Register, - }, - #[cfg(feature = "registers")] - GetRegisterFromNetworkErr(XorName), - #[cfg(feature = "registers")] - PushRegisterOk { - updated_register: Register, - }, - PushRegisterErr(XorName), - GetStoreCostOk { - xorname: XorName, - quote: Box, - }, - GetStoreCostErr { - xorname: XorName, - get_store_cost_strategy: GetStoreCostStrategy, - max_repayments_reached: bool, - }, - MakePaymentsOk { - payment_proofs: HashMap, - }, - MakePaymentsErr { - failed_xornames: Vec<(XorName, Box)>, - }, - UploadOk(XorName), - UploadErr { - xorname: XorName, - io_error: Option>, - }, -} - -#[derive(Debug, Clone)] -enum GetStoreCostStrategy { - /// Selects the PeerId with the lowest quote - Cheapest, - /// Selects the cheapest PeerId that we have not made payment to. - SelectDifferentPayee, -} diff --git a/autonomi/src/uploader/tests/mod.rs b/autonomi/src/uploader/tests/mod.rs deleted file mode 100644 index 80fb47f415..0000000000 --- a/autonomi/src/uploader/tests/mod.rs +++ /dev/null @@ -1,557 +0,0 @@ -// Copyright 2024 MaidSafe.net limited. -// -// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. -// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed -// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. Please review the Licences for the specific language governing -// permissions and limitations relating to use of the SAFE Network Software. - -mod setup; - -use crate::uploader::{ - tests::setup::{ - get_dummy_chunk_paths, get_dummy_registers, get_inner_uploader, start_uploading_with_steps, - TestSteps, - }, - UploadError, UploadEvent, -}; -use assert_matches::assert_matches; -use bls::SecretKey; -use eyre::Result; -use sn_logging::LogBuilder; -use std::collections::VecDeque; -use tempfile::tempdir; - -// ===== HAPPY PATH ======= - -/// 1. Chunk: if cost =0, then chunk is present in the network. -#[tokio::test] -async fn chunk_that_already_exists_in_the_network_should_return_zero_store_cost() -> Result<()> { - let _log_guards = LogBuilder::init_single_threaded_tokio_test("uploader", true); - let temp_dir = tempdir()?; - let (mut inner_uploader, task_result_rx) = get_inner_uploader()?; - - // cfg - inner_uploader.set_batch_size(1); - inner_uploader.set_payment_batch_size(1); - inner_uploader.insert_chunk_paths(get_dummy_chunk_paths(1, temp_dir.path().to_path_buf())); - - // the path to test - let steps = vec![TestSteps::GetStoreCostOk { - trigger_zero_cost: true, - assert_select_different_payee: false, - }]; - - let (upload_handle, events_handle) = start_uploading_with_steps( - inner_uploader, - VecDeque::from(steps), - SecretKey::random(), - task_result_rx, - ); - - let _stats = upload_handle.await??; - let events = events_handle.await?; - - assert_eq!(events.len(), 1); - assert_matches!(events[0], UploadEvent::ChunkAlreadyExistsInNetwork(_)); - Ok(()) -} - -/// 2. Chunk: if cost !=0, then make payment upload to the network. -#[tokio::test] -async fn chunk_should_be_paid_for_and_uploaded_if_cost_is_not_zero() -> Result<()> { - let _log_guards = LogBuilder::init_single_threaded_tokio_test("uploader", true); - let temp_dir = tempdir()?; - let (mut inner_uploader, task_result_rx) = get_inner_uploader()?; - - // cfg - inner_uploader.set_batch_size(1); - inner_uploader.set_payment_batch_size(1); - inner_uploader.insert_chunk_paths(get_dummy_chunk_paths(1, temp_dir.path().to_path_buf())); - - // the path to test - let steps = vec![ - TestSteps::GetStoreCostOk { - trigger_zero_cost: false, - assert_select_different_payee: false, - }, - TestSteps::MakePaymentOk, - TestSteps::UploadItemOk, - ]; - - let (upload_handle, events_handle) = start_uploading_with_steps( - inner_uploader, - VecDeque::from(steps), - SecretKey::random(), - task_result_rx, - ); - - let _stats = upload_handle.await??; - let events = events_handle.await?; - - assert_eq!(events.len(), 2); - assert_matches!(events[0], UploadEvent::PaymentMade { .. }); - assert_matches!(events[1], UploadEvent::ChunkUploaded(..)); - Ok(()) -} - -/// 3. Register: if GET register = ok, then merge and push the register. -#[tokio::test] -async fn register_should_be_merged_and_pushed_if_it_already_exists_in_the_network() -> Result<()> { - let _log_guards = LogBuilder::init_single_threaded_tokio_test("uploader", true); - let (mut inner_uploader, task_result_rx) = get_inner_uploader()?; - let register_sk = SecretKey::random(); - - // cfg - inner_uploader.set_batch_size(1); - inner_uploader.set_payment_batch_size(1); - inner_uploader.insert_register(get_dummy_registers(1, ®ister_sk)); - - // the path to test - let steps = vec![TestSteps::GetRegisterOk, TestSteps::PushRegisterOk]; - - let (upload_handle, events_handle) = start_uploading_with_steps( - inner_uploader, - VecDeque::from(steps), - register_sk, - task_result_rx, - ); - - let _stats = upload_handle.await??; - let events = events_handle.await?; - - assert_eq!(events.len(), 1); - assert_matches!(events[0], UploadEvent::RegisterUpdated { .. }); - Ok(()) -} - -/// 4. Register: if Get register = err, then get store cost and upload. -#[tokio::test] -async fn register_should_be_paid_and_uploaded_if_it_does_not_exists() -> Result<()> { - let _log_guards = LogBuilder::init_single_threaded_tokio_test("uploader", true); - let (mut inner_uploader, task_result_rx) = get_inner_uploader()?; - let register_sk = SecretKey::random(); - - // cfg - inner_uploader.set_batch_size(1); - inner_uploader.set_payment_batch_size(1); - inner_uploader.insert_register(get_dummy_registers(1, ®ister_sk)); - - // the path to test - // todo: what if cost = 0 even after GetRegister returns error. check that - let steps = vec![ - TestSteps::GetRegisterErr, - TestSteps::GetStoreCostOk { - trigger_zero_cost: false, - assert_select_different_payee: false, - }, - TestSteps::MakePaymentOk, - TestSteps::UploadItemOk, - ]; - - let (upload_handle, events_handle) = start_uploading_with_steps( - inner_uploader, - VecDeque::from(steps), - register_sk, - task_result_rx, - ); - - let _stats = upload_handle.await??; - let events = events_handle.await?; - - assert_eq!(events.len(), 2); - assert_matches!(events[0], UploadEvent::PaymentMade { .. }); - assert_matches!(events[1], UploadEvent::RegisterUploaded(..)); - Ok(()) -} - -// ===== REPAYMENTS ====== - -/// 1. Chunks: if upload task fails > threshold, then get store cost should be triggered with SelectDifferentStrategy -/// and then uploaded. -#[tokio::test] -async fn chunks_should_perform_repayment_if_the_upload_fails_multiple_times() -> Result<()> { - let _log_guards = LogBuilder::init_single_threaded_tokio_test("uploader", true); - let temp_dir = tempdir()?; - let (mut inner_uploader, task_result_rx) = get_inner_uploader()?; - - // cfg - inner_uploader.set_batch_size(1); - inner_uploader.set_payment_batch_size(1); - inner_uploader.insert_chunk_paths(get_dummy_chunk_paths(1, temp_dir.path().to_path_buf())); - - // the path to test - let steps = vec![ - TestSteps::GetStoreCostOk { - trigger_zero_cost: false, - assert_select_different_payee: false, - }, - TestSteps::MakePaymentOk, - TestSteps::UploadItemErr { io_error: false }, - TestSteps::UploadItemErr { io_error: false }, - TestSteps::GetStoreCostOk { - trigger_zero_cost: false, - assert_select_different_payee: true, - }, - TestSteps::MakePaymentOk, - TestSteps::UploadItemOk, - ]; - - let (upload_handle, events_handle) = start_uploading_with_steps( - inner_uploader, - VecDeque::from(steps), - SecretKey::random(), - task_result_rx, - ); - - let _stats = upload_handle.await??; - let events = events_handle.await?; - - assert_eq!(events.len(), 3); - assert_matches!(events[0], UploadEvent::PaymentMade { .. }); - assert_matches!(events[1], UploadEvent::PaymentMade { .. }); - assert_matches!(events[2], UploadEvent::ChunkUploaded(..)); - Ok(()) -} - -/// 2. Register: if upload task fails > threshold, then get store cost should be triggered with SelectDifferentStrategy -/// and then uploaded. -#[tokio::test] -async fn registers_should_perform_repayment_if_the_upload_fails_multiple_times() -> Result<()> { - let _log_guards = LogBuilder::init_single_threaded_tokio_test("uploader", true); - let (mut inner_uploader, task_result_rx) = get_inner_uploader()?; - let register_sk = SecretKey::random(); - - // cfg - inner_uploader.set_batch_size(1); - inner_uploader.set_payment_batch_size(1); - inner_uploader.insert_register(get_dummy_registers(1, ®ister_sk)); - - // the path to test - let steps = vec![ - TestSteps::GetRegisterErr, - TestSteps::GetStoreCostOk { - trigger_zero_cost: false, - assert_select_different_payee: false, - }, - TestSteps::MakePaymentOk, - TestSteps::UploadItemErr { io_error: false }, - TestSteps::UploadItemErr { io_error: false }, - TestSteps::GetStoreCostOk { - trigger_zero_cost: false, - assert_select_different_payee: true, - }, - TestSteps::MakePaymentOk, - TestSteps::UploadItemOk, - ]; - - let (upload_handle, events_handle) = start_uploading_with_steps( - inner_uploader, - VecDeque::from(steps), - register_sk, - task_result_rx, - ); - - let _stats = upload_handle.await??; - let events = events_handle.await?; - - assert_eq!(events.len(), 3); - assert_matches!(events[0], UploadEvent::PaymentMade { .. }); - assert_matches!(events[1], UploadEvent::PaymentMade { .. }); - assert_matches!(events[2], UploadEvent::RegisterUploaded(..)); - Ok(()) -} - -// ===== ERRORS ======= -/// 1. Registers: Multiple PushRegisterErr should result in Error::SequentialNetworkErrors -#[tokio::test] -async fn register_upload_should_error_out_if_there_are_multiple_push_failures() -> Result<()> { - let _log_guards = LogBuilder::init_single_threaded_tokio_test("uploader", true); - let (mut inner_uploader, task_result_rx) = get_inner_uploader()?; - let register_sk = SecretKey::random(); - - // cfg - inner_uploader.set_batch_size(1); - inner_uploader.set_payment_batch_size(1); - inner_uploader.insert_register(get_dummy_registers(1, ®ister_sk)); - - // the path to test - let steps = vec![ - TestSteps::GetRegisterOk, - TestSteps::PushRegisterErr, - TestSteps::PushRegisterErr, - ]; - - let (upload_handle, events_handle) = start_uploading_with_steps( - inner_uploader, - VecDeque::from(steps), - register_sk, - task_result_rx, - ); - - assert_matches!( - upload_handle.await?, - Err(UploadError::SequentialNetworkErrors) - ); - let events = events_handle.await?; - - // UploadEvent::Error is performed by the caller of start_upload, so we can't check that one here. - assert_eq!(events.len(), 0); - Ok(()) -} - -/// 2. Chunk: Multiple errors during get store cost should result in Error::SequentialNetworkErrors -#[tokio::test] -async fn chunk_should_error_out_if_there_are_multiple_errors_during_get_store_cost() -> Result<()> { - let _log_guards = LogBuilder::init_single_threaded_tokio_test("uploader", true); - let temp_dir = tempdir()?; - let (mut inner_uploader, task_result_rx) = get_inner_uploader()?; - - // cfg - inner_uploader.set_batch_size(1); - inner_uploader.set_payment_batch_size(1); - inner_uploader.insert_chunk_paths(get_dummy_chunk_paths(1, temp_dir.path().to_path_buf())); - - // the path to test - let steps = vec![ - TestSteps::GetStoreCostErr { - assert_select_different_payee: false, - }, - TestSteps::GetStoreCostErr { - assert_select_different_payee: false, - }, - ]; - - let (upload_handle, events_handle) = start_uploading_with_steps( - inner_uploader, - VecDeque::from(steps), - SecretKey::random(), - task_result_rx, - ); - - assert_matches!( - upload_handle.await?, - Err(UploadError::SequentialNetworkErrors) - ); - let events = events_handle.await?; - - // UploadEvent::Error is performed by the caller of start_upload, so we can't check that one here. - assert_eq!(events.len(), 0); - Ok(()) -} - -/// 3. Register: Multiple errors during get store cost should result in Error::SequentialNetworkErrors -#[tokio::test] -async fn register_should_error_out_if_there_are_multiple_errors_during_get_store_cost() -> Result<()> -{ - let _log_guards = LogBuilder::init_single_threaded_tokio_test("uploader", true); - let (mut inner_uploader, task_result_rx) = get_inner_uploader()?; - let register_sk = SecretKey::random(); - - // cfg - inner_uploader.set_batch_size(1); - inner_uploader.set_payment_batch_size(1); - inner_uploader.insert_register(get_dummy_registers(1, ®ister_sk)); - - // the path to test - let steps = vec![ - TestSteps::GetRegisterErr, - TestSteps::GetStoreCostErr { - assert_select_different_payee: false, - }, - TestSteps::GetStoreCostErr { - assert_select_different_payee: false, - }, - ]; - - let (upload_handle, events_handle) = start_uploading_with_steps( - inner_uploader, - VecDeque::from(steps), - register_sk, - task_result_rx, - ); - - assert_matches!( - upload_handle.await?, - Err(UploadError::SequentialNetworkErrors) - ); - let events = events_handle.await?; - - // UploadEvent::Error is performed by the caller of start_upload, so we can't check that one here. - assert_eq!(events.len(), 0); - Ok(()) -} - -/// 4. Chunk: Multiple errors during make payment should result in Error::SequentialUploadPaymentError -#[tokio::test] -async fn chunk_should_error_out_if_there_are_multiple_errors_during_make_payment() -> Result<()> { - let _log_guards = LogBuilder::init_single_threaded_tokio_test("uploader", true); - let temp_dir = tempdir()?; - let (mut inner_uploader, task_result_rx) = get_inner_uploader()?; - - // cfg - inner_uploader.set_batch_size(1); - inner_uploader.set_payment_batch_size(1); - inner_uploader.insert_chunk_paths(get_dummy_chunk_paths(1, temp_dir.path().to_path_buf())); - - // the path to test - let steps = vec![ - TestSteps::GetStoreCostOk { - trigger_zero_cost: false, - assert_select_different_payee: false, - }, - TestSteps::MakePaymentErr, - TestSteps::MakePaymentErr, - ]; - - let (upload_handle, events_handle) = start_uploading_with_steps( - inner_uploader, - VecDeque::from(steps), - SecretKey::random(), - task_result_rx, - ); - - assert_matches!( - upload_handle.await?, - Err(UploadError::SequentialUploadPaymentError) - ); - let events = events_handle.await?; - - // UploadEvent::Error is performed by the caller of start_upload, so we can't check that one here. - assert_eq!(events.len(), 0); - Ok(()) -} - -/// 5. Register: Multiple errors during make payment should result in Error::SequentialUploadPaymentError -#[tokio::test] -async fn register_should_error_out_if_there_are_multiple_errors_during_make_payment() -> Result<()> -{ - let _log_guards = LogBuilder::init_single_threaded_tokio_test("uploader", true); - let (mut inner_uploader, task_result_rx) = get_inner_uploader()?; - let register_sk = SecretKey::random(); - - // cfg - inner_uploader.set_batch_size(1); - inner_uploader.set_payment_batch_size(1); - inner_uploader.insert_register(get_dummy_registers(1, ®ister_sk)); - - // the path to test - let steps = vec![ - TestSteps::GetRegisterErr, - TestSteps::GetStoreCostOk { - trigger_zero_cost: false, - assert_select_different_payee: false, - }, - TestSteps::MakePaymentErr, - TestSteps::MakePaymentErr, - ]; - - let (upload_handle, events_handle) = start_uploading_with_steps( - inner_uploader, - VecDeque::from(steps), - register_sk, - task_result_rx, - ); - - assert_matches!( - upload_handle.await?, - Err(UploadError::SequentialUploadPaymentError) - ); - let events = events_handle.await?; - - // UploadEvent::Error is performed by the caller of start_upload, so we can't check that one here. - assert_eq!(events.len(), 0); - Ok(()) -} - -// 6: Chunks + Registers: if the number of repayments exceed a threshold, it should return MaximumRepaymentsReached error. -#[tokio::test] -async fn maximum_repayment_error_should_be_triggered_during_get_store_cost() -> Result<()> { - let _log_guards = LogBuilder::init_single_threaded_tokio_test("uploader", true); - let temp_dir = tempdir()?; - let (mut inner_uploader, task_result_rx) = get_inner_uploader()?; - - // cfg - inner_uploader.set_batch_size(1); - inner_uploader.set_payment_batch_size(1); - inner_uploader.insert_chunk_paths(get_dummy_chunk_paths(1, temp_dir.path().to_path_buf())); - - // the path to test - let steps = vec![ - // initial payment done - TestSteps::GetStoreCostOk { - trigger_zero_cost: false, - assert_select_different_payee: false, - }, - TestSteps::MakePaymentOk, - TestSteps::UploadItemErr { io_error: false }, - TestSteps::UploadItemErr { io_error: false }, - // first repayment - TestSteps::GetStoreCostOk { - trigger_zero_cost: false, - assert_select_different_payee: true, - }, - TestSteps::MakePaymentOk, - TestSteps::UploadItemErr { io_error: false }, - TestSteps::UploadItemErr { io_error: false }, - // thus after reaching max repayments, we should error out during get store cost. - TestSteps::GetStoreCostErr { - assert_select_different_payee: true, - }, - ]; - - let (upload_handle, events_handle) = start_uploading_with_steps( - inner_uploader, - VecDeque::from(steps), - SecretKey::random(), - task_result_rx, - ); - - assert_matches!( - upload_handle.await?, - Err(UploadError::MaximumRepaymentsReached { .. }) - ); - let events = events_handle.await?; - - assert_eq!(events.len(), 2); - assert_matches!(events[0], UploadEvent::PaymentMade { .. }); - assert_matches!(events[1], UploadEvent::PaymentMade { .. }); - Ok(()) -} - -// 7. if we get io error during upload, then the entire upload should error out. -#[tokio::test] -async fn io_error_during_upload_should_stop_the_uploads() -> Result<()> { - let _log_guards = LogBuilder::init_single_threaded_tokio_test("uploader", true); - let temp_dir = tempdir()?; - let (mut inner_uploader, task_result_rx) = get_inner_uploader()?; - - // cfg - inner_uploader.set_batch_size(1); - inner_uploader.set_payment_batch_size(1); - inner_uploader.insert_chunk_paths(get_dummy_chunk_paths(1, temp_dir.path().to_path_buf())); - - // the path to test - let steps = vec![ - TestSteps::GetStoreCostOk { - trigger_zero_cost: false, - assert_select_different_payee: false, - }, - TestSteps::MakePaymentOk, - TestSteps::UploadItemErr { io_error: true }, - ]; - - let (upload_handle, events_handle) = start_uploading_with_steps( - inner_uploader, - VecDeque::from(steps), - SecretKey::random(), - task_result_rx, - ); - - assert_matches!(upload_handle.await?, Err(UploadError::Io { .. })); - let events = events_handle.await?; - - assert_eq!(events.len(), 1); - assert_matches!(events[0], UploadEvent::PaymentMade { .. }); - Ok(()) -} diff --git a/autonomi/src/uploader/tests/setup.rs b/autonomi/src/uploader/tests/setup.rs deleted file mode 100644 index 98fe2128a7..0000000000 --- a/autonomi/src/uploader/tests/setup.rs +++ /dev/null @@ -1,480 +0,0 @@ -// Copyright 2024 MaidSafe.net limited. -// -// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. -// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed -// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. Please review the Licences for the specific language governing -// permissions and limitations relating to use of the SAFE Network Software. - -use crate::{ - client::registers::Register, - uploader::{ - upload::{start_upload, InnerUploader}, - GetStoreCostStrategy, TaskResult, UploadError, UploadEvent, UploadItem, UploadSummary, - UploaderInterface, - }, - Client, -}; -use alloy::{primitives::TxHash, signers::local::PrivateKeySigner}; -use assert_matches::assert_matches; -use bls::SecretKey as BlsSecretKey; -use eyre::Result; -use libp2p::{identity::Keypair, PeerId}; -use rand::thread_rng; -use sn_evm::{EvmNetwork, EvmWallet, PaymentQuote, ProofOfPayment}; -use sn_networking::{NetworkBuilder, PayeeQuote}; -use sn_protocol::{storage::RetryStrategy, NetworkAddress}; -use sn_registers::{RegisterAddress, SignedRegister}; -use std::{ - collections::{BTreeMap, HashMap, VecDeque}, - path::PathBuf, - sync::Arc, -}; -use tokio::{runtime::Handle, sync::mpsc, task::JoinHandle}; -use xor_name::XorName; - -struct TestUploader { - inner: Option, - test_steps: VecDeque, - task_result_sender: mpsc::Sender, - - // test states - make_payment_collector: Vec<(XorName, Box)>, - payments_made_per_xorname: BTreeMap, - payment_batch_size: usize, - register_sk: BlsSecretKey, -} - -impl UploaderInterface for TestUploader { - fn take_inner_uploader(&mut self) -> InnerUploader { - self.inner.take().unwrap() - } - - fn submit_get_register_task( - &mut self, - _client: Client, - reg_addr: RegisterAddress, - _task_result_sender: mpsc::Sender, - ) { - let xorname = reg_addr.xorname(); - let step = self - .test_steps - .pop_front() - .expect("TestSteps are empty. Expected a GetRegister step."); - let handle = Handle::current(); - let register_sk = self.register_sk.clone(); - let task_result_sender = self.task_result_sender.clone(); - - println!("spawn_get_register called for: {xorname:?}. Step to execute: {step:?}"); - info!("TEST: spawn_get_register called for: {xorname:?}. Step to execute: {step:?}"); - match step { - TestSteps::GetRegisterOk => { - handle.spawn(async move { - let remote_register = - SignedRegister::test_new_from_address(reg_addr, ®ister_sk); - let remote_register = Register::test_new_from_register(remote_register); - task_result_sender - .send(TaskResult::GetRegisterFromNetworkOk { remote_register }) - .await - .expect("Failed to send task result"); - }); - } - TestSteps::GetRegisterErr => { - handle.spawn(async move { - task_result_sender - .send(TaskResult::GetRegisterFromNetworkErr(xorname)) - .await - .expect("Failed to send task result"); - }); - } - con => panic!("Test failed: Expected GetRegister step. Got: {con:?}"), - } - } - - fn submit_push_register_task( - &mut self, - _client: Client, - upload_item: UploadItem, - _verify_store: bool, - _task_result_sender: mpsc::Sender, - ) { - let xorname = upload_item.xorname(); - let step = self - .test_steps - .pop_front() - .expect("TestSteps are empty. Expected a PushRegister step."); - let handle = Handle::current(); - let task_result_sender = self.task_result_sender.clone(); - - println!("spawn_push_register called for: {xorname:?}. Step to execute: {step:?}"); - info!("TEST: spawn_push_register called for: {xorname:?}. Step to execute: {step:?}"); - match step { - TestSteps::PushRegisterOk => { - handle.spawn(async move { - let updated_register = match upload_item { - UploadItem::Register { reg, .. } => reg, - _ => panic!("Expected UploadItem::Register"), - }; - task_result_sender - .send(TaskResult::PushRegisterOk { - // this register is just used for returning. - updated_register, - }) - .await - .expect("Failed to send task result"); - }); - } - TestSteps::PushRegisterErr => { - handle.spawn(async move { - task_result_sender - .send(TaskResult::PushRegisterErr(xorname)) - .await - .expect("Failed to send task result"); - }); - } - con => panic!("Test failed: Expected PushRegister step. Got: {con:?}"), - } - } - - fn submit_get_store_cost_task( - &mut self, - _client: Client, - xorname: XorName, - _address: NetworkAddress, - _previous_payments: Option<&Vec>, - get_store_cost_strategy: GetStoreCostStrategy, - max_repayments_for_failed_data: usize, - _task_result_sender: mpsc::Sender, - ) { - let step = self - .test_steps - .pop_front() - .expect("TestSteps are empty. Expected a GetStoreCost step."); - let handle = Handle::current(); - let task_result_sender = self.task_result_sender.clone(); - - println!("spawn_get_store_cost called for: {xorname:?}. Step to execute: {step:?}"); - info!("TEST: spawn_get_store_cost called for: {xorname:?}. Step to execute: {step:?}"); - - let has_max_payments_reached_closure = - |get_store_cost_strategy: &GetStoreCostStrategy| -> bool { - match get_store_cost_strategy { - GetStoreCostStrategy::SelectDifferentPayee => { - if let Some(n_payments) = self.payments_made_per_xorname.get(&xorname) { - InnerUploader::have_we_reached_max_repayments( - *n_payments, - max_repayments_for_failed_data, - ) - } else { - false - } - } - _ => false, - } - }; - - // if select different payee, then it can possibly error out if max_repayments have been reached. - // then the step should've been a GetStoreCostErr. - if has_max_payments_reached_closure(&get_store_cost_strategy) { - assert_matches!(step, TestSteps::GetStoreCostErr { .. }, "Max repayments have been reached, so we expect a GetStoreCostErr, not GetStoreCostOk"); - } - - match step { - TestSteps::GetStoreCostOk { - trigger_zero_cost, - assert_select_different_payee, - } => { - // Make sure that the received strategy is the one defined in the step. - assert!(match get_store_cost_strategy { - // match here to not miss out on any new strategies. - GetStoreCostStrategy::Cheapest => !assert_select_different_payee, - GetStoreCostStrategy::SelectDifferentPayee { .. } => - assert_select_different_payee, - }); - - let mut quote = PaymentQuote::zero(); - if !trigger_zero_cost { - quote.cost = 1.into(); - } - handle.spawn(async move { - task_result_sender - .send(TaskResult::GetStoreCostOk { - xorname, - quote: Box::new(( - PeerId::random(), - PrivateKeySigner::random().address(), - quote, - )), - }) - .await - .expect("Failed to send task result"); - }); - } - TestSteps::GetStoreCostErr { - assert_select_different_payee, - } => { - // Make sure that the received strategy is the one defined in the step. - assert!(match get_store_cost_strategy { - // match here to not miss out on any new strategies. - GetStoreCostStrategy::Cheapest => !assert_select_different_payee, - GetStoreCostStrategy::SelectDifferentPayee { .. } => - assert_select_different_payee, - }); - let max_repayments_reached = - has_max_payments_reached_closure(&get_store_cost_strategy); - - handle.spawn(async move { - task_result_sender - .send(TaskResult::GetStoreCostErr { - xorname, - get_store_cost_strategy, - max_repayments_reached, - }) - .await - .expect("Failed to send task result"); - }); - } - con => panic!("Test failed: Expected GetStoreCost step. Got: {con:?}"), - } - } - - fn submit_make_payment_task( - &mut self, - to_send: Option<(UploadItem, Box)>, - _make_payment_sender: mpsc::Sender)>>, - ) { - let step = self - .test_steps - .pop_front() - .expect("TestSteps are empty. Expected a MakePayment step."); - let handle = Handle::current(); - let task_result_sender = self.task_result_sender.clone(); - match &to_send { - Some((upload_item, quote)) => { - let xorname = upload_item.xorname(); - println!("spawn_make_payment called for: {xorname:?}. Step to execute: {step:?}"); - info!( - "TEST: spawn_make_payment called for: {xorname:?}. Step to execute: {step:?}" - ); - - self.make_payment_collector - .push((upload_item.xorname(), quote.clone())); - } - None => { - println!( - "spawn_make_payment called with force make payment. Step to execute: {step:?}" - ); - info!("TEST: spawn_make_payment called with force make payment. Step to execute: {step:?}"); - } - } - - // gotta collect batch size before sending task result. - let _make_payment = self.make_payment_collector.len() >= self.payment_batch_size - || (to_send.is_none() && !self.make_payment_collector.is_empty()); - - match step { - // TestSteps::MakePaymentJustCollectItem => { - // // The test expected for us to just collect item, but if the logic wants us to make payment, then it as - // // error - // assert!(!make_payment); - // } - TestSteps::MakePaymentOk => { - let payment_proofs = std::mem::take(&mut self.make_payment_collector) - .into_iter() - .map(|(xorname, _)| { - ( - xorname, - ProofOfPayment { - quote: PaymentQuote::zero(), - tx_hash: TxHash::repeat_byte(0), - }, - ) - }) - .collect::>(); - // track the payments per xorname - for xorname in payment_proofs.keys() { - let entry = self.payments_made_per_xorname.entry(*xorname).or_insert(0); - *entry += 1; - } - - handle.spawn(async move { - task_result_sender - .send(TaskResult::MakePaymentsOk { payment_proofs }) - .await - .expect("Failed to send task result"); - }); - } - TestSteps::MakePaymentErr => { - let failed_xornames = std::mem::take(&mut self.make_payment_collector); - - handle.spawn(async move { - task_result_sender - .send(TaskResult::MakePaymentsErr { failed_xornames }) - .await - .expect("Failed to send task result"); - }); - } - con => panic!("Test failed: Expected MakePayment step. Got: {con:?}"), - } - } - - fn submit_upload_item_task( - &mut self, - upload_item: UploadItem, - _client: Client, - _previous_payments: Option<&Vec>, - _verify_store: bool, - _retry_strategy: RetryStrategy, - _task_result_sender: mpsc::Sender, - ) { - let xorname = upload_item.xorname(); - let step = self - .test_steps - .pop_front() - .expect("TestSteps are empty. Expected a UploadItem step."); - let handle = Handle::current(); - let task_result_sender = self.task_result_sender.clone(); - - println!("spawn_upload_item called for: {xorname:?}. Step to execute: {step:?}"); - info!("TEST: spawn_upload_item called for: {xorname:?}. Step to execute: {step:?}"); - match step { - TestSteps::UploadItemOk => { - handle.spawn(async move { - task_result_sender - .send(TaskResult::UploadOk(xorname)) - .await - .expect("Failed to send task result"); - }); - } - TestSteps::UploadItemErr { io_error } => { - handle.spawn(async move { - let io_error = if io_error { - Some(Box::new(std::io::Error::new( - std::io::ErrorKind::Other, - "Test IO Error", - ))) - } else { - None - }; - task_result_sender - .send(TaskResult::UploadErr { xorname, io_error }) - .await - .expect("Failed to send task result"); - }); - } - con => panic!("Test failed: Expected UploadItem step. Got: {con:?}"), - } - } -} - -#[derive(Debug, Clone)] -pub enum TestSteps { - GetRegisterOk, - GetRegisterErr, - PushRegisterOk, - PushRegisterErr, - GetStoreCostOk { - trigger_zero_cost: bool, - assert_select_different_payee: bool, - }, - GetStoreCostErr { - assert_select_different_payee: bool, - }, - // MakePaymentJustCollectItem, - MakePaymentOk, - MakePaymentErr, - UploadItemOk, - UploadItemErr { - io_error: bool, - }, -} - -pub fn get_inner_uploader() -> Result<(InnerUploader, mpsc::Sender)> { - let client = build_unconnected_client()?; - - let mut inner = InnerUploader::new( - client, - EvmWallet::new_with_random_wallet(EvmNetwork::new_custom( - "http://localhost:63319/", - "0x5FbDB2315678afecb367f032d93F642f64180aa3", - "0x8464135c8F25Da09e49BC8782676a84730C318bC", - )), - ); - let (task_result_sender, task_result_receiver) = mpsc::channel(100); - inner.testing_task_channels = Some((task_result_sender.clone(), task_result_receiver)); - - Ok((inner, task_result_sender)) -} - -// Spawns two tasks. One is the actual upload task that will return an UploadStat when completed. -// The other is a one to collect all the UploadEvent emitted by the previous task. -pub fn start_uploading_with_steps( - mut inner_uploader: InnerUploader, - test_steps: VecDeque, - register_sk: BlsSecretKey, - task_result_sender: mpsc::Sender, -) -> ( - JoinHandle>, - JoinHandle>, -) { - let payment_batch_size = inner_uploader.cfg.payment_batch_size; - let mut upload_event_rx = inner_uploader.get_event_receiver(); - - let upload_handle = tokio::spawn(start_upload(Box::new(TestUploader { - inner: Some(inner_uploader), - test_steps, - task_result_sender, - make_payment_collector: Default::default(), - payments_made_per_xorname: Default::default(), - payment_batch_size, - register_sk, - }))); - - let event_handle = tokio::spawn(async move { - let mut events = vec![]; - while let Some(event) = upload_event_rx.recv().await { - events.push(event); - } - events - }); - - (upload_handle, event_handle) -} - -// Collect all the upload events into a list - -// Build a very simple client struct for testing. This does not connect to any network. -// The UploaderInterface eliminates the need for direct networking in tests. -pub fn build_unconnected_client() -> Result { - let network_builder = NetworkBuilder::new(Keypair::generate_ed25519(), true); - let (network, ..) = network_builder.build_client()?; - let client = Client { - network, - client_event_sender: Arc::new(None), - }; - Ok(client) -} - -// We don't perform any networking, so the paths can be dummy ones. -pub fn get_dummy_chunk_paths(num: usize, temp_dir: PathBuf) -> Vec<(XorName, PathBuf)> { - let mut rng = thread_rng(); - let mut chunks = Vec::with_capacity(num); - for _ in 0..num { - chunks.push((XorName::random(&mut rng), temp_dir.clone())); - } - chunks -} - -pub fn get_dummy_registers(num: usize, register_sk: &BlsSecretKey) -> Vec { - let mut rng = thread_rng(); - let mut registers = Vec::with_capacity(num); - for _ in 0..num { - // test_new_from_address that is used during get_register, - // uses AnyoneCanWrite permission, so use the same here - let address = RegisterAddress::new(XorName::random(&mut rng), register_sk.public_key()); - let base_register = SignedRegister::test_new_from_address(address, register_sk); - let register = Register::test_new_from_register(base_register); - registers.push(register); - } - registers -} diff --git a/autonomi/src/uploader/upload.rs b/autonomi/src/uploader/upload.rs deleted file mode 100644 index dac748549c..0000000000 --- a/autonomi/src/uploader/upload.rs +++ /dev/null @@ -1,1232 +0,0 @@ -// Copyright 2024 MaidSafe.net limited. -// -// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. -// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed -// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. Please review the Licences for the specific language governing -// permissions and limitations relating to use of the SAFE Network Software. - -use super::{ - GetStoreCostStrategy, TaskResult, UploadCfg, UploadEvent, UploadItem, UploadSummary, Uploader, - UploaderInterface, PAYMENT_BATCH_SIZE, -}; -#[cfg(feature = "registers")] -use crate::client::registers::Register; -use crate::{uploader::UploadError, utils::payment_proof_from_quotes_and_payments, Client}; -use bytes::Bytes; -use itertools::Either; -use libp2p::{kad::Quorum, PeerId}; -use rand::{thread_rng, Rng}; -use sn_evm::{Amount, EvmWallet, ProofOfPayment}; -use sn_networking::target_arch::{mpsc, mpsc_channel, mpsc_recv, spawn}; -use sn_networking::{GetRecordCfg, PayeeQuote, PutRecordCfg, VerificationKind}; -#[cfg(feature = "data")] -use sn_protocol::{messages::ChunkProof, storage::Chunk}; -use sn_protocol::{storage::RetryStrategy, NetworkAddress}; -#[cfg(feature = "registers")] -use sn_registers::RegisterAddress; -use std::{ - collections::{HashMap, HashSet}, - num::NonZero, -}; -use xor_name::XorName; - -/// The maximum number of sequential payment failures before aborting the upload process. -#[cfg(not(test))] -const MAX_SEQUENTIAL_PAYMENT_FAILS: usize = 3; -#[cfg(test)] -const MAX_SEQUENTIAL_PAYMENT_FAILS: usize = 1; - -/// The maximum number of sequential network failures before aborting the upload process. -// todo: use uploader.retry_strategy.get_count() instead. -#[cfg(not(test))] -const MAX_SEQUENTIAL_NETWORK_ERRORS: usize = 32; -#[cfg(test)] -const MAX_SEQUENTIAL_NETWORK_ERRORS: usize = 1; - -/// The number of upload failures for a single data item before -#[cfg(not(test))] -const UPLOAD_FAILURES_BEFORE_SELECTING_DIFFERENT_PAYEE: usize = 3; -#[cfg(test)] -const UPLOAD_FAILURES_BEFORE_SELECTING_DIFFERENT_PAYEE: usize = 1; - -type Result = std::result::Result; - -// TODO: -// 1. track each batch with an id -// 2. create a irrecoverable error type, so we can bail on io/serialization etc. -// 3. separate cfgs/retries for register/chunk etc -// 4. log whenever we insert/remove items. i.e., don't ignore values with `let _` - -/// The main loop that performs the upload process. -/// An interface is passed here for easy testing. -pub(super) async fn start_upload( - mut interface: Box, -) -> Result { - let mut uploader = interface.take_inner_uploader(); - - uploader.validate_upload_cfg()?; - - // Take out the testing task senders if any. This is only set for tests. - let (task_result_sender, mut task_result_receiver) = - if let Some(channels) = uploader.testing_task_channels.take() { - channels - } else { - // 6 because of the 6 pipelines, 1 for redundancy. - mpsc_channel(uploader.cfg.batch_size * 6 + 1) - }; - let (make_payment_sender, make_payment_receiver) = mpsc_channel(uploader.cfg.batch_size); - - uploader.start_payment_processing_thread( - make_payment_receiver, - task_result_sender.clone(), - uploader.cfg.payment_batch_size, - )?; - - // chunks can be pushed to pending_get_store_cost directly - #[cfg(feature = "data")] - { - uploader.pending_to_get_store_cost = uploader - .all_upload_items - .iter() - .filter_map(|(xorname, item)| { - if let UploadItem::Chunk { .. } = item { - Some((*xorname, GetStoreCostStrategy::Cheapest)) - } else { - None - } - }) - .collect(); - } - - // registers have to be verified + merged with remote replica, so we have to fetch it first. - #[cfg(feature = "registers")] - { - uploader.pending_to_get_register = uploader - .all_upload_items - .iter() - .filter_map(|(_xorname, item)| { - if let UploadItem::Register { address, .. } = item { - Some(*address) - } else { - None - } - }) - .collect(); - } - - loop { - // Break if we have uploaded all the items. - // The loop also breaks if we fail to get_store_cost / make payment / upload for n consecutive times. - if uploader.all_upload_items.is_empty() { - debug!("Upload items are empty, exiting main upload loop."); - - // To avoid empty final_balance when all items are skipped. Skip for tests. - #[cfg(not(test))] - { - uploader.upload_final_balance = uploader - .wallet - .balance_of_tokens() - .await - .inspect_err(|err| { - error!("Failed to get wallet balance: {err:?}"); - })?; - } - - debug!("UPLOADER STATE: finished uploading all items {uploader:?}"); - let summary = UploadSummary { - storage_cost: uploader.tokens_spent, - final_balance: uploader.upload_final_balance, - uploaded_addresses: uploader.uploaded_addresses, - uploaded_count: uploader.uploaded_count, - skipped_count: uploader.skipped_count, - uploaded_registers: uploader.uploaded_registers, - }; - - if !uploader.max_repayments_reached.is_empty() { - error!( - "The maximum repayments were reached for these addresses: {:?}", - uploader.max_repayments_reached - ); - return Err(UploadError::MaximumRepaymentsReached { - items: uploader.max_repayments_reached.into_iter().collect(), - }); - } - - return Ok(summary); - } - - #[cfg(feature = "registers")] - { - // try to GET register if we have enough buffer. - // The results of the get & push register steps are used to fill up `pending_to_get_store` cost - // Since the get store cost list is the init state, we don't have to check if it is not full. - while !uploader.pending_to_get_register.is_empty() - && uploader.on_going_get_register.len() < uploader.cfg.batch_size - { - if let Some(reg_addr) = uploader.pending_to_get_register.pop() { - trace!("Conditions met for GET registers {:?}", reg_addr.xorname()); - let _ = uploader.on_going_get_register.insert(reg_addr.xorname()); - interface.submit_get_register_task( - uploader.client.clone(), - reg_addr, - task_result_sender.clone(), - ); - } - } - - // try to push register if we have enough buffer. - // No other checks for the same reason as the above step. - - while !uploader.pending_to_push_register.is_empty() - && uploader.on_going_get_register.len() < uploader.cfg.batch_size - { - let upload_item = uploader.pop_item_for_push_register()?; - trace!( - "Conditions met for push registers {:?}", - upload_item.xorname() - ); - let _ = uploader - .on_going_push_register - .insert(upload_item.xorname()); - interface.submit_push_register_task( - uploader.client.clone(), - upload_item, - uploader.cfg.verify_store, - task_result_sender.clone(), - ); - } - } - - // try to get store cost for an item if pending_to_pay needs items & if we have enough buffer. - while !uploader.pending_to_get_store_cost.is_empty() - && uploader.on_going_get_cost.len() < uploader.cfg.batch_size - && uploader.pending_to_pay.len() < uploader.cfg.payment_batch_size - { - let (xorname, address, get_store_cost_strategy) = - uploader.pop_item_for_get_store_cost()?; - trace!("Conditions met for get store cost. {xorname:?} {get_store_cost_strategy:?}",); - - let _ = uploader.on_going_get_cost.insert(xorname); - interface.submit_get_store_cost_task( - uploader.client.clone(), - xorname, - address, - uploader.payment_proofs.get(&xorname), - get_store_cost_strategy, - uploader.cfg.max_repayments_for_failed_data, - task_result_sender.clone(), - ); - } - - // try to make payment for an item if pending_to_upload needs items & if we have enough buffer. - while !uploader.pending_to_pay.is_empty() - && uploader.on_going_payments.len() < uploader.cfg.payment_batch_size - && uploader.pending_to_upload.len() < uploader.cfg.batch_size - { - let (upload_item, quote) = uploader.pop_item_for_make_payment()?; - trace!( - "Conditions met for making payments. {:?} {quote:?}", - upload_item.xorname() - ); - let _ = uploader.on_going_payments.insert(upload_item.xorname()); - - interface - .submit_make_payment_task(Some((upload_item, quote)), make_payment_sender.clone()); - } - - // try to upload if we have enough buffer to upload. - while !uploader.pending_to_upload.is_empty() - && uploader.on_going_uploads.len() < uploader.cfg.batch_size - { - #[cfg(test)] - trace!("UPLOADER STATE: upload_item : {uploader:?}"); - let upload_item = uploader.pop_item_for_upload_item()?; - let xorname = upload_item.xorname(); - - trace!("Conditions met for uploading. {xorname:?}"); - let _ = uploader.on_going_uploads.insert(xorname); - interface.submit_upload_item_task( - upload_item, - uploader.client.clone(), - uploader.payment_proofs.get(&xorname), - uploader.cfg.verify_store, - uploader.cfg.retry_strategy, - task_result_sender.clone(), - ); - } - - // Fire None to trigger a forced round of making leftover payments, if there are not enough store cost tasks - // to fill up the buffer. - if uploader.pending_to_get_store_cost.is_empty() - && uploader.on_going_get_cost.is_empty() - && !uploader.on_going_payments.is_empty() - && uploader.on_going_payments.len() < uploader.cfg.payment_batch_size - { - #[cfg(test)] - trace!("UPLOADER STATE: make_payment (forced): {uploader:?}"); - - debug!("There are not enough on going payments to trigger a batch Payment and no get_store_costs to fill the batch. Triggering forced round of payment"); - interface.submit_make_payment_task(None, make_payment_sender.clone()); - } - - #[cfg(test)] - trace!("UPLOADER STATE: before await task result: {uploader:?}"); - - trace!("Fetching task result"); - let task_result = mpsc_recv(&mut task_result_receiver) - .await - .ok_or(UploadError::InternalError)?; - trace!("Received task result: {task_result:?}"); - match task_result { - #[cfg(feature = "registers")] - TaskResult::GetRegisterFromNetworkOk { remote_register } => { - // if we got back the register, then merge & PUT it. - let xorname = remote_register.address().xorname(); - trace!("TaskResult::GetRegisterFromNetworkOk for remote register: {xorname:?} \n{remote_register:?}"); - let _ = uploader.on_going_get_register.remove(&xorname); - - let reg = uploader.all_upload_items.get_mut(&xorname).ok_or_else(|| { - error!("Register {xorname:?} not found in all_upload_items."); - UploadError::InternalError - })?; - if let UploadItem::Register { reg, .. } = reg { - reg.merge(&remote_register).inspect_err(|err| { - error!("Uploader failed to merge remote register: {err:?}"); - })?; - uploader.pending_to_push_register.push(xorname); - } - } - #[cfg(feature = "registers")] - TaskResult::GetRegisterFromNetworkErr(xorname) => { - // then the register is a new one. It can follow the same flow as chunks now. - let _ = uploader.on_going_get_register.remove(&xorname); - - uploader - .pending_to_get_store_cost - .push((xorname, GetStoreCostStrategy::Cheapest)); - } - #[cfg(feature = "registers")] - TaskResult::PushRegisterOk { updated_register } => { - // push modifies the register, so we return this instead of the one from all_upload_items - let xorname = updated_register.address().xorname(); - let _ = uploader.on_going_push_register.remove(&xorname); - uploader.skipped_count += 1; - let _ = uploader - .uploaded_addresses - .insert(NetworkAddress::from_register_address( - *updated_register.address(), - )); - - let _old_register = - uploader.all_upload_items.remove(&xorname).ok_or_else(|| { - error!("Register {xorname:?} not found in all_upload_items"); - UploadError::InternalError - })?; - - if uploader.cfg.collect_registers { - let _ = uploader - .uploaded_registers - .insert(*updated_register.address(), updated_register.clone()); - } - uploader.emit_upload_event(UploadEvent::RegisterUpdated(updated_register)); - } - #[cfg(feature = "registers")] - TaskResult::PushRegisterErr(xorname) => { - // the register failed to be Pushed. Retry until failure. - let _ = uploader.on_going_push_register.remove(&xorname); - uploader.pending_to_push_register.push(xorname); - - uploader.push_register_errors += 1; - if uploader.push_register_errors > MAX_SEQUENTIAL_NETWORK_ERRORS { - error!("Max sequential network failures reached during PushRegisterErr."); - return Err(UploadError::SequentialNetworkErrors); - } - } - TaskResult::GetStoreCostOk { xorname, quote } => { - let _ = uploader.on_going_get_cost.remove(&xorname); - uploader.get_store_cost_errors = 0; // reset error if Ok. We only throw error after 'n' sequential errors - - trace!("GetStoreCostOk for {xorname:?}'s store_cost {:?}", quote.2); - - if !quote.2.cost.is_zero() { - uploader.pending_to_pay.push((xorname, quote)); - } - // if cost is 0, then it already in the network. - else { - // remove the item since we have uploaded it. - let removed_item = - uploader.all_upload_items.remove(&xorname).ok_or_else(|| { - error!("Uploadable item not found in all_upload_items: {xorname:?}"); - UploadError::InternalError - })?; - let _ = uploader.uploaded_addresses.insert(removed_item.address()); - trace!("{xorname:?} has store cost of 0 and it already exists on the network"); - uploader.skipped_count += 1; - - // if during the first try we skip the item, then it is already present in the network. - match removed_item { - #[cfg(feature = "data")] - UploadItem::Chunk { address, .. } => { - uploader.emit_upload_event(UploadEvent::ChunkAlreadyExistsInNetwork( - address, - )); - } - #[cfg(feature = "registers")] - UploadItem::Register { reg, .. } => { - if uploader.cfg.collect_registers { - let _ = uploader - .uploaded_registers - .insert(*reg.address(), reg.clone()); - } - uploader.emit_upload_event(UploadEvent::RegisterUpdated(reg)); - } - } - } - } - TaskResult::GetStoreCostErr { - xorname, - get_store_cost_strategy, - max_repayments_reached, - } => { - let _ = uploader.on_going_get_cost.remove(&xorname); - trace!("GetStoreCostErr for {xorname:?} , get_store_cost_strategy: {get_store_cost_strategy:?}, max_repayments_reached: {max_repayments_reached:?}"); - - // If max repayments reached, track it separately. Else retry get_store_cost. - if max_repayments_reached { - error!("Max repayments reached for {xorname:?}. Skipping upload for it"); - uploader.max_repayments_reached.insert(xorname); - uploader.all_upload_items.remove(&xorname); - } else { - // use the same strategy. The repay different payee is set only if upload fails. - uploader - .pending_to_get_store_cost - .push((xorname, get_store_cost_strategy.clone())); - } - uploader.get_store_cost_errors += 1; - if uploader.get_store_cost_errors > MAX_SEQUENTIAL_NETWORK_ERRORS { - error!("Max sequential network failures reached during GetStoreCostErr."); - return Err(UploadError::SequentialNetworkErrors); - } - } - TaskResult::MakePaymentsOk { payment_proofs } => { - let tokens_spent = payment_proofs - .values() - .map(|proof| proof.quote.cost.as_atto()) - .try_fold(Amount::from(0), |acc, cost| acc.checked_add(cost)) - .ok_or_else(|| { - error!("Overflow when summing up tokens spent"); - UploadError::InternalError - })?; - trace!( - "MakePaymentsOk for {} items, with {tokens_spent:?} tokens.", - payment_proofs.len(), - ); - for xorname in payment_proofs.keys() { - let _ = uploader.on_going_payments.remove(xorname); - } - uploader - .pending_to_upload - .extend(payment_proofs.keys().cloned()); - for (xorname, proof) in payment_proofs { - if let Some(payments) = uploader.payment_proofs.get_mut(&xorname) { - payments.push(proof) - } else { - uploader.payment_proofs.insert(xorname, vec![proof]); - } - } - // reset sequential payment fail error if ok. We throw error if payment fails continuously more than - // MAX_SEQUENTIAL_PAYMENT_FAILS errors. - uploader.make_payments_errors = 0; - uploader.tokens_spent = uploader - .tokens_spent - .checked_add(tokens_spent) - .ok_or_else(|| { - error!("Overflow when summing up tokens spent for summary."); - UploadError::InternalError - })?; - - uploader.emit_upload_event(UploadEvent::PaymentMade { tokens_spent }); - } - TaskResult::MakePaymentsErr { failed_xornames } => { - trace!("MakePaymentsErr for {:?} items", failed_xornames.len()); - // TODO: handle insufficient balance error - - for (xorname, quote) in failed_xornames { - let _ = uploader.on_going_payments.remove(&xorname); - uploader.pending_to_pay.push((xorname, quote)); - } - uploader.make_payments_errors += 1; - - if uploader.make_payments_errors >= MAX_SEQUENTIAL_PAYMENT_FAILS { - error!("Max sequential upload failures reached during MakePaymentsErr."); - // Too many sequential overall payment failure indicating - // unrecoverable failure of spend tx continuously rejected by network. - // The entire upload process shall be terminated. - return Err(UploadError::SequentialUploadPaymentError); - } - } - TaskResult::UploadOk(xorname) => { - let _ = uploader.on_going_uploads.remove(&xorname); - uploader.uploaded_count += 1; - trace!("UploadOk for {xorname:?}"); - // remove the previous payments - uploader.payment_proofs.remove(&xorname); - // remove the item since we have uploaded it. - let removed_item = uploader.all_upload_items.remove(&xorname).ok_or_else(|| { - error!("Uploadable item not found in all_upload_items: {xorname:?}"); - UploadError::InternalError - })?; - let _ = uploader.uploaded_addresses.insert(removed_item.address()); - - match removed_item { - #[cfg(feature = "data")] - UploadItem::Chunk { address, .. } => { - uploader.emit_upload_event(UploadEvent::ChunkUploaded(address)); - } - #[cfg(feature = "registers")] - UploadItem::Register { reg, .. } => { - if uploader.cfg.collect_registers { - let _ = uploader - .uploaded_registers - .insert(*reg.address(), reg.clone()); - } - uploader.emit_upload_event(UploadEvent::RegisterUploaded(reg)); - } - } - } - TaskResult::UploadErr { xorname, io_error } => { - if let Some(io_error) = io_error { - error!( - "Upload failed for {xorname:?} with error: {io_error:?}. Stopping upload." - ); - return Err(UploadError::Io(*io_error)); - } - - let _ = uploader.on_going_uploads.remove(&xorname); - debug!("UploadErr for {xorname:?}. Keeping track of failure and trying again."); - - // keep track of the failure - let n_errors = uploader.n_errors_during_uploads.entry(xorname).or_insert(0); - *n_errors += 1; - - // if quote has expired, don't retry the upload again. Instead get the cheapest quote again. - if *n_errors > UPLOAD_FAILURES_BEFORE_SELECTING_DIFFERENT_PAYEE { - // if error > threshold, then select different payee. else retry again - // Also reset n_errors as we want to enable retries for the new payee. - *n_errors = 0; - debug!("Max error during upload reached for {xorname:?}. Selecting a different payee."); - - uploader - .pending_to_get_store_cost - .push((xorname, GetStoreCostStrategy::SelectDifferentPayee)); - } else { - uploader.pending_to_upload.push(xorname); - } - } - } - } -} - -impl UploaderInterface for Uploader { - fn take_inner_uploader(&mut self) -> InnerUploader { - self.inner - .take() - .expect("Uploader::new makes sure inner is present") - } - - fn submit_get_store_cost_task( - &mut self, - client: Client, - xorname: XorName, - address: NetworkAddress, - previous_payments: Option<&Vec>, - get_store_cost_strategy: GetStoreCostStrategy, - max_repayments_for_failed_data: usize, - task_result_sender: mpsc::Sender, - ) { - trace!("Spawning get_store_cost for {xorname:?}"); - let previous_payments_to = if let Some(previous_payments) = previous_payments { - let peer_ids = previous_payments - .iter() - .map(|payment_proof| { - payment_proof - .to_peer_id_payee() - .ok_or_else(|| { - error!("Invalid payment proof found, could not obtain peer_id {payment_proof:?}"); - UploadError::InternalError - }) - }) - .collect::>>(); - peer_ids - } else { - Ok(vec![]) - }; - - let _handle = spawn(async move { - let task_result = match InnerUploader::get_store_cost( - client, - xorname, - address, - get_store_cost_strategy.clone(), - previous_payments_to, - max_repayments_for_failed_data, - ) - .await - { - Ok(quote) => { - debug!("StoreCosts retrieved for {xorname:?} quote: {quote:?}"); - TaskResult::GetStoreCostOk { - xorname, - quote: Box::new(quote), - } - } - Err(err) => { - error!("Encountered error {err:?} when getting store_cost for {xorname:?}",); - - let max_repayments_reached = - matches!(&err, UploadError::MaximumRepaymentsReached { .. }); - - TaskResult::GetStoreCostErr { - xorname, - get_store_cost_strategy, - max_repayments_reached, - } - } - }; - - let _ = task_result_sender.send(task_result).await; - }); - } - - #[cfg(feature = "registers")] - fn submit_get_register_task( - &mut self, - client: Client, - reg_addr: RegisterAddress, - task_result_sender: mpsc::Sender, - ) { - let xorname = reg_addr.xorname(); - trace!("Spawning get_register for {xorname:?}"); - let _handle = spawn(async move { - let task_result = match InnerUploader::get_register(client, reg_addr).await { - Ok(register) => { - debug!("Register retrieved for {xorname:?}"); - TaskResult::GetRegisterFromNetworkOk { - remote_register: register, - } - } - Err(err) => { - // todo match on error to only skip if GetRecordError - warn!("Encountered error {err:?} during get_register. The register has to be PUT as it is a new one."); - TaskResult::GetRegisterFromNetworkErr(xorname) - } - }; - let _ = task_result_sender.send(task_result).await; - }); - } - - #[cfg(feature = "registers")] - fn submit_push_register_task( - &mut self, - client: Client, - upload_item: UploadItem, - verify_store: bool, - task_result_sender: mpsc::Sender, - ) { - let xorname = upload_item.xorname(); - trace!("Spawning push_register for {xorname:?}"); - let _handle = spawn(async move { - let task_result = match InnerUploader::push_register(client, upload_item, verify_store) - .await - { - Ok(reg) => { - debug!("Register pushed: {xorname:?}"); - TaskResult::PushRegisterOk { - updated_register: reg, - } - } - Err(err) => { - // todo match on error to only skip if GetRecordError - error!("Encountered error {err:?} during push_register. The register might not be present in the network"); - TaskResult::PushRegisterErr(xorname) - } - }; - let _ = task_result_sender.send(task_result).await; - }); - } - - fn submit_make_payment_task( - &mut self, - to_send: Option<(UploadItem, Box)>, - make_payment_sender: mpsc::Sender)>>, - ) { - let _handle = spawn(async move { - let _ = make_payment_sender.send(to_send).await; - }); - } - - fn submit_upload_item_task( - &mut self, - upload_item: UploadItem, - client: Client, - previous_payments: Option<&Vec>, - verify_store: bool, - retry_strategy: RetryStrategy, - task_result_sender: mpsc::Sender, - ) { - trace!("Spawning upload item task for {:?}", upload_item.xorname()); - - let last_payment = previous_payments.and_then(|payments| payments.last().cloned()); - - let _handle = spawn(async move { - let xorname = upload_item.xorname(); - let result = InnerUploader::upload_item( - client, - upload_item, - last_payment, - verify_store, - retry_strategy, - ) - .await; - - trace!("Upload item {xorname:?} uploaded with result {result:?}"); - match result { - Ok(_) => { - let _ = task_result_sender.send(TaskResult::UploadOk(xorname)).await; - } - Err(UploadError::Io(io_error)) => { - let _ = task_result_sender - .send(TaskResult::UploadErr { - xorname, - io_error: Some(Box::new(io_error)), - }) - .await; - } - Err(_) => { - let _ = task_result_sender - .send(TaskResult::UploadErr { - xorname, - io_error: None, - }) - .await; - } - }; - }); - } -} - -/// `Uploader` provides functionality for uploading both Chunks and Registers with support for retries and queuing. -/// This struct is not cloneable. To create a new instance with default configuration, use the `new` function. -/// To modify the configuration, use the provided setter methods (`set_...` functions). -#[derive(custom_debug::Debug)] -pub(super) struct InnerUploader { - pub(super) cfg: UploadCfg, - #[debug(skip)] - pub(super) client: Client, - #[debug(skip)] - pub(super) wallet: EvmWallet, - - // states - pub(super) all_upload_items: HashMap, - #[cfg(feature = "registers")] - pub(super) pending_to_get_register: Vec, - #[cfg(feature = "registers")] - pub(super) pending_to_push_register: Vec, - pub(super) pending_to_get_store_cost: Vec<(XorName, GetStoreCostStrategy)>, - pub(super) pending_to_pay: Vec<(XorName, Box)>, - pub(super) pending_to_upload: Vec, - pub(super) payment_proofs: HashMap>, - - // trackers - #[cfg(feature = "registers")] - pub(super) on_going_get_register: HashSet, - #[cfg(feature = "registers")] - pub(super) on_going_push_register: HashSet, - pub(super) on_going_get_cost: HashSet, - pub(super) on_going_payments: HashSet, - pub(super) on_going_uploads: HashSet, - - // error trackers - pub(super) n_errors_during_uploads: HashMap, - #[cfg(feature = "registers")] - pub(super) push_register_errors: usize, - pub(super) get_store_cost_errors: usize, - pub(super) make_payments_errors: usize, - - // Upload summary - pub(super) tokens_spent: Amount, - pub(super) upload_final_balance: Amount, - pub(super) max_repayments_reached: HashSet, - pub(super) uploaded_addresses: HashSet, - #[cfg(feature = "registers")] - pub(super) uploaded_registers: HashMap, - pub(super) uploaded_count: usize, - pub(super) skipped_count: usize, - - // Task channels for testing. Not used in actual code. - pub(super) testing_task_channels: - Option<(mpsc::Sender, mpsc::Receiver)>, - - // Public events events - #[debug(skip)] - pub(super) logged_event_sender_absence: bool, - #[debug(skip)] - pub(super) event_sender: Option>, -} - -impl InnerUploader { - pub(super) fn new(client: Client, wallet: EvmWallet) -> Self { - Self { - cfg: Default::default(), - client, - wallet, - - all_upload_items: Default::default(), - #[cfg(feature = "registers")] - pending_to_get_register: Default::default(), - #[cfg(feature = "registers")] - pending_to_push_register: Default::default(), - pending_to_get_store_cost: Default::default(), - pending_to_pay: Default::default(), - pending_to_upload: Default::default(), - payment_proofs: Default::default(), - - #[cfg(feature = "registers")] - on_going_get_register: Default::default(), - #[cfg(feature = "registers")] - on_going_push_register: Default::default(), - on_going_get_cost: Default::default(), - on_going_payments: Default::default(), - on_going_uploads: Default::default(), - - n_errors_during_uploads: Default::default(), - #[cfg(feature = "registers")] - push_register_errors: Default::default(), - get_store_cost_errors: Default::default(), - max_repayments_reached: Default::default(), - make_payments_errors: Default::default(), - - tokens_spent: Amount::from(0), - upload_final_balance: Amount::from(0), - uploaded_addresses: Default::default(), - #[cfg(feature = "registers")] - uploaded_registers: Default::default(), - uploaded_count: Default::default(), - skipped_count: Default::default(), - - testing_task_channels: None, - logged_event_sender_absence: Default::default(), - event_sender: Default::default(), - } - } - - // ====== Pop items ====== - - #[cfg(feature = "registers")] - fn pop_item_for_push_register(&mut self) -> Result { - if let Some(name) = self.pending_to_push_register.pop() { - let upload_item = self.all_upload_items.get(&name).cloned().ok_or_else(|| { - error!("Uploadable item not found in all_upload_items: {name:?}"); - UploadError::InternalError - })?; - Ok(upload_item) - } else { - // the caller will be making sure this does not happen. - error!("No item found for push register"); - Err(UploadError::InternalError) - } - } - - fn pop_item_for_get_store_cost( - &mut self, - ) -> Result<(XorName, NetworkAddress, GetStoreCostStrategy)> { - let (xorname, strategy) = self.pending_to_get_store_cost.pop().ok_or_else(|| { - error!("No item found for get store cost"); - UploadError::InternalError - })?; - let address = self - .all_upload_items - .get(&xorname) - .map(|item| item.address()) - .ok_or_else(|| { - error!("Uploadable item not found in all_upload_items: {xorname:?}"); - UploadError::InternalError - })?; - Ok((xorname, address, strategy)) - } - - fn pop_item_for_make_payment(&mut self) -> Result<(UploadItem, Box)> { - if let Some((name, quote)) = self.pending_to_pay.pop() { - let upload_item = self.all_upload_items.get(&name).cloned().ok_or_else(|| { - error!("Uploadable item not found in all_upload_items: {name:?}"); - UploadError::InternalError - })?; - Ok((upload_item, quote)) - } else { - // the caller will be making sure this does not happen. - error!("No item found for make payment"); - Err(UploadError::InternalError) - } - } - - fn pop_item_for_upload_item(&mut self) -> Result { - if let Some(name) = self.pending_to_upload.pop() { - let upload_item = self.all_upload_items.get(&name).cloned().ok_or_else(|| { - error!("Uploadable item not found in all_upload_items: {name:?}"); - UploadError::InternalError - })?; - Ok(upload_item) - } else { - // the caller will be making sure this does not happen. - error!("No item found for upload item"); - Err(UploadError::InternalError) - } - } - - // ====== Processing Loop ====== - - // This is spawned as a long running task to prevent us from reading the wallet files - // each time we have to make a payment. - fn start_payment_processing_thread( - &self, - mut make_payment_receiver: mpsc::Receiver)>>, - task_result_sender: mpsc::Sender, - payment_batch_size: usize, - ) -> Result<()> { - let wallet = self.wallet.clone(); - - let _handle = spawn(async move { - debug!("Spawning the long running make payment processing loop."); - - let mut to_be_paid_list = Vec::new(); - let mut cost_map = HashMap::new(); - - let mut got_a_previous_force_payment = false; - while let Some(payment) = mpsc_recv(&mut make_payment_receiver).await { - let make_payments = if let Some((item, quote)) = payment { - to_be_paid_list.push(( - quote.2.hash(), - quote.2.rewards_address, - quote.2.cost.as_atto(), - )); - let xorname = item.xorname(); - debug!("Inserted {xorname:?} into to_be_paid_list"); - - let _ = cost_map.insert(xorname, (quote.0, quote.1, quote.2)); - cost_map.len() >= payment_batch_size || got_a_previous_force_payment - } else { - // using None to indicate as all paid. - let make_payments = !cost_map.is_empty(); - debug!("Got a forced forced round of make payment."); - // Note: There can be a mismatch of ordering between the main loop and the make payment loop because - // the instructions are sent via a task(channel.send().await). And there is no guarantee for the - // order to come in the same order as they were sent. - // - // We cannot just disobey the instruction inside the child loop, as the mainloop would be expecting - // a result back for a particular instruction. - if !make_payments { - got_a_previous_force_payment = true; - warn!( - "We were told to force make payment, but cost_map is empty, so we can't do that just yet. Waiting for a task to insert a quote into cost_map" - ) - } - - make_payments - }; - - if make_payments { - // reset force_make_payment - if got_a_previous_force_payment { - info!("A task inserted a quote into cost_map, so we can now make a forced round of payment!"); - got_a_previous_force_payment = false; - } - - let terminate_process = false; - let data_payments = std::mem::take(&mut to_be_paid_list); - - let result = match wallet.pay_for_quotes(data_payments).await { - Ok(payments) => { - trace!("Made payments for {} records.", payments.len()); - - let payment_proofs = - payment_proof_from_quotes_and_payments(&cost_map, &payments); - - TaskResult::MakePaymentsOk { payment_proofs } - } - Err(err) => { - let error = err.0; - let _succeeded_batch = err.1; - - error!("When paying {} data, got error {error:?}", cost_map.len(),); - // TODO: match on insufficient gas/token error. and set terminate_process = true - TaskResult::MakePaymentsErr { - failed_xornames: cost_map - .into_iter() - .map(|(k, v)| (k, Box::new(v))) - .collect(), - } - } - }; - let result_sender = task_result_sender.clone(); - let _handle = spawn(async move { - let _ = result_sender.send(result).await; - }); - - cost_map = HashMap::new(); - - if terminate_process { - // The error will trigger the entire upload process to be terminated. - // Hence here we shall terminate the inner loop first, - // to avoid the wallet going further to be potentially got corrupted. - warn!( - "Terminating make payment processing loop due to un-recoverable error." - ); - break; - } - } - } - debug!("Make payment processing loop terminated."); - }); - Ok(()) - } - - // ====== Logic ====== - - #[cfg(feature = "registers")] - async fn get_register(client: Client, reg_addr: RegisterAddress) -> Result { - let reg = client.register_get(reg_addr).await?; - Ok(reg) - } - - #[cfg(feature = "registers")] - async fn push_register( - client: Client, - upload_item: UploadItem, - verify_store: bool, - ) -> Result { - let register = if let UploadItem::Register { reg, .. } = upload_item { - reg - } else { - error!("Invalid upload item found: {upload_item:?}"); - return Err(UploadError::InternalError); - }; - - let verification = if verify_store { - let get_cfg = GetRecordCfg { - get_quorum: Quorum::Majority, - retry_strategy: Some(RetryStrategy::default()), - target_record: None, - expected_holders: Default::default(), - is_register: true, - }; - Some((VerificationKind::Network, get_cfg)) - } else { - None - }; - - let put_cfg = PutRecordCfg { - put_quorum: Quorum::All, - retry_strategy: None, - use_put_record_to: None, - verification, - }; - - client.register_upload(®ister, None, &put_cfg).await?; - - Ok(register) - } - - async fn get_store_cost( - client: Client, - xorname: XorName, - address: NetworkAddress, - get_store_cost_strategy: GetStoreCostStrategy, - previous_payments_to: Result>, - max_repayments_for_failed_data: usize, - ) -> Result { - let filter_list = match get_store_cost_strategy { - GetStoreCostStrategy::Cheapest => vec![], - GetStoreCostStrategy::SelectDifferentPayee => { - let filter_list = previous_payments_to?; - - // if we have already made initial + max_repayments, then we should error out. - if Self::have_we_reached_max_repayments( - filter_list.len(), - max_repayments_for_failed_data, - ) { - // error is used by the caller. - return Err(UploadError::MaximumRepaymentsReached { - items: vec![xorname], - }); - } - - debug!("Filtering out payments from {filter_list:?} during get_store_cost for {xorname:?}"); - filter_list - } - }; - let quote = client - .network - .get_store_costs_from_network(address, filter_list) - .await?; - Ok(quote) - } - - async fn upload_item( - client: Client, - upload_item: UploadItem, - previous_payments: Option, - verify_store: bool, - retry_strategy: RetryStrategy, - ) -> Result<()> { - let xorname = upload_item.xorname(); - - let payment_proof = previous_payments.ok_or_else(|| { - error!("No payment proof found for {xorname:?}"); - UploadError::InternalError - })?; - let payee = payment_proof.to_peer_id_payee().ok_or_else(|| { - error!("Invalid payment proof found, could not obtain peer_id {payment_proof:?}"); - UploadError::InternalError - })?; - - debug!("Payments for upload item: {xorname:?} to {payee:?}: {payment_proof:?}"); - - match upload_item { - #[cfg(feature = "data")] - UploadItem::Chunk { address: _, chunk } => { - let chunk = match chunk { - Either::Left(chunk) => chunk, - Either::Right(path) => { - let bytes = std::fs::read(&path).inspect_err(|err| { - error!("Error reading chunk at {path:?}: {err:?}"); - })?; - Chunk::new(Bytes::from(bytes)) - } - }; - - let verification = if verify_store { - let verification_cfg = GetRecordCfg { - get_quorum: Quorum::N(NonZero::new(2).expect("2 is non-zero")), - retry_strategy: Some(retry_strategy), - target_record: None, - expected_holders: Default::default(), - is_register: false, - }; - - let random_nonce = thread_rng().gen::(); - let expected_proof = - ChunkProof::from_chunk(&chunk, random_nonce).map_err(|err| { - error!("Failed to create chunk proof: {err:?}"); - UploadError::Serialization(format!( - "Failed to create chunk proof for {xorname:?}" - )) - })?; - - Some(( - VerificationKind::ChunkProof { - expected_proof, - nonce: random_nonce, - }, - verification_cfg, - )) - } else { - None - }; - - let put_cfg = PutRecordCfg { - put_quorum: Quorum::One, - retry_strategy: Some(retry_strategy), - use_put_record_to: Some(vec![payee]), - verification, - }; - - debug!("Client upload started for chunk: {xorname:?}"); - client - .chunk_upload_with_payment(chunk, payment_proof, Some(put_cfg)) - .await?; - - debug!("Client upload completed for chunk: {xorname:?}"); - } - #[cfg(feature = "registers")] - UploadItem::Register { address: _, reg } => { - debug!("Client upload started for register: {xorname:?}"); - let verification = if verify_store { - let get_cfg = GetRecordCfg { - get_quorum: Quorum::Majority, - retry_strategy: Some(retry_strategy), - target_record: None, - expected_holders: Default::default(), - is_register: true, - }; - Some((VerificationKind::Network, get_cfg)) - } else { - None - }; - - let put_cfg = PutRecordCfg { - put_quorum: Quorum::All, - retry_strategy: Some(retry_strategy), - use_put_record_to: Some(vec![payee]), - verification, - }; - client - .register_upload(®, Some(&payment_proof), &put_cfg) - .await?; - debug!("Client upload completed for register: {xorname:?}"); - } - } - - Ok(()) - } - - // ====== Misc ====== - - fn emit_upload_event(&mut self, event: UploadEvent) { - if let Some(sender) = self.event_sender.as_ref() { - let sender_clone = sender.clone(); - let _handle = spawn(async move { - if let Err(err) = sender_clone.send(event).await { - error!("Error emitting upload event: {err:?}"); - } - }); - } else if !self.logged_event_sender_absence { - info!("FilesUpload upload event sender is not set. Use get_upload_events() if you need to keep track of the progress"); - self.logged_event_sender_absence = true; - } - } - - /// If we have already made initial + max_repayments_allowed, then we should error out. - // separate function as it is used in test. - pub(super) fn have_we_reached_max_repayments( - payments_made: usize, - max_repayments_allowed: usize, - ) -> bool { - // if max_repayments_allowed = 1, then we have reached capacity = true if 2 payments have been made. i.e., - // i.e., 1 initial + 1 repayment. - payments_made > max_repayments_allowed - } - - fn validate_upload_cfg(&self) -> Result<()> { - if self.cfg.payment_batch_size > PAYMENT_BATCH_SIZE { - error!("Payment batch size is greater than the maximum allowed: {PAYMENT_BATCH_SIZE}"); - return Err(UploadError::InvalidCfg(format!( - "Payment batch size is greater than the maximum allowed: {PAYMENT_BATCH_SIZE}" - ))); - } - if self.cfg.payment_batch_size < 1 { - error!("Payment batch size cannot be less than 1"); - return Err(UploadError::InvalidCfg( - "Payment batch size cannot be less than 1".to_string(), - )); - } - if self.cfg.batch_size < 1 { - error!("Batch size cannot be less than 1"); - return Err(UploadError::InvalidCfg( - "Batch size cannot be less than 1".to_string(), - )); - } - - Ok(()) - } -} diff --git a/autonomi/src/utils.rs b/autonomi/src/utils.rs index a7273f9bae..fc9ceb7718 100644 --- a/autonomi/src/utils.rs +++ b/autonomi/src/utils.rs @@ -1,15 +1,14 @@ -use sn_evm::{ProofOfPayment, QuoteHash, TxHash}; -use sn_networking::PayeeQuote; +use sn_evm::{PaymentQuote, ProofOfPayment, QuoteHash, TxHash}; use std::collections::{BTreeMap, HashMap}; use xor_name::XorName; pub fn payment_proof_from_quotes_and_payments( - quotes: &HashMap, + quotes: &HashMap, payments: &BTreeMap, ) -> HashMap { quotes .iter() - .filter_map(|(xor_name, (_, _, quote))| { + .filter_map(|(xor_name, quote)| { payments.get("e.hash()).map(|tx_hash| { ( *xor_name, diff --git a/autonomi/tests/fs.rs b/autonomi/tests/fs.rs index 70787dee0f..b952852bc2 100644 --- a/autonomi/tests/fs.rs +++ b/autonomi/tests/fs.rs @@ -97,7 +97,7 @@ async fn file_into_vault() -> Result<()> { .await?; // now assert over the stored account packet - let new_client = Client::connect(&peers_from_env()?).await?; + let new_client = Client::connect(&[]).await?; let (ap, got_version) = new_client.fetch_and_decrypt_vault(&client_sk).await?; assert_eq!(set_version, got_version); diff --git a/sn_evm/src/lib.rs b/sn_evm/src/lib.rs index a68fe4a01e..49956db39e 100644 --- a/sn_evm/src/lib.rs +++ b/sn_evm/src/lib.rs @@ -13,7 +13,6 @@ pub use evmlib::common::Address as RewardsAddress; pub use evmlib::common::Address as EvmAddress; pub use evmlib::common::QuotePayment; pub use evmlib::common::{QuoteHash, TxHash}; -pub use evmlib::contract::network_token::Error as EvmNetworkTokenError; pub use evmlib::cryptography; #[cfg(feature = "external-signer")] pub use evmlib::external_signer; diff --git a/sn_networking/Cargo.toml b/sn_networking/Cargo.toml index 9c76065bf0..4f2270ff37 100644 --- a/sn_networking/Cargo.toml +++ b/sn_networking/Cargo.toml @@ -94,7 +94,6 @@ workspace = true crate-type = ["cdylib", "rlib"] [target.'cfg(target_arch = "wasm32")'.dependencies] -async-channel = "2.3.1" getrandom = { version = "0.2.12", features = ["js"] } libp2p = { version = "0.54.1", features = [ "tokio", diff --git a/sn_networking/src/target_arch.rs b/sn_networking/src/target_arch.rs index b53ce472c5..35a1b62092 100644 --- a/sn_networking/src/target_arch.rs +++ b/sn_networking/src/target_arch.rs @@ -19,33 +19,12 @@ pub use tokio::{ #[cfg(target_arch = "wasm32")] pub use std::time::Duration; -#[cfg(target_arch = "wasm32")] -pub use wasm_bindgen_futures::spawn_local as spawn; + #[cfg(target_arch = "wasm32")] pub use wasmtimer::{ std::{Instant, SystemTime, UNIX_EPOCH}, tokio::{interval, sleep, timeout, Interval}, }; -/// === Channels ==== - -#[cfg(not(target_arch = "wasm32"))] -pub use tokio::sync::mpsc; -#[cfg(not(target_arch = "wasm32"))] -pub use tokio::sync::mpsc::channel as mpsc_channel; - -#[cfg(not(target_arch = "wasm32"))] -pub async fn mpsc_recv(mpsc: &mut mpsc::Receiver) -> Option { - mpsc.recv().await -} - -// futures crate has different function signatures than tokio, so instead we use async_channel here. -#[cfg(target_arch = "wasm32")] -pub use async_channel as mpsc; #[cfg(target_arch = "wasm32")] -pub use async_channel::bounded as mpsc_channel; - -#[cfg(target_arch = "wasm32")] -pub async fn mpsc_recv(mpsc: &mut mpsc::Receiver) -> Option { - mpsc.recv().await.ok() -} +pub use wasm_bindgen_futures::spawn_local as spawn; diff --git a/sn_protocol/src/messages/chunk_proof.rs b/sn_protocol/src/messages/chunk_proof.rs index 4fa3900d1f..145aae00de 100644 --- a/sn_protocol/src/messages/chunk_proof.rs +++ b/sn_protocol/src/messages/chunk_proof.rs @@ -6,8 +6,6 @@ // KIND, either express or implied. Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. -use crate::storage::{try_serialize_record, Chunk, RecordKind}; -use crate::Error; use serde::{Deserialize, Serialize}; use std::fmt; @@ -26,13 +24,6 @@ impl ChunkProof { ChunkProof(hash) } - pub fn from_chunk(chunk: &Chunk, nonce: Nonce) -> Result { - let stored_on_node = try_serialize_record(chunk, RecordKind::Chunk)?.to_vec(); - let proof = ChunkProof::new(&stored_on_node, nonce); - - Ok(proof) - } - pub fn verify(&self, other_proof: &ChunkProof) -> bool { self.0 == other_proof.0 }