Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: records #1902

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions sn_client/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use libp2p::{
use prometheus_client::registry::Registry;
use rand::{thread_rng, Rng};
use sn_networking::{
get_signed_spend_from_record, multiaddr_is_global,
get_solitary_signed_spend_from_record, multiaddr_is_global,
target_arch::{interval, spawn, timeout, Instant},
GetRecordCfg, GetRecordError, NetworkBuilder, NetworkError, NetworkEvent, PutRecordCfg,
VerificationKind, CLOSE_GROUP_SIZE,
Expand Down Expand Up @@ -460,6 +460,7 @@ impl Client {
Some(RetryStrategy::Quick)
};
let get_cfg = GetRecordCfg {
accumulate_spend_attempts: false,
get_quorum,
retry_strategy,
target_record: None,
Expand Down Expand Up @@ -654,6 +655,7 @@ impl Client {
get_quorum: Quorum::N(
NonZeroUsize::new(2).ok_or(Error::NonZeroUsizeWasInitialisedAsZero)?,
),
accumulate_spend_attempts: false,
retry_strategy,
target_record: None, // Not used since we use ChunkProof
expected_holders: Default::default(),
Expand Down Expand Up @@ -738,6 +740,7 @@ impl Client {
};

let get_cfg = GetRecordCfg {
accumulate_spend_attempts: false,
get_quorum: Quorum::One,
retry_strategy: Some(retry_strategy.unwrap_or(RetryStrategy::Quick)),
target_record: None,
Expand Down Expand Up @@ -889,6 +892,7 @@ impl Client {

// When there is retry on Put side, no need to have a retry on Get
let verification_cfg = GetRecordCfg {
accumulate_spend_attempts: true,
get_quorum: Quorum::Majority,
retry_strategy: None,
target_record: record_to_verify,
Expand Down Expand Up @@ -933,11 +937,12 @@ impl Client {
/// # }
/// ```
pub async fn get_spend_from_network(&self, address: SpendAddress) -> Result<SignedSpend> {
self.try_fetch_spend_from_network(
self.try_fetch_solitary_spend_from_network(
address,
GetRecordCfg {
get_quorum: Quorum::Majority,
retry_strategy: Some(RetryStrategy::Balanced),
accumulate_spend_attempts: false,
target_record: None,
expected_holders: Default::default(),
},
Expand All @@ -949,11 +954,12 @@ impl Client {
/// Useful to help decide whether a re-put is necessary, or a spend exists already
/// (client side verification).
pub async fn peek_a_spend(&self, address: SpendAddress) -> Result<SignedSpend> {
self.try_fetch_spend_from_network(
self.try_fetch_solitary_spend_from_network(
address,
GetRecordCfg {
get_quorum: Quorum::One,
retry_strategy: None,
accumulate_spend_attempts: true,
target_record: None,
expected_holders: Default::default(),
},
Expand All @@ -964,11 +970,12 @@ impl Client {
/// This is a similar funcation to `get_spend_from_network` to get a spend from network.
/// Just using different `RetryStrategy` to improve the performance during crawling.
pub async fn crawl_spend_from_network(&self, address: SpendAddress) -> Result<SignedSpend> {
self.try_fetch_spend_from_network(
self.try_fetch_solitary_spend_from_network(
address,
GetRecordCfg {
get_quorum: Quorum::Majority,
retry_strategy: None,
accumulate_spend_attempts: true,
target_record: None,
expected_holders: Default::default(),
},
Expand All @@ -983,7 +990,8 @@ impl Client {
self.peek_a_spend(genesis_addr).await.is_ok()
}

async fn try_fetch_spend_from_network(
/// Gets a spend or returns error if DoubleSpendAttempt is detected.
async fn try_fetch_solitary_spend_from_network(
&self,
address: SpendAddress,
get_cfg: GetRecordCfg,
Expand All @@ -1003,7 +1011,7 @@ impl Client {
PrettyPrintRecordKey::from(&record.key)
);

let signed_spend = get_signed_spend_from_record(&address, &record)?;
let signed_spend = get_solitary_signed_spend_from_record(&address, &record)?;

// check addr
let spend_addr = SpendAddress::from_unique_pubkey(signed_spend.unique_pubkey());
Expand Down
1 change: 1 addition & 0 deletions sn_client/src/register.rs
Original file line number Diff line number Diff line change
Expand Up @@ -844,6 +844,7 @@ impl ClientRegister {
let verification_cfg = GetRecordCfg {
get_quorum: Quorum::One,
retry_strategy: Some(RetryStrategy::Balanced),
accumulate_spend_attempts: false,
target_record: record_to_verify,
expected_holders,
};
Expand Down
2 changes: 1 addition & 1 deletion sn_client/src/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -959,7 +959,7 @@ impl Client {
let pk = cn.unique_pubkey();
let addr = SpendAddress::from_unique_pubkey(&pk);
let self_clone = self.network.clone();
let _ = tasks.spawn(async move { self_clone.get_spend(addr).await });
let _ = tasks.spawn(async move { self_clone.get_valid_spend(addr).await });
}
while let Some(result) = tasks.join_next().await {
let res = result.map_err(|e| WalletError::FailedToGetSpend(format!("{e}")))?;
Expand Down
4 changes: 4 additions & 0 deletions sn_networking/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ pub struct GetRecordCfg {
pub get_quorum: Quorum,
/// If enabled, the provided `RetryStrategy` is used to retry if a GET attempt fails.
pub retry_strategy: Option<RetryStrategy>,
/// If disabled, we error out if any double spend is found.
/// If enabled all double spends found are accumulated and returned.
/// This is useful for the client to decide if they want to retry the operation.
pub accumulate_spend_attempts: bool,
/// Only return if we fetch the provided record.
pub target_record: Option<Record>,
/// Logs if the record was not fetched from the provided set of peers.
Expand Down
52 changes: 46 additions & 6 deletions sn_networking/src/event/kad.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,19 @@
// permissions and limitations relating to use of the SAFE Network Software.

use crate::{
driver::PendingGetClosestType, get_quorum_value, GetRecordCfg, GetRecordError, NetworkError,
NetworkEvent, Result, SwarmDriver, CLOSE_GROUP_SIZE,
driver::PendingGetClosestType, get_quorum_value, get_raw_signed_spends_from_record,
GetRecordCfg, GetRecordError, NetworkError, NetworkEvent, Result, SwarmDriver,
CLOSE_GROUP_SIZE,
};
use itertools::Itertools;
use libp2p::kad::{
self, GetClosestPeersError, InboundRequest, PeerRecord, ProgressStep, QueryId, QueryResult,
QueryStats, Record, K_VALUE,
};
use sn_protocol::PrettyPrintRecordKey;
use sn_protocol::{
storage::{try_serialize_record, RecordKind},
PrettyPrintRecordKey,
};
use std::{
collections::{hash_map::Entry, HashSet},
time::Instant,
Expand Down Expand Up @@ -402,9 +406,45 @@ impl SwarmDriver {
Self::send_record_after_checking_target(sender, peer_record.record, &cfg)?;
} else {
debug!("For record {pretty_key:?} task {query_id:?}, fetch completed with split record");
sender
.send(Err(GetRecordError::SplitRecord { result_map }))
.map_err(|_| NetworkError::InternalMsgChannelDropped)?;

// Ensure we're not expecting a concrete record.
if cfg.accumulate_spend_attempts && cfg.target_record.is_none() {
let mut all_found_spends = vec![];
for (_record_hash, (record, _peers)) in result_map {
match get_raw_signed_spends_from_record(&record) {
Ok(spends_here) => {
all_found_spends.extend(spends_here);
}
Err(NetworkError::RecordKindMismatch(record_kind)) => {
trace!("For record {pretty_key:?} task {query_id:?}, found record with kind {record_kind:?} instead of spend record");
}
Err(error) => {
error!("Unexpected error when trying to accumulate spend atttempts: {error:?}");
}
}
}

if all_found_spends.is_empty() {
warn!("For record {pretty_key:?} task {query_id:?}, found no spend records");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'd want to send something back to the sender here. Else the caller would be hanging on the get operation forever.

Something like:

 sender
    .send(Err(GetRecordError::NoSpendsFoundDuringAccumulation))
    .map_err(|_| NetworkError::InternalMsgChannelDropped)?;
}

Thinking about it, can we move the accumulation of spend attempts to a higher level?
We return SplitRecord error here, so we can use that to accumulate the spends? It'd keep the kad get flow clean imo?

} else {
let bytes = try_serialize_record(&all_found_spends, RecordKind::Spend)?;

let new_accumualted_record = Record {
key: peer_record.record.key,
value: bytes.to_vec(),
publisher: None,
expires: None,
};
sender
.send(Ok(new_accumualted_record))
.map_err(|_| NetworkError::InternalMsgChannelDropped)?;
}
} else {
warn!("Unexpected split record. If checking spends, you may want to `accumulate_spend_attempts` in the GetRecordCfg.");
sender
.send(Err(GetRecordError::SplitRecord { result_map }))
.map_err(|_| NetworkError::InternalMsgChannelDropped)?;
}
}

// Stop the query; possibly stops more nodes from being queried.
Expand Down
3 changes: 2 additions & 1 deletion sn_networking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
error::{GetRecordError, NetworkError},
event::{MsgResponder, NetworkEvent},
record_store::{calculate_cost_for_records, NodeRecordStore},
transfers::{get_raw_signed_spends_from_record, get_signed_spend_from_record},
transfers::{get_raw_signed_spends_from_record, get_solitary_signed_spend_from_record},
};

use self::{cmd::SwarmCmd, error::Result};
Expand Down Expand Up @@ -516,6 +516,7 @@

// if we don't want to retry, throw permanent error
if cfg.retry_strategy.is_none() {
// TODO this is where to decide if we just return accumulated record eg

Check notice

Code scanning / devskim

A "TODO" or similar was left in source code, possibly indicating incomplete functionality Note

Suspicious comment
if let Err(e) = result {
return Err(BackoffError::Permanent(NetworkError::from(e)));
}
Expand Down
4 changes: 3 additions & 1 deletion sn_networking/src/spends.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ impl Network {
.inputs
.iter()
.map(|input| input.unique_pubkey);
// This will error with double spend... Is that wanted?
// I think so here as _this spend_ is now invalid
let tasks: Vec<_> = parent_keys
.map(|a| self.get_spend(SpendAddress::from_unique_pubkey(&a)))
.map(|a| self.get_valid_spend(SpendAddress::from_unique_pubkey(&a)))
.collect();
let parent_spends: BTreeSet<SignedSpend> = join_all(tasks)
.await
Expand Down
14 changes: 8 additions & 6 deletions sn_networking/src/transfers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ impl Network {
let get_cfg = GetRecordCfg {
get_quorum: Quorum::Majority,
retry_strategy: None,
accumulate_spend_attempts: true,
target_record: None,
expected_holders: Default::default(),
};
Expand All @@ -48,11 +49,12 @@ impl Network {
/// We know it must be there, and has to be fetched from Quorum::All
///
/// If we get a quorum error, we increase the RetryStrategy
pub async fn get_spend(&self, address: SpendAddress) -> Result<SignedSpend> {
pub async fn get_valid_spend(&self, address: SpendAddress) -> Result<SignedSpend> {
let key = NetworkAddress::from_spend_address(address).to_record_key();
let mut get_cfg = GetRecordCfg {
get_quorum: Quorum::All,
retry_strategy: Some(RetryStrategy::Quick),
accumulate_spend_attempts: false,
target_record: None,
expected_holders: Default::default(),
};
Expand Down Expand Up @@ -85,7 +87,7 @@ impl Network {
PrettyPrintRecordKey::from(&record.key)
);

get_signed_spend_from_record(&address, &record)
get_solitary_signed_spend_from_record(&address, &record)
}

/// This function is used to receive a Transfer and turn it back into spendable CashNotes.
Expand Down Expand Up @@ -130,7 +132,7 @@ impl Network {
let mut tasks = JoinSet::new();
for addr in parent_addrs.clone() {
let self_clone = self.clone();
let _ = tasks.spawn(async move { self_clone.get_spend(addr).await });
let _ = tasks.spawn(async move { self_clone.get_valid_spend(addr).await });
}
let mut parent_spends = BTreeSet::new();
while let Some(result) = tasks.join_next().await {
Expand Down Expand Up @@ -188,7 +190,7 @@ impl Network {
}
let self_clone = self.clone();
let addr = SpendAddress::from_unique_pubkey(input_key);
let _ = tasks.spawn(async move { self_clone.get_spend(addr).await });
let _ = tasks.spawn(async move { self_clone.get_valid_spend(addr).await });
}
while let Some(result) = tasks.join_next().await {
let signed_spend = result
Expand Down Expand Up @@ -231,8 +233,8 @@ pub fn get_raw_signed_spends_from_record(record: &Record) -> Result<Vec<SignedSp
}

/// Get the signed spend out of a record.
/// Double spends are returned as an error
pub fn get_signed_spend_from_record(
/// Double spend attempts are returned as an error
pub fn get_solitary_signed_spend_from_record(
address: &SpendAddress,
record: &Record,
) -> Result<SignedSpend> {
Expand Down
3 changes: 3 additions & 0 deletions sn_node/src/put_validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,9 @@ impl Node {

// validate the signed spends against the network and the local knowledge
debug!("Validating spends for {pretty_key:?} with unique key: {unique_pubkey:?}");

// In here we are expecting to have at most 2 spends for the same unique_pubkey
// This could push out original valid spends if there is a double spend.
let (spend1, maybe_spend2) = match self
.signed_spends_to_keep(spends_for_key.clone(), *unique_pubkey)
.await
Expand Down
5 changes: 5 additions & 0 deletions sn_node/src/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,17 @@ impl Node {
trace!(
"Can not fetch record {pretty_key:?} from node {holder:?}, fetching from the network"
);

// Here we don't care about how many nodes are storing this record...
let get_cfg = GetRecordCfg {
accumulate_spend_attempts: true,
get_quorum: Quorum::One,
retry_strategy: None,
target_record: None,
expected_holders: Default::default(),
};
// But this can error out with SplitRecord and so keep retrying so we do not replicate
// DoubleSpendAttempted Spends here!
node.network.get_record_from_network(key, &get_cfg).await?
};

Expand Down
Loading