Skip to content

Commit

Permalink
fix(network): accumulate spends by default if split
Browse files Browse the repository at this point in the history
  • Loading branch information
RolandSherwin committed Jun 18, 2024
1 parent 25dd060 commit 7bcd249
Showing 1 changed file with 42 additions and 7 deletions.
49 changes: 42 additions & 7 deletions sn_networking/src/event/kad.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,21 @@
// permissions and limitations relating to use of the SAFE Network Software.

use crate::{
driver::PendingGetClosestType, get_quorum_value, GetRecordCfg, GetRecordError, NetworkError,
Result, SwarmDriver, CLOSE_GROUP_SIZE,
driver::PendingGetClosestType, get_quorum_value, get_raw_signed_spends_from_record,
GetRecordCfg, GetRecordError, NetworkError, Result, SwarmDriver, CLOSE_GROUP_SIZE,
};
use itertools::Itertools;
use libp2p::kad::{
self, GetClosestPeersError, InboundRequest, PeerRecord, ProgressStep, QueryId, QueryResult,
QueryStats, Record, K_VALUE,
};
use sn_protocol::PrettyPrintRecordKey;
use sn_protocol::{
storage::{try_serialize_record, RecordKind},
PrettyPrintRecordKey,
};
use sn_transfers::SignedSpend;
use std::{
collections::{hash_map::Entry, HashSet},
collections::{hash_map::Entry, BTreeSet, HashSet},
time::Instant,
};
use tokio::sync::oneshot;
Expand Down Expand Up @@ -395,9 +399,40 @@ impl SwarmDriver {
Self::send_record_after_checking_target(sender, peer_record.record, &cfg)?;
} else {
debug!("For record {pretty_key:?} task {query_id:?}, fetch completed with split record");
sender
.send(Err(GetRecordError::SplitRecord { result_map }))
.map_err(|_| NetworkError::InternalMsgChannelDropped)?;
let mut accumulated_spends = BTreeSet::new();
for (record, _) in result_map.values() {
match get_raw_signed_spends_from_record(record) {
Ok(spends) => {
accumulated_spends.extend(spends);
}
Err(_) => {
continue;
}
}
}
if !accumulated_spends.is_empty() {
info!("For record {pretty_key:?} task {query_id:?}, found split record for a spend, accumulated and sending them as a single record");
let accumulated_spends = accumulated_spends
.into_iter()
.take(2)
.collect::<Vec<SignedSpend>>();

let bytes = try_serialize_record(&accumulated_spends, RecordKind::Spend)?;

let new_accumulated_record = Record {
key: peer_record.record.key,
value: bytes.to_vec(),
publisher: None,
expires: None,
};
sender
.send(Ok(new_accumulated_record))
.map_err(|_| NetworkError::InternalMsgChannelDropped)?;
} else {
sender
.send(Err(GetRecordError::SplitRecord { result_map }))
.map_err(|_| NetworkError::InternalMsgChannelDropped)?;
}
}

// Stop the query; possibly stops more nodes from being queried.
Expand Down

0 comments on commit 7bcd249

Please sign in to comment.