Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into feat-report-already-p…
Browse files Browse the repository at this point in the history
…aid-records
  • Loading branch information
b-zee committed Jan 8, 2025
2 parents 52c2240 + 6f94a1e commit b7e2421
Show file tree
Hide file tree
Showing 36 changed files with 432 additions and 323 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/memcheck.yml
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,14 @@ jobs:
ls -l $ANT_DATA_PATH/client_first/logs
mkdir $ANT_DATA_PATH/client
ls -l $ANT_DATA_PATH
cp ./the-test-data.zip ./the-test-data_1.zip
./target/release/ant --log-output-dest=data-dir file upload "./the-test-data_1.zip" > ./second_upload 2>&1
./target/release/ant --log-output-dest=data-dir file upload --public "./the-test-data.zip" > ./upload_output_second 2>&1
rg 'Total cost: 0 AttoTokens' ./upload_output_second -c --stats
env:
ANT_LOG: "all"
timeout-minutes: 25

- name: showing the second upload terminal output
run: cat second_upload
run: cat upload_output_second
shell: bash
if: always()

Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 8 additions & 15 deletions ant-networking/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use ant_evm::{PaymentQuote, QuotingMetrics, U256};
use ant_protocol::{
convert_distance_to_u256,
messages::{Cmd, Request, Response},
storage::{RecordHeader, RecordKind, RecordType},
storage::{DataTypes, RecordHeader, RecordKind, ValidationType},
NetworkAddress, PrettyPrintRecordKey,
};
use libp2p::{
Expand Down Expand Up @@ -92,7 +92,7 @@ pub enum LocalSwarmCmd {
},
/// Get the Addresses of all the Records held locally
GetAllLocalRecordAddresses {
sender: oneshot::Sender<HashMap<NetworkAddress, RecordType>>,
sender: oneshot::Sender<HashMap<NetworkAddress, ValidationType>>,
},
/// Get data from the local RecordStore
GetLocalRecord {
Expand Down Expand Up @@ -120,7 +120,7 @@ pub enum LocalSwarmCmd {
/// This should be done after the record has been stored to disk
AddLocalRecordAsStored {
key: RecordKey,
record_type: RecordType,
record_type: ValidationType,
},
/// Add a peer to the blocklist
AddPeerToBlockList {
Expand All @@ -141,7 +141,7 @@ pub enum LocalSwarmCmd {
quotes: Vec<(PeerId, PaymentQuote)>,
},
// Notify a fetch completion
FetchCompleted((RecordKey, RecordType)),
FetchCompleted((RecordKey, ValidationType)),
/// Triggers interval repliation
/// NOTE: This does result in outgoing messages, but is produced locally
TriggerIntervalReplication,
Expand Down Expand Up @@ -661,19 +661,12 @@ impl SwarmDriver {
let record_type = match RecordHeader::from_record(&record) {
Ok(record_header) => {
match record_header.kind {
RecordKind::Chunk => RecordType::Chunk,
RecordKind::GraphEntry
| RecordKind::Pointer
| RecordKind::Register
| RecordKind::Scratchpad => {
RecordKind::DataOnly(DataTypes::Chunk) => ValidationType::Chunk,
RecordKind::DataOnly(_) => {
let content_hash = XorName::from_content(&record.value);
RecordType::NonChunk(content_hash)
ValidationType::NonChunk(content_hash)
}
RecordKind::ChunkWithPayment
| RecordKind::RegisterWithPayment
| RecordKind::PointerWithPayment
| RecordKind::GraphEntryWithPayment
| RecordKind::ScratchpadWithPayment => {
RecordKind::DataWithPayment(_) => {
error!("Record {record_key:?} with payment shall not be stored locally.");
return Err(NetworkError::InCorrectRecordHeader);
}
Expand Down
4 changes: 2 additions & 2 deletions ant-networking/src/event/kad.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
GetRecordCfg, GetRecordError, NetworkError, Result, SwarmDriver, CLOSE_GROUP_SIZE,
};
use ant_protocol::{
storage::{try_serialize_record, GraphEntry, RecordKind},
storage::{try_serialize_record, DataTypes, GraphEntry, RecordKind},
NetworkAddress, PrettyPrintRecordKey,
};
use itertools::Itertools;
Expand Down Expand Up @@ -415,7 +415,7 @@ impl SwarmDriver {

let bytes = try_serialize_record(
&accumulated_transactions,
RecordKind::GraphEntry,
RecordKind::DataOnly(DataTypes::GraphEntry),
)?;

let new_accumulated_record = Record {
Expand Down
4 changes: 2 additions & 2 deletions ant-networking/src/event/request_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
};
use ant_protocol::{
messages::{CmdResponse, Request, Response},
storage::RecordType,
storage::ValidationType,
NetworkAddress,
};
use libp2p::request_response::{self, Message};
Expand Down Expand Up @@ -159,7 +159,7 @@ impl SwarmDriver {
fn add_keys_to_replication_fetcher(
&mut self,
sender: NetworkAddress,
incoming_keys: Vec<(NetworkAddress, RecordType)>,
incoming_keys: Vec<(NetworkAddress, ValidationType)>,
) {
let holder = if let Some(peer_id) = sender.as_peer_id() {
peer_id
Expand Down
8 changes: 5 additions & 3 deletions ant-networking/src/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
// permissions and limitations relating to use of the SAFE Network Software.

use crate::{driver::GetRecordCfg, Network, NetworkError, Result};
use ant_protocol::storage::{GraphEntry, GraphEntryAddress};
use ant_protocol::storage::{DataTypes, GraphEntry, GraphEntryAddress};
use ant_protocol::{
storage::{try_deserialize_record, RecordHeader, RecordKind, RetryStrategy},
NetworkAddress, PrettyPrintRecordKey,
Expand Down Expand Up @@ -37,14 +37,16 @@ impl Network {

pub fn get_graph_entry_from_record(record: &Record) -> Result<Vec<GraphEntry>> {
let header = RecordHeader::from_record(record)?;
if let RecordKind::GraphEntry = header.kind {
if let RecordKind::DataOnly(DataTypes::GraphEntry) = header.kind {
let transactions = try_deserialize_record::<Vec<GraphEntry>>(record)?;
Ok(transactions)
} else {
warn!(
"RecordKind mismatch while trying to retrieve graph_entry from record {:?}",
PrettyPrintRecordKey::from(&record.key)
);
Err(NetworkError::RecordKindMismatch(RecordKind::GraphEntry))
Err(NetworkError::RecordKindMismatch(RecordKind::DataOnly(
DataTypes::GraphEntry,
)))
}
}
64 changes: 32 additions & 32 deletions ant-networking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use ant_evm::{PaymentQuote, QuotingMetrics};
use ant_protocol::{
error::Error as ProtocolError,
messages::{ChunkProof, Nonce, Query, QueryResponse, Request, Response},
storage::{Pointer, RecordType, RetryStrategy, Scratchpad},
storage::{DataTypes, Pointer, RetryStrategy, Scratchpad, ValidationType},
NetworkAddress, PrettyPrintKBucketKey, PrettyPrintRecordKey, CLOSE_GROUP_SIZE,
};
use futures::future::select_all;
Expand Down Expand Up @@ -632,16 +632,11 @@ impl Network {
}

match kind {
RecordKind::Chunk
| RecordKind::ChunkWithPayment
| RecordKind::GraphEntryWithPayment
| RecordKind::RegisterWithPayment
| RecordKind::PointerWithPayment
| RecordKind::ScratchpadWithPayment => {
RecordKind::DataOnly(DataTypes::Chunk) | RecordKind::DataWithPayment(_) => {
error!("Encountered a split record for {pretty_key:?} with unexpected RecordKind {kind:?}, skipping.");
continue;
}
RecordKind::GraphEntry => {
RecordKind::DataOnly(DataTypes::GraphEntry) => {
info!("For record {pretty_key:?}, we have a split record for a transaction attempt. Accumulating transactions");

match get_graph_entry_from_record(record) {
Expand All @@ -653,7 +648,7 @@ impl Network {
}
}
}
RecordKind::Register => {
RecordKind::DataOnly(DataTypes::Register) => {
info!("For record {pretty_key:?}, we have a split record for a register. Accumulating registers");
let Ok(register) = try_deserialize_record::<SignedRegister>(record) else {
error!(
Expand All @@ -675,7 +670,7 @@ impl Network {
}
}
}
RecordKind::Pointer => {
RecordKind::DataOnly(DataTypes::Pointer) => {
info!("For record {pretty_key:?}, we have a split record for a pointer. Selecting the one with the highest count");
let Ok(pointer) = try_deserialize_record::<Pointer>(record) else {
error!(
Expand All @@ -697,7 +692,7 @@ impl Network {
}
valid_pointer = Some(pointer);
}
RecordKind::Scratchpad => {
RecordKind::DataOnly(DataTypes::Scratchpad) => {
info!("For record {pretty_key:?}, we have a split record for a scratchpad. Selecting the one with the highest count");
let Ok(scratchpad) = try_deserialize_record::<Scratchpad>(record) else {
error!(
Expand Down Expand Up @@ -733,7 +728,7 @@ impl Network {
.collect::<Vec<GraphEntry>>();
let record = Record {
key: key.clone(),
value: try_serialize_record(&accumulated_transactions, RecordKind::GraphEntry)
value: try_serialize_record(&accumulated_transactions, RecordKind::DataOnly(DataTypes::GraphEntry))
.map_err(|err| {
error!(
"Error while serializing the accumulated transactions for {pretty_key:?}: {err:?}"
Expand All @@ -754,14 +749,15 @@ impl Network {
acc
});

let record_value = try_serialize_record(&signed_register, RecordKind::Register)
.map_err(|err| {
error!(
let record_value =
try_serialize_record(&signed_register, RecordKind::DataOnly(DataTypes::Register))
.map_err(|err| {
error!(
"Error while serializing the merged register for {pretty_key:?}: {err:?}"
);
NetworkError::from(err)
})?
.to_vec();
NetworkError::from(err)
})?
.to_vec();

let record = Record {
key: key.clone(),
Expand All @@ -772,12 +768,13 @@ impl Network {
return Ok(Some(record));
} else if let Some(pointer) = valid_pointer {
info!("For record {pretty_key:?} task found a valid pointer, returning it.");
let record_value = try_serialize_record(&pointer, RecordKind::Pointer)
.map_err(|err| {
error!("Error while serializing the pointer for {pretty_key:?}: {err:?}");
NetworkError::from(err)
})?
.to_vec();
let record_value =
try_serialize_record(&pointer, RecordKind::DataOnly(DataTypes::Pointer))
.map_err(|err| {
error!("Error while serializing the pointer for {pretty_key:?}: {err:?}");
NetworkError::from(err)
})?
.to_vec();

let record = Record {
key: key.clone(),
Expand All @@ -788,12 +785,15 @@ impl Network {
return Ok(Some(record));
} else if let Some(scratchpad) = valid_scratchpad {
info!("For record {pretty_key:?} task found a valid scratchpad, returning it.");
let record_value = try_serialize_record(&scratchpad, RecordKind::Scratchpad)
.map_err(|err| {
error!("Error while serializing the scratchpad for {pretty_key:?}: {err:?}");
NetworkError::from(err)
})?
.to_vec();
let record_value =
try_serialize_record(&scratchpad, RecordKind::DataOnly(DataTypes::Scratchpad))
.map_err(|err| {
error!(
"Error while serializing the scratchpad for {pretty_key:?}: {err:?}"
);
NetworkError::from(err)
})?
.to_vec();

let record = Record {
key: key.clone(),
Expand Down Expand Up @@ -964,7 +964,7 @@ impl Network {

/// Notify ReplicationFetch a fetch attempt is completed.
/// (but it won't trigger any real writes to disk, say fetched an old version of register)
pub fn notify_fetch_completed(&self, key: RecordKey, record_type: RecordType) {
pub fn notify_fetch_completed(&self, key: RecordKey, record_type: ValidationType) {
self.send_local_swarm_cmd(LocalSwarmCmd::FetchCompleted((key, record_type)))
}

Expand Down Expand Up @@ -995,7 +995,7 @@ impl Network {
/// Returns the Addresses of all the locally stored Records
pub async fn get_all_local_record_addresses(
&self,
) -> Result<HashMap<NetworkAddress, RecordType>> {
) -> Result<HashMap<NetworkAddress, ValidationType>> {
let (sender, receiver) = oneshot::channel();
self.send_local_swarm_cmd(LocalSwarmCmd::GetAllLocalRecordAddresses { sender });

Expand Down
Loading

0 comments on commit b7e2421

Please sign in to comment.