From abada20949bc25e0e9c63258f89ab02a9e817241 Mon Sep 17 00:00:00 2001 From: qima Date: Tue, 7 Jan 2025 21:11:49 +0800 Subject: [PATCH 1/8] chore: rename RecordType to ValidationType --- ant-networking/src/cmd.rs | 12 ++-- ant-networking/src/event/request_response.rs | 4 +- ant-networking/src/lib.rs | 6 +- ant-networking/src/record_store.rs | 56 +++++++++---------- ant-networking/src/record_store_api.rs | 12 ++-- ant-networking/src/replication_fetcher.rs | 29 +++++----- ant-node/src/node.rs | 6 +- ant-node/src/put_validation.rs | 39 +++++++------ ant-node/src/replication.rs | 4 +- ant-protocol/src/messages/cmd.rs | 4 +- ant-protocol/src/storage/header.rs | 2 +- ant-protocol/src/storage/mod.rs | 4 +- .../api/ant-node/README.md | 6 +- .../api/ant-node/network.md | 4 +- 14 files changed, 100 insertions(+), 88 deletions(-) diff --git a/ant-networking/src/cmd.rs b/ant-networking/src/cmd.rs index 17e4e3cfc9..c5191cda41 100644 --- a/ant-networking/src/cmd.rs +++ b/ant-networking/src/cmd.rs @@ -17,7 +17,7 @@ use ant_evm::{PaymentQuote, QuotingMetrics, U256}; use ant_protocol::{ convert_distance_to_u256, messages::{Cmd, Request, Response}, - storage::{RecordHeader, RecordKind, RecordType}, + storage::{RecordHeader, RecordKind, ValidationType}, NetworkAddress, PrettyPrintRecordKey, }; use libp2p::{ @@ -92,7 +92,7 @@ pub enum LocalSwarmCmd { }, /// Get the Addresses of all the Records held locally GetAllLocalRecordAddresses { - sender: oneshot::Sender>, + sender: oneshot::Sender>, }, /// Get data from the local RecordStore GetLocalRecord { @@ -120,7 +120,7 @@ pub enum LocalSwarmCmd { /// This should be done after the record has been stored to disk AddLocalRecordAsStored { key: RecordKey, - record_type: RecordType, + record_type: ValidationType, }, /// Add a peer to the blocklist AddPeerToBlockList { @@ -141,7 +141,7 @@ pub enum LocalSwarmCmd { quotes: Vec<(PeerId, PaymentQuote)>, }, // Notify a fetch completion - FetchCompleted((RecordKey, RecordType)), + FetchCompleted((RecordKey, ValidationType)), /// Triggers interval repliation /// NOTE: This does result in outgoing messages, but is produced locally TriggerIntervalReplication, @@ -661,13 +661,13 @@ impl SwarmDriver { let record_type = match RecordHeader::from_record(&record) { Ok(record_header) => { match record_header.kind { - RecordKind::Chunk => RecordType::Chunk, + RecordKind::Chunk => ValidationType::Chunk, RecordKind::GraphEntry | RecordKind::Pointer | RecordKind::Register | RecordKind::Scratchpad => { let content_hash = XorName::from_content(&record.value); - RecordType::NonChunk(content_hash) + ValidationType::NonChunk(content_hash) } RecordKind::ChunkWithPayment | RecordKind::RegisterWithPayment diff --git a/ant-networking/src/event/request_response.rs b/ant-networking/src/event/request_response.rs index ce6755e8dc..4d8de2131f 100644 --- a/ant-networking/src/event/request_response.rs +++ b/ant-networking/src/event/request_response.rs @@ -12,7 +12,7 @@ use crate::{ }; use ant_protocol::{ messages::{CmdResponse, Request, Response}, - storage::RecordType, + storage::ValidationType, NetworkAddress, }; use libp2p::request_response::{self, Message}; @@ -159,7 +159,7 @@ impl SwarmDriver { fn add_keys_to_replication_fetcher( &mut self, sender: NetworkAddress, - incoming_keys: Vec<(NetworkAddress, RecordType)>, + incoming_keys: Vec<(NetworkAddress, ValidationType)>, ) { let holder = if let Some(peer_id) = sender.as_peer_id() { peer_id diff --git a/ant-networking/src/lib.rs b/ant-networking/src/lib.rs index dda9e1d8d3..cb4f761655 100644 --- a/ant-networking/src/lib.rs +++ b/ant-networking/src/lib.rs @@ -52,7 +52,7 @@ use ant_evm::{PaymentQuote, QuotingMetrics}; use ant_protocol::{ error::Error as ProtocolError, messages::{ChunkProof, Nonce, Query, QueryResponse, Request, Response}, - storage::{Pointer, RecordType, RetryStrategy, Scratchpad}, + storage::{Pointer, RetryStrategy, Scratchpad, ValidationType}, NetworkAddress, PrettyPrintKBucketKey, PrettyPrintRecordKey, CLOSE_GROUP_SIZE, }; use futures::future::select_all; @@ -964,7 +964,7 @@ impl Network { /// Notify ReplicationFetch a fetch attempt is completed. /// (but it won't trigger any real writes to disk, say fetched an old version of register) - pub fn notify_fetch_completed(&self, key: RecordKey, record_type: RecordType) { + pub fn notify_fetch_completed(&self, key: RecordKey, record_type: ValidationType) { self.send_local_swarm_cmd(LocalSwarmCmd::FetchCompleted((key, record_type))) } @@ -995,7 +995,7 @@ impl Network { /// Returns the Addresses of all the locally stored Records pub async fn get_all_local_record_addresses( &self, - ) -> Result> { + ) -> Result> { let (sender, receiver) = oneshot::channel(); self.send_local_swarm_cmd(LocalSwarmCmd::GetAllLocalRecordAddresses { sender }); diff --git a/ant-networking/src/record_store.rs b/ant-networking/src/record_store.rs index cabdb6611c..e9e1d2886c 100644 --- a/ant-networking/src/record_store.rs +++ b/ant-networking/src/record_store.rs @@ -19,7 +19,7 @@ use aes_gcm_siv::{ use ant_evm::{QuotingMetrics, U256}; use ant_protocol::{ convert_distance_to_u256, - storage::{RecordHeader, RecordKind, RecordType}, + storage::{RecordHeader, RecordKind, ValidationType}, NetworkAddress, PrettyPrintRecordKey, }; use hkdf::Hkdf; @@ -138,7 +138,7 @@ pub struct NodeRecordStore { /// The configuration of the store. config: NodeRecordStoreConfig, /// Main records store remains unchanged for compatibility - records: HashMap, + records: HashMap, /// Additional index organizing records by distance records_by_distance: BTreeMap, /// FIFO simple cache of records to reduce read times @@ -218,7 +218,7 @@ impl NodeRecordStore { fn update_records_from_an_existing_store( config: &NodeRecordStoreConfig, encryption_details: &(Aes256GcmSiv, [u8; 4]), - ) -> HashMap { + ) -> HashMap { let process_entry = |entry: &DirEntry| -> _ { let path = entry.path(); if path.is_file() { @@ -270,10 +270,10 @@ impl NodeRecordStore { }; let record_type = match RecordHeader::is_record_of_type_chunk(&record) { - Ok(true) => RecordType::Chunk, + Ok(true) => ValidationType::Chunk, Ok(false) => { let xorname_hash = XorName::from_content(&record.value); - RecordType::NonChunk(xorname_hash) + ValidationType::NonChunk(xorname_hash) } Err(error) => { warn!( @@ -585,7 +585,7 @@ impl NodeRecordStore { /// Returns the set of `NetworkAddress::RecordKey` held by the store /// Use `record_addresses_ref` to get a borrowed type - pub(crate) fn record_addresses(&self) -> HashMap { + pub(crate) fn record_addresses(&self) -> HashMap { self.records .iter() .map(|(_record_key, (addr, record_type))| (addr.clone(), record_type.clone())) @@ -593,14 +593,14 @@ impl NodeRecordStore { } /// Returns the reference to the set of `NetworkAddress::RecordKey` held by the store - pub(crate) fn record_addresses_ref(&self) -> &HashMap { + pub(crate) fn record_addresses_ref(&self) -> &HashMap { &self.records } /// The follow up to `put_verified`, this only registers the RecordKey /// in the RecordStore records set. After this it should be safe /// to return the record as stored. - pub(crate) fn mark_as_stored(&mut self, key: Key, record_type: RecordType) { + pub(crate) fn mark_as_stored(&mut self, key: Key, record_type: ValidationType) { let addr = NetworkAddress::from_record_key(&key); let distance = self.local_address.distance(&addr); let distance_u256 = convert_distance_to_u256(&distance); @@ -648,7 +648,7 @@ impl NodeRecordStore { /// /// The record is marked as written to disk once `mark_as_stored` is called, /// this avoids us returning half-written data or registering it as stored before it is. - pub(crate) fn put_verified(&mut self, r: Record, record_type: RecordType) -> Result<()> { + pub(crate) fn put_verified(&mut self, r: Record, record_type: ValidationType) -> Result<()> { let key = &r.key; let record_key = PrettyPrintRecordKey::from(&r.key).into_owned(); debug!("PUTting a verified Record: {record_key:?}"); @@ -838,11 +838,11 @@ impl RecordStore for NodeRecordStore { // otherwise shall be passed further to allow different version of nonchunk // to be detected or updated. match self.records.get(&record.key) { - Some((_addr, RecordType::Chunk)) => { + Some((_addr, ValidationType::Chunk)) => { debug!("Chunk {record_key:?} already exists."); return Ok(()); } - Some((_addr, RecordType::NonChunk(existing_content_hash))) => { + Some((_addr, ValidationType::NonChunk(existing_content_hash))) => { let content_hash = XorName::from_content(&record.value); if content_hash == *existing_content_hash { debug!("A non-chunk record {record_key:?} with same content_hash {content_hash:?} already exists."); @@ -938,7 +938,7 @@ impl RecordStore for NodeRecordStore { /// A place holder RecordStore impl for the client that does nothing #[derive(Default, Debug)] pub struct ClientRecordStore { - empty_record_addresses: HashMap, + empty_record_addresses: HashMap, } impl ClientRecordStore { @@ -946,19 +946,19 @@ impl ClientRecordStore { false } - pub(crate) fn record_addresses(&self) -> HashMap { + pub(crate) fn record_addresses(&self) -> HashMap { HashMap::new() } - pub(crate) fn record_addresses_ref(&self) -> &HashMap { + pub(crate) fn record_addresses_ref(&self) -> &HashMap { &self.empty_record_addresses } - pub(crate) fn put_verified(&mut self, _r: Record, _record_type: RecordType) -> Result<()> { + pub(crate) fn put_verified(&mut self, _r: Record, _record_type: ValidationType) -> Result<()> { Ok(()) } - pub(crate) fn mark_as_stored(&mut self, _r: Key, _t: RecordType) {} + pub(crate) fn mark_as_stored(&mut self, _r: Key, _t: ValidationType) {} } impl RecordStore for ClientRecordStore { @@ -1093,12 +1093,12 @@ mod tests { let returned_record_key = returned_record.key.clone(); assert!(store - .put_verified(returned_record, RecordType::Chunk) + .put_verified(returned_record, ValidationType::Chunk) .is_ok()); // We must also mark the record as stored (which would be triggered after the async write in nodes // via NetworkEvent::CompletedWrite) - store.mark_as_stored(returned_record_key, RecordType::Chunk); + store.mark_as_stored(returned_record_key, ValidationType::Chunk); // loop over store.get max_iterations times to ensure async disk write had time to complete. let max_iterations = 10; @@ -1169,7 +1169,7 @@ mod tests { // Store the chunk using put_verified assert!(store - .put_verified(record.clone(), RecordType::Chunk) + .put_verified(record.clone(), ValidationType::Chunk) .is_ok()); // Wait for the async write operation to complete @@ -1270,11 +1270,11 @@ mod tests { // Store the chunk using put_verified assert!(store - .put_verified(record.clone(), RecordType::Chunk) + .put_verified(record.clone(), ValidationType::Chunk) .is_ok()); // Mark as stored (simulating the CompletedWrite event) - store.mark_as_stored(record.key.clone(), RecordType::Chunk); + store.mark_as_stored(record.key.clone(), ValidationType::Chunk); // Verify the chunk is stored let stored_record = store.get(&record.key); @@ -1343,14 +1343,14 @@ mod tests { assert!(store .put_verified( record.clone(), - RecordType::NonChunk(XorName::from_content(&record.value)) + ValidationType::NonChunk(XorName::from_content(&record.value)) ) .is_ok()); // Mark as stored (simulating the CompletedWrite event) store.mark_as_stored( record.key.clone(), - RecordType::NonChunk(XorName::from_content(&record.value)), + ValidationType::NonChunk(XorName::from_content(&record.value)), ); // Verify the scratchpad is stored @@ -1437,7 +1437,7 @@ mod tests { }; // Will be stored anyway. - let succeeded = store.put_verified(record, RecordType::Chunk).is_ok(); + let succeeded = store.put_verified(record, ValidationType::Chunk).is_ok(); if !succeeded { failed_records.push(record_key.clone()); @@ -1445,7 +1445,7 @@ mod tests { } else { // We must also mark the record as stored (which would be triggered // after the async write in nodes via NetworkEvent::CompletedWrite) - store.mark_as_stored(record_key.clone(), RecordType::Chunk); + store.mark_as_stored(record_key.clone(), ValidationType::Chunk); println!("success sotred len: {:?} ", store.record_addresses().len()); stored_records_at_some_point.push(record_key.clone()); @@ -1499,7 +1499,7 @@ mod tests { // now for any stored data. It either shoudl still be stored OR further away than `most_distant_data` for data in stored_records_at_some_point { let data_addr = NetworkAddress::from_record_key(&data); - if !sorted_stored_data.contains(&(&data_addr, &RecordType::Chunk)) { + if !sorted_stored_data.contains(&(&data_addr, &ValidationType::Chunk)) { assert!( self_address.distance(&data_addr) > self_address.distance(most_distant_data), @@ -1558,10 +1558,10 @@ mod tests { publisher: None, expires: None, }; - assert!(store.put_verified(record, RecordType::Chunk).is_ok()); + assert!(store.put_verified(record, ValidationType::Chunk).is_ok()); // We must also mark the record as stored (which would be triggered after the async write in nodes // via NetworkEvent::CompletedWrite) - store.mark_as_stored(record_key.clone(), RecordType::Chunk); + store.mark_as_stored(record_key.clone(), ValidationType::Chunk); stored_records.push(record_key.clone()); stored_records.sort_by(|a, b| { diff --git a/ant-networking/src/record_store_api.rs b/ant-networking/src/record_store_api.rs index 0955d5499f..7db4f38e54 100644 --- a/ant-networking/src/record_store_api.rs +++ b/ant-networking/src/record_store_api.rs @@ -9,7 +9,7 @@ use crate::record_store::{ClientRecordStore, NodeRecordStore}; use ant_evm::{QuotingMetrics, U256}; -use ant_protocol::{storage::RecordType, NetworkAddress}; +use ant_protocol::{storage::ValidationType, NetworkAddress}; use libp2p::kad::{ store::{RecordStore, Result}, ProviderRecord, Record, RecordKey, @@ -90,21 +90,23 @@ impl UnifiedRecordStore { } } - pub(crate) fn record_addresses(&self) -> HashMap { + pub(crate) fn record_addresses(&self) -> HashMap { match self { Self::Client(store) => store.record_addresses(), Self::Node(store) => store.record_addresses(), } } - pub(crate) fn record_addresses_ref(&self) -> &HashMap { + pub(crate) fn record_addresses_ref( + &self, + ) -> &HashMap { match self { Self::Client(store) => store.record_addresses_ref(), Self::Node(store) => store.record_addresses_ref(), } } - pub(crate) fn put_verified(&mut self, r: Record, record_type: RecordType) -> Result<()> { + pub(crate) fn put_verified(&mut self, r: Record, record_type: ValidationType) -> Result<()> { match self { Self::Client(store) => store.put_verified(r, record_type), Self::Node(store) => store.put_verified(r, record_type), @@ -168,7 +170,7 @@ impl UnifiedRecordStore { /// Mark the record as stored in the store. /// This adds it to records set, so it can now be retrieved /// (to be done after writes are finalised) - pub(crate) fn mark_as_stored(&mut self, k: RecordKey, record_type: RecordType) { + pub(crate) fn mark_as_stored(&mut self, k: RecordKey, record_type: ValidationType) { match self { Self::Client(store) => store.mark_as_stored(k, record_type), Self::Node(store) => store.mark_as_stored(k, record_type), diff --git a/ant-networking/src/replication_fetcher.rs b/ant-networking/src/replication_fetcher.rs index 360e2fbe6b..a4a16abe41 100644 --- a/ant-networking/src/replication_fetcher.rs +++ b/ant-networking/src/replication_fetcher.rs @@ -11,7 +11,7 @@ use crate::time::spawn; use crate::{event::NetworkEvent, time::Instant}; use ant_evm::U256; use ant_protocol::{ - convert_distance_to_u256, storage::RecordType, NetworkAddress, PrettyPrintRecordKey, + convert_distance_to_u256, storage::ValidationType, NetworkAddress, PrettyPrintRecordKey, }; use libp2p::{ kad::{KBucketDistance as Distance, RecordKey, K_VALUE}, @@ -40,9 +40,9 @@ type ReplicationTimeout = Instant; pub(crate) struct ReplicationFetcher { self_peer_id: PeerId, // Pending entries that to be fetched from the target peer. - to_be_fetched: HashMap<(RecordKey, RecordType, PeerId), ReplicationTimeout>, + to_be_fetched: HashMap<(RecordKey, ValidationType, PeerId), ReplicationTimeout>, // Avoid fetching same chunk from different nodes AND carry out too many parallel tasks. - on_going_fetches: HashMap<(RecordKey, RecordType), (PeerId, ReplicationTimeout)>, + on_going_fetches: HashMap<(RecordKey, ValidationType), (PeerId, ReplicationTimeout)>, event_sender: mpsc::Sender, /// Distance range that the incoming key shall be fetched distance_range: Option, @@ -77,8 +77,8 @@ impl ReplicationFetcher { pub(crate) fn add_keys( &mut self, holder: PeerId, - incoming_keys: Vec<(NetworkAddress, RecordType)>, - locally_stored_keys: &HashMap, + incoming_keys: Vec<(NetworkAddress, ValidationType)>, + locally_stored_keys: &HashMap, ) -> Vec<(PeerId, RecordKey)> { // Pre-calculate self_address since it's used multiple times let self_address = NetworkAddress::from_peer(self.self_peer_id); @@ -207,7 +207,7 @@ impl ReplicationFetcher { pub(crate) fn notify_about_new_put( &mut self, new_put: RecordKey, - record_type: RecordType, + record_type: ValidationType, ) -> Vec<(PeerId, RecordKey)> { self.to_be_fetched .retain(|(key, t, _), _| key != &new_put || t != &record_type); @@ -222,7 +222,7 @@ impl ReplicationFetcher { pub(crate) fn notify_fetch_early_completed( &mut self, key_in: RecordKey, - record_type: RecordType, + record_type: ValidationType, ) -> Vec<(PeerId, RecordKey)> { self.to_be_fetched.retain(|(key, current_type, _), _| { if current_type == &record_type { @@ -368,7 +368,7 @@ impl ReplicationFetcher { /// This checks the hash on transactions to ensure we pull in divergent transactions. fn remove_stored_keys( &mut self, - existing_keys: &HashMap, + existing_keys: &HashMap, ) { self.to_be_fetched.retain(|(key, t, _), _| { if let Some((_addr, record_type)) = existing_keys.get(key) { @@ -412,7 +412,7 @@ impl ReplicationFetcher { #[cfg(test)] mod tests { use super::{ReplicationFetcher, FETCH_TIMEOUT, MAX_PARALLEL_FETCH}; - use ant_protocol::{convert_distance_to_u256, storage::RecordType, NetworkAddress}; + use ant_protocol::{convert_distance_to_u256, storage::ValidationType, NetworkAddress}; use eyre::Result; use libp2p::{kad::RecordKey, PeerId}; use std::{collections::HashMap, time::Duration}; @@ -430,7 +430,7 @@ mod tests { (0..MAX_PARALLEL_FETCH * 2).for_each(|_| { let random_data: Vec = (0..50).map(|_| rand::random::()).collect(); let key = NetworkAddress::from_record_key(&RecordKey::from(random_data)); - incoming_keys.push((key, RecordType::Chunk)); + incoming_keys.push((key, ValidationType::Chunk)); }); let keys_to_fetch = @@ -444,7 +444,10 @@ mod tests { let key_2 = NetworkAddress::from_record_key(&RecordKey::from(random_data)); let keys_to_fetch = replication_fetcher.add_keys( PeerId::random(), - vec![(key_1, RecordType::Chunk), (key_2, RecordType::Chunk)], + vec![ + (key_1, ValidationType::Chunk), + (key_2, ValidationType::Chunk), + ], &locally_stored_keys, ); assert!(keys_to_fetch.is_empty()); @@ -454,7 +457,7 @@ mod tests { let key = NetworkAddress::from_record_key(&RecordKey::from(random_data)); let keys_to_fetch = replication_fetcher.add_keys( PeerId::random(), - vec![(key, RecordType::Chunk)], + vec![(key, ValidationType::Chunk)], &locally_stored_keys, ); assert!(!keys_to_fetch.is_empty()); @@ -496,7 +499,7 @@ mod tests { in_range_keys += 1; } - incoming_keys.push((key, RecordType::Chunk)); + incoming_keys.push((key, ValidationType::Chunk)); }); let keys_to_fetch = diff --git a/ant-node/src/node.rs b/ant-node/src/node.rs index 3877a31a18..97bdd937c7 100644 --- a/ant-node/src/node.rs +++ b/ant-node/src/node.rs @@ -23,7 +23,7 @@ use ant_protocol::{ convert_distance_to_u256, error::Error as ProtocolError, messages::{ChunkProof, CmdResponse, Nonce, Query, QueryResponse, Request, Response}, - storage::RecordType, + storage::ValidationType, NetworkAddress, PrettyPrintRecordKey, CLOSE_GROUP_SIZE, }; use bytes::Bytes; @@ -811,7 +811,7 @@ impl Node { all_local_records .iter() .filter_map(|(addr, record_type)| { - if *record_type == RecordType::Chunk { + if *record_type == ValidationType::Chunk { Some(addr.clone()) } else { None @@ -876,7 +876,7 @@ impl Node { all_keys .iter() .filter_map(|(addr, record_type)| { - if RecordType::Chunk == *record_type { + if ValidationType::Chunk == *record_type { Some(addr.clone()) } else { None diff --git a/ant-node/src/put_validation.rs b/ant-node/src/put_validation.rs index 946b9168e9..0925c3d1f6 100644 --- a/ant-node/src/put_validation.rs +++ b/ant-node/src/put_validation.rs @@ -16,7 +16,7 @@ use ant_protocol::storage::GraphEntry; use ant_protocol::{ storage::{ try_deserialize_record, try_serialize_record, Chunk, GraphEntryAddress, Pointer, - RecordHeader, RecordKind, RecordType, Scratchpad, + RecordHeader, RecordKind, Scratchpad, ValidationType, }, NetworkAddress, PrettyPrintRecordKey, }; @@ -49,13 +49,13 @@ impl Node { // if we're receiving this chunk PUT again, and we have been paid, // we eagerly retry replicaiton as it seems like other nodes are having trouble // did not manage to get this chunk as yet - self.replicate_valid_fresh_record(record_key, RecordType::Chunk); + self.replicate_valid_fresh_record(record_key, ValidationType::Chunk); // Notify replication_fetcher to mark the attempt as completed. // Send the notification earlier to avoid it got skipped due to: // the record becomes stored during the fetch because of other interleaved process. self.network() - .notify_fetch_completed(record.key.clone(), RecordType::Chunk); + .notify_fetch_completed(record.key.clone(), ValidationType::Chunk); debug!( "Chunk with addr {:?} already exists: {already_exists}, payment extracted.", @@ -75,13 +75,13 @@ impl Node { if store_chunk_result.is_ok() { Marker::ValidPaidChunkPutFromClient(&PrettyPrintRecordKey::from(&record.key)) .log(); - self.replicate_valid_fresh_record(record_key, RecordType::Chunk); + self.replicate_valid_fresh_record(record_key, ValidationType::Chunk); // Notify replication_fetcher to mark the attempt as completed. // Send the notification earlier to avoid it got skipped due to: // the record becomes stored during the fetch because of other interleaved process. self.network() - .notify_fetch_completed(record.key.clone(), RecordType::Chunk); + .notify_fetch_completed(record.key.clone(), ValidationType::Chunk); } store_chunk_result @@ -132,14 +132,16 @@ impl Node { .log(); self.replicate_valid_fresh_record( record_key.clone(), - RecordType::NonChunk(content_hash), + ValidationType::NonChunk(content_hash), ); // Notify replication_fetcher to mark the attempt as completed. // Send the notification earlier to avoid it got skipped due to: // the record becomes stored during the fetch because of other interleaved process. - self.network() - .notify_fetch_completed(record_key, RecordType::NonChunk(content_hash)); + self.network().notify_fetch_completed( + record_key, + ValidationType::NonChunk(content_hash), + ); } Err(_) => {} } @@ -213,7 +215,7 @@ impl Node { .log(); self.replicate_valid_fresh_record( record.key.clone(), - RecordType::NonChunk(content_hash), + ValidationType::NonChunk(content_hash), ); // Notify replication_fetcher to mark the attempt as completed. @@ -221,7 +223,7 @@ impl Node { // the record becomes stored during the fetch because of other interleaved process. self.network().notify_fetch_completed( record.key.clone(), - RecordType::NonChunk(content_hash), + ValidationType::NonChunk(content_hash), ); } res @@ -258,7 +260,7 @@ impl Node { // the record becomes stored during the fetch because of other interleaved process. self.network().notify_fetch_completed( record.key.clone(), - RecordType::NonChunk(content_hash), + ValidationType::NonChunk(content_hash), ); } else { warn!("Failed to store register update at {pretty_key:?}"); @@ -307,7 +309,7 @@ impl Node { // the record becomes stored during the fetch because of other interleaved process. self.network().notify_fetch_completed( record.key.clone(), - RecordType::NonChunk(content_hash), + ValidationType::NonChunk(content_hash), ); } res @@ -357,13 +359,13 @@ impl Node { .log(); self.replicate_valid_fresh_record( record.key.clone(), - RecordType::NonChunk(content_hash), + ValidationType::NonChunk(content_hash), ); // Notify replication_fetcher to mark the attempt as completed. self.network().notify_fetch_completed( record.key.clone(), - RecordType::NonChunk(content_hash), + ValidationType::NonChunk(content_hash), ); } res @@ -561,7 +563,10 @@ impl Node { if is_client_put { let content_hash = XorName::from_content(&record.value); - self.replicate_valid_fresh_record(scratchpad_key, RecordType::NonChunk(content_hash)); + self.replicate_valid_fresh_record( + scratchpad_key, + ValidationType::NonChunk(content_hash), + ); } Ok(()) @@ -613,7 +618,7 @@ impl Node { // However, to avoid `looping of replication`, a `replicated in` register // shall not trigger any further replication out. if is_client_put { - self.replicate_valid_fresh_record(key, RecordType::NonChunk(content_hash)); + self.replicate_valid_fresh_record(key, ValidationType::NonChunk(content_hash)); } Ok(()) @@ -880,7 +885,7 @@ impl Node { self.network().put_local_record(record); let content_hash = XorName::from_content(&pointer.network_address().to_bytes()); - self.replicate_valid_fresh_record(key, RecordType::NonChunk(content_hash)); + self.replicate_valid_fresh_record(key, ValidationType::NonChunk(content_hash)); Ok(()) } diff --git a/ant-node/src/replication.rs b/ant-node/src/replication.rs index 130b23e1f0..b34d6c1a71 100644 --- a/ant-node/src/replication.rs +++ b/ant-node/src/replication.rs @@ -10,7 +10,7 @@ use crate::{error::Result, node::Node}; use ant_networking::{GetRecordCfg, Network}; use ant_protocol::{ messages::{Cmd, Query, QueryResponse, Request, Response}, - storage::RecordType, + storage::ValidationType, NetworkAddress, PrettyPrintRecordKey, }; use libp2p::{ @@ -106,7 +106,7 @@ impl Node { pub(crate) fn replicate_valid_fresh_record( &self, paid_key: RecordKey, - record_type: RecordType, + record_type: ValidationType, ) { let network = self.network().clone(); diff --git a/ant-protocol/src/messages/cmd.rs b/ant-protocol/src/messages/cmd.rs index f0f5e089b4..83d2ed7fa0 100644 --- a/ant-protocol/src/messages/cmd.rs +++ b/ant-protocol/src/messages/cmd.rs @@ -7,7 +7,7 @@ // permissions and limitations relating to use of the SAFE Network Software. #![allow(clippy::mutable_key_type)] // for Bytes in NetworkAddress -use crate::{storage::RecordType, NetworkAddress}; +use crate::{storage::ValidationType, NetworkAddress}; use serde::{Deserialize, Serialize}; /// Ant protocol cmds @@ -25,7 +25,7 @@ pub enum Cmd { /// Holder of the replication keys. holder: NetworkAddress, /// Keys of copy that shall be replicated. - keys: Vec<(NetworkAddress, RecordType)>, + keys: Vec<(NetworkAddress, ValidationType)>, }, /// Notify the peer it is now being considered as BAD due to the included behaviour PeerConsideredAsBad { diff --git a/ant-protocol/src/storage/header.rs b/ant-protocol/src/storage/header.rs index 00a4c13003..a67274d5be 100644 --- a/ant-protocol/src/storage/header.rs +++ b/ant-protocol/src/storage/header.rs @@ -19,7 +19,7 @@ use xor_name::XorName; /// This is to be only used within the node instance to reflect different content version. /// Hence, only need to have two entries: Chunk and NonChunk. #[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq, Hash)] -pub enum RecordType { +pub enum ValidationType { Chunk, NonChunk(XorName), } diff --git a/ant-protocol/src/storage/mod.rs b/ant-protocol/src/storage/mod.rs index 706beaede0..033aaab757 100644 --- a/ant-protocol/src/storage/mod.rs +++ b/ant-protocol/src/storage/mod.rs @@ -22,7 +22,9 @@ pub use self::{ address::{ChunkAddress, GraphEntryAddress, PointerAddress, ScratchpadAddress}, chunks::Chunk, graph::GraphEntry, - header::{try_deserialize_record, try_serialize_record, RecordHeader, RecordKind, RecordType}, + header::{ + try_deserialize_record, try_serialize_record, RecordHeader, RecordKind, ValidationType, + }, scratchpad::Scratchpad, }; diff --git a/docs/online-documentation/api/ant-node/README.md b/docs/online-documentation/api/ant-node/README.md index f14302a0a7..989885499b 100644 --- a/docs/online-documentation/api/ant-node/README.md +++ b/docs/online-documentation/api/ant-node/README.md @@ -123,12 +123,12 @@ The Ant Node provides a comprehensive API for running and managing nodes in the === "Rust" ```rust - use ant_protocol::storage::RecordType; + use ant_protocol::storage::ValidationType; // Store data let key = "0123456789abcdef"; // Hex string let value = b"Hello, World!"; - node.store_record(key, value, RecordType::Chunk)?; + node.store_record(key, value, ValidationType::Chunk)?; // Retrieve data let data = node.get_record(key)?; @@ -230,7 +230,7 @@ The Ant Node provides a comprehensive API for running and managing nodes in the ```rust use ant_node::error::Error; - match node.store_record(key, value, RecordType::Chunk) { + match node.store_record(key, value, ValidationType::Chunk) { Ok(_) => println!("Record stored successfully"), Err(Error::StorageFull) => println!("Storage is full"), Err(Error::InvalidKey) => println!("Invalid key format"), diff --git a/docs/online-documentation/api/ant-node/network.md b/docs/online-documentation/api/ant-node/network.md index 214d9cdc34..57bfc82809 100644 --- a/docs/online-documentation/api/ant-node/network.md +++ b/docs/online-documentation/api/ant-node/network.md @@ -172,12 +172,12 @@ This page documents the network operations available in the Ant Node API. === "Rust" ```rust - use ant_node::storage::RecordType; + use ant_node::storage::ValidationType; // Store a record let key = "0123456789abcdef"; // Hex string let value = b"Hello, World!"; - node.store_record(key, value, RecordType::Chunk)?; + node.store_record(key, value, ValidationType::Chunk)?; // Retrieve a record let data = node.get_record(key)?; From bb7ae764707430c5c31768327e8be9f2b3a2a438 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Mon, 6 Jan 2025 10:28:00 +0100 Subject: [PATCH 2/8] feat: add arbitrum-sepolia-test EVM network --- evmlib/src/lib.rs | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/evmlib/src/lib.rs b/evmlib/src/lib.rs index e2715c0ed6..bb8fff8047 100644 --- a/evmlib/src/lib.rs +++ b/evmlib/src/lib.rs @@ -27,6 +27,9 @@ pub mod testnet; pub mod utils; pub mod wallet; +/// Timeout for transactions +const TX_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60); + static PUBLIC_ARBITRUM_ONE_HTTP_RPC_URL: LazyLock = LazyLock::new(|| { "https://arb1.arbitrum.io/rpc" .parse() @@ -45,6 +48,9 @@ const ARBITRUM_ONE_PAYMENT_TOKEN_ADDRESS: Address = const ARBITRUM_SEPOLIA_PAYMENT_TOKEN_ADDRESS: Address = address!("BE1802c27C324a28aeBcd7eeC7D734246C807194"); +const ARBITRUM_SEPOLIA_TEST_PAYMENT_TOKEN_ADDRESS: Address = + address!("4bc1aCE0E66170375462cB4E6Af42Ad4D5EC689C"); + // Should be updated when the smart contract changes! const ARBITRUM_ONE_DATA_PAYMENTS_ADDRESS: Address = address!("607483B50C5F06c25cDC316b6d1E071084EeC9f5"); @@ -52,8 +58,8 @@ const ARBITRUM_ONE_DATA_PAYMENTS_ADDRESS: Address = const ARBITRUM_SEPOLIA_DATA_PAYMENTS_ADDRESS: Address = address!("993C7739f50899A997fEF20860554b8a28113634"); -/// Timeout for transactions -const TX_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60); +const ARBITRUM_SEPOLIA_TEST_DATA_PAYMENTS_ADDRESS: Address = + address!("7f0842a78f7d4085d975ba91d630d680f91b1295"); #[serde_as] #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] @@ -81,6 +87,7 @@ pub enum Network { #[default] ArbitrumOne, ArbitrumSepolia, + ArbitrumSepoliaTest, Custom(CustomNetwork), } @@ -89,6 +96,7 @@ impl std::fmt::Display for Network { match self { Network::ArbitrumOne => write!(f, "evm-arbitrum-one"), Network::ArbitrumSepolia => write!(f, "evm-arbitrum-sepolia"), + Network::ArbitrumSepoliaTest => write!(f, "evm-arbitrum-sepolia-test"), Network::Custom(_) => write!(f, "evm-custom"), } } @@ -107,6 +115,7 @@ impl Network { match self { Network::ArbitrumOne => "arbitrum-one", Network::ArbitrumSepolia => "arbitrum-sepolia", + Network::ArbitrumSepoliaTest => "arbitrum-sepolia-test", Network::Custom(_) => "custom", } } @@ -115,6 +124,7 @@ impl Network { match self { Network::ArbitrumOne => &PUBLIC_ARBITRUM_ONE_HTTP_RPC_URL, Network::ArbitrumSepolia => &PUBLIC_ARBITRUM_SEPOLIA_HTTP_RPC_URL, + Network::ArbitrumSepoliaTest => &PUBLIC_ARBITRUM_SEPOLIA_HTTP_RPC_URL, Network::Custom(custom) => &custom.rpc_url_http, } } @@ -123,6 +133,7 @@ impl Network { match self { Network::ArbitrumOne => &ARBITRUM_ONE_PAYMENT_TOKEN_ADDRESS, Network::ArbitrumSepolia => &ARBITRUM_SEPOLIA_PAYMENT_TOKEN_ADDRESS, + Network::ArbitrumSepoliaTest => &ARBITRUM_SEPOLIA_TEST_PAYMENT_TOKEN_ADDRESS, Network::Custom(custom) => &custom.payment_token_address, } } @@ -131,6 +142,7 @@ impl Network { match self { Network::ArbitrumOne => &ARBITRUM_ONE_DATA_PAYMENTS_ADDRESS, Network::ArbitrumSepolia => &ARBITRUM_SEPOLIA_DATA_PAYMENTS_ADDRESS, + Network::ArbitrumSepoliaTest => &ARBITRUM_SEPOLIA_TEST_DATA_PAYMENTS_ADDRESS, Network::Custom(custom) => &custom.data_payments_address, } } From 3c97c91e9c6466480745c5713a088bd2f47e1e39 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Mon, 6 Jan 2025 12:19:32 +0100 Subject: [PATCH 3/8] test: add quoting test and rename existing tests --- evmlib/tests/payment_vault.rs | 78 +++++++++++++++++++++++------------ 1 file changed, 52 insertions(+), 26 deletions(-) diff --git a/evmlib/tests/payment_vault.rs b/evmlib/tests/payment_vault.rs index 41c5881cbb..b9437c6f6d 100644 --- a/evmlib/tests/payment_vault.rs +++ b/evmlib/tests/payment_vault.rs @@ -116,8 +116,8 @@ async fn test_deploy() { } #[tokio::test] -async fn test_proxy_reachable() { - let network = Network::ArbitrumOne; +async fn test_proxy_reachable_on_arb_sepolia() { + let network = Network::ArbitrumSepolia; let provider = http_provider(network.rpc_url().clone()); let payment_vault = PaymentVaultHandler::new(*network.data_payments_address(), provider); @@ -130,12 +130,38 @@ async fn test_proxy_reachable() { } #[tokio::test] -async fn test_verify_payment() { +async fn test_get_quote_on_arb_sepolia_test() { + let network = Network::ArbitrumSepoliaTest; + let provider = http_provider(network.rpc_url().clone()); + let payment_vault = PaymentVaultHandler::new(*network.data_payments_address(), provider); + + let quoting_metrics = QuotingMetrics { + close_records_stored: 10, + max_records: 16 * 1024, + received_payment_count: 0, + live_time: 1400, + network_density: Some([ + 4, 4, 224, 228, 247, 252, 14, 44, 67, 21, 153, 47, 244, 18, 232, 1, 152, 195, 44, 43, + 29, 135, 19, 217, 240, 129, 64, 245, 240, 227, 129, 162, + ]), + network_size: Some(240), + }; + + let amount = payment_vault + .get_quote(vec![quoting_metrics]) + .await + .unwrap(); + + assert_eq!(amount, vec![Amount::from(610678225049958_u64)]); +} + +#[tokio::test] +async fn test_pay_for_quotes_on_local() { let (_anvil, network_token, mut payment_vault) = setup().await; let mut quote_payments = vec![]; - for _ in 0..5 { + for _ in 0..MAX_TRANSFERS_PER_TRANSACTION { let quote_payment = random_quote_payment(); quote_payments.push(quote_payment); } @@ -149,36 +175,18 @@ async fn test_verify_payment() { // so we set it to the same as the network token contract payment_vault.set_provider(network_token.contract.provider().clone()); - let result = payment_vault.pay_for_quotes(quote_payments.clone()).await; + let result = payment_vault.pay_for_quotes(quote_payments).await; assert!(result.is_ok(), "Failed with error: {:?}", result.err()); - - let payment_verifications: Vec<_> = quote_payments - .into_iter() - .map(|v| interface::IPaymentVault::PaymentVerification { - metrics: QuotingMetrics::default().into(), - rewardsAddress: v.1, - quoteHash: v.0, - }) - .collect(); - - let results = payment_vault - .verify_payment(payment_verifications) - .await - .expect("Verify payment failed"); - - for result in results { - assert!(result.isValid); - } } #[tokio::test] -async fn test_pay_for_quotes() { +async fn test_verify_payment_on_local() { let (_anvil, network_token, mut payment_vault) = setup().await; let mut quote_payments = vec![]; - for _ in 0..MAX_TRANSFERS_PER_TRANSACTION { + for _ in 0..5 { let quote_payment = random_quote_payment(); quote_payments.push(quote_payment); } @@ -192,7 +200,25 @@ async fn test_pay_for_quotes() { // so we set it to the same as the network token contract payment_vault.set_provider(network_token.contract.provider().clone()); - let result = payment_vault.pay_for_quotes(quote_payments).await; + let result = payment_vault.pay_for_quotes(quote_payments.clone()).await; assert!(result.is_ok(), "Failed with error: {:?}", result.err()); + + let payment_verifications: Vec<_> = quote_payments + .into_iter() + .map(|v| interface::IPaymentVault::PaymentVerification { + metrics: QuotingMetrics::default().into(), + rewardsAddress: v.1, + quoteHash: v.0, + }) + .collect(); + + let results = payment_vault + .verify_payment(payment_verifications) + .await + .expect("Verify payment failed"); + + for result in results { + assert!(result.isValid); + } } From 0a7422e2bd2d701a91729999bdefdfe68b0d39b6 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Mon, 6 Jan 2025 13:06:44 +0100 Subject: [PATCH 4/8] feat: add arb sepolia test network subcommand to node and manager --- ant-node-manager/src/bin/cli/subcommands/evm_network.rs | 4 ++++ ant-node/src/bin/antnode/subcommands.rs | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/ant-node-manager/src/bin/cli/subcommands/evm_network.rs b/ant-node-manager/src/bin/cli/subcommands/evm_network.rs index 2d795846cf..868c33aea2 100644 --- a/ant-node-manager/src/bin/cli/subcommands/evm_network.rs +++ b/ant-node-manager/src/bin/cli/subcommands/evm_network.rs @@ -19,6 +19,9 @@ pub enum EvmNetworkCommand { /// Use the Arbitrum Sepolia network EvmArbitrumSepolia, + /// Use the Arbitrum Sepolia network with test contracts + EvmArbitrumSepoliaTest, + /// Use a custom network EvmCustom { /// The RPC URL for the custom network @@ -45,6 +48,7 @@ impl TryInto for EvmNetworkCommand { match self { Self::EvmArbitrumOne => Ok(EvmNetwork::ArbitrumOne), Self::EvmArbitrumSepolia => Ok(EvmNetwork::ArbitrumSepolia), + Self::EvmArbitrumSepoliaTest => Ok(EvmNetwork::ArbitrumSepoliaTest), Self::EvmLocal => { if !cfg!(feature = "local") { return Err(color_eyre::eyre::eyre!( diff --git a/ant-node/src/bin/antnode/subcommands.rs b/ant-node/src/bin/antnode/subcommands.rs index a9e02d2be4..52c48f1ea7 100644 --- a/ant-node/src/bin/antnode/subcommands.rs +++ b/ant-node/src/bin/antnode/subcommands.rs @@ -10,6 +10,9 @@ pub(crate) enum EvmNetworkCommand { /// Use the Arbitrum Sepolia network EvmArbitrumSepolia, + /// Use the Arbitrum Sepolia network with test contracts + EvmArbitrumSepoliaTest, + /// Use a custom network EvmCustom { /// The RPC URL for the custom network @@ -32,6 +35,7 @@ impl Into for EvmNetworkCommand { match self { Self::EvmArbitrumOne => EvmNetwork::ArbitrumOne, Self::EvmArbitrumSepolia => EvmNetwork::ArbitrumSepolia, + Self::EvmArbitrumSepoliaTest => EvmNetwork::ArbitrumSepoliaTest, Self::EvmCustom { rpc_url, payment_token_address, From 8da56e549848feb6e35196f02e46f5c71eb537f5 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Mon, 6 Jan 2025 13:41:27 +0100 Subject: [PATCH 5/8] feat: add arb sepolia test network to evmlib util --- evmlib/src/utils.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/evmlib/src/utils.rs b/evmlib/src/utils.rs index 4e3133713f..8e679d95d7 100644 --- a/evmlib/src/utils.rs +++ b/evmlib/src/utils.rs @@ -112,12 +112,19 @@ pub fn get_evm_network_from_env() -> Result { .map(|v| v == "arbitrum-sepolia") .unwrap_or(false); + let use_arbitrum_sepolia_test = std::env::var("EVM_NETWORK") + .map(|v| v == "arbitrum-sepolia-test") + .unwrap_or(false); + if use_arbitrum_one { info!("Using Arbitrum One EVM network as EVM_NETWORK is set to 'arbitrum-one'"); Ok(Network::ArbitrumOne) } else if use_arbitrum_sepolia { info!("Using Arbitrum Sepolia EVM network as EVM_NETWORK is set to 'arbitrum-sepolia'"); Ok(Network::ArbitrumSepolia) + } else if use_arbitrum_sepolia_test { + info!("Using Arbitrum Sepolia Test EVM network as EVM_NETWORK is set to 'arbitrum-sepolia-test'"); + Ok(Network::ArbitrumSepoliaTest) } else if let Ok(evm_vars) = evm_vars { info!("Using custom EVM network from environment variables"); Ok(Network::Custom(CustomNetwork::new( From ef35c1b21a9bb72f39f2acc280f2b24842bdd65c Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Mon, 6 Jan 2025 14:09:05 +0100 Subject: [PATCH 6/8] chore: add more logging to the fetch store quotes fn --- autonomi/src/client/quote.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/autonomi/src/client/quote.rs b/autonomi/src/client/quote.rs index ca8c515ad4..b89e1bbf34 100644 --- a/autonomi/src/client/quote.rs +++ b/autonomi/src/client/quote.rs @@ -62,6 +62,7 @@ impl Client { .into_iter() .map(|content_addr| fetch_store_quote_with_retries(&self.network, content_addr)) .collect(); + let raw_quotes_per_addr = futures::future::try_join_all(futures).await?; // choose the quotes to pay for each address @@ -70,9 +71,15 @@ impl Client { let mut rate_limiter = RateLimiter::new(); for (content_addr, raw_quotes) in raw_quotes_per_addr { + debug!( + "fetching market price for content_addr: {content_addr}, with {} quotes.", + raw_quotes.len() + ); + // FIXME: find better way to deal with paid content addrs and feedback to the user // assume that content addr is already paid for and uploaded if raw_quotes.is_empty() { + debug!("content_addr: {content_addr} is already paid for. No need to fetch market price."); continue; } @@ -90,6 +97,8 @@ impl Client { ) .await?; + debug!("market prices: {all_prices:?}"); + let mut prices: Vec<(PeerId, PaymentQuote, Amount)> = all_prices .into_iter() .zip(raw_quotes.into_iter()) From e837c8e683e967663f9d249d3fbfbe92cae4a4b9 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Tue, 7 Jan 2025 16:17:25 +0100 Subject: [PATCH 7/8] chore: set default evm network to Arbitrum Sepolia --- evmlib/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/evmlib/src/lib.rs b/evmlib/src/lib.rs index e2715c0ed6..480ac8270b 100644 --- a/evmlib/src/lib.rs +++ b/evmlib/src/lib.rs @@ -78,8 +78,8 @@ impl CustomNetwork { #[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)] pub enum Network { - #[default] ArbitrumOne, + #[default] ArbitrumSepolia, Custom(CustomNetwork), } From 22123636f2509a4177f74e5202dc9d1a6f176563 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Tue, 7 Jan 2025 16:18:30 +0100 Subject: [PATCH 8/8] feat: add `evm_network` field to `ClientConfig` --- autonomi/src/client/mod.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/autonomi/src/client/mod.rs b/autonomi/src/client/mod.rs index b6ddcfbbb9..c60424735a 100644 --- a/autonomi/src/client/mod.rs +++ b/autonomi/src/client/mod.rs @@ -82,6 +82,9 @@ pub struct ClientConfig { /// /// If not provided, the client will use the default bootstrap peers. pub peers: Option>, + + /// EVM network to use for quotations and payments. + pub evm_network: EvmNetwork, } impl Default for ClientConfig { @@ -92,6 +95,7 @@ impl Default for ClientConfig { #[cfg(not(feature = "local"))] local: false, peers: None, + evm_network: Default::default(), } } } @@ -151,6 +155,7 @@ impl Client { Self::init_with_config(ClientConfig { local, peers: Some(peers), + evm_network: Default::default(), }) .await }