Skip to content

Commit

Permalink
Send mixnet packet stats using task client
Browse files Browse the repository at this point in the history
  • Loading branch information
octol committed Nov 8, 2024
1 parent 74db9ab commit 08e6821
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 27 deletions.
2 changes: 1 addition & 1 deletion common/client-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub(crate) mod helpers;
pub mod inbound_messages;
pub mod key_manager;
pub mod mix_traffic;
pub(crate) mod packet_statistics_control;
pub mod packet_statistics_control;
pub mod real_messages_control;
pub mod received_buffer;
pub mod replies;
Expand Down
95 changes: 69 additions & 26 deletions common/client-core/src/client/packet_statistics_control.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use core::fmt;
use std::{
collections::VecDeque,
time::{Duration, Instant},
Expand Down Expand Up @@ -189,29 +190,64 @@ impl std::ops::Sub for PacketStatistics {
}
}

pub struct MixnetBandwidthStatisticsEvent {
pub rates: PacketRates,
}

impl MixnetBandwidthStatisticsEvent {
pub fn new(rates: PacketRates) -> Self {
Self { rates }
}
}

impl nym_task::TaskStatusEvent for MixnetBandwidthStatisticsEvent {
fn as_any(&self) -> &dyn std::any::Any {
self
}
}

impl fmt::Display for MixnetBandwidthStatisticsEvent {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.rates.summary())
}
}

#[derive(Debug, Clone)]
struct PacketRates {
real_packets_sent: f64,
real_packets_sent_size: f64,
cover_packets_sent: f64,
cover_packets_sent_size: f64,

real_packets_received: f64,
real_packets_received_size: f64,
cover_packets_received: f64,
cover_packets_received_size: f64,

total_acks_received: f64,
total_acks_received_size: f64,
real_acks_received: f64,
real_acks_received_size: f64,
cover_acks_received: f64,
cover_acks_received_size: f64,

real_packets_queued: f64,
retransmissions_queued: f64,
reply_surbs_queued: f64,
additional_reply_surbs_queued: f64,
pub struct PacketRates {
pub real_packets_sent: f64,
pub real_packets_sent_size: f64,
pub cover_packets_sent: f64,
pub cover_packets_sent_size: f64,

pub real_packets_received: f64,
pub real_packets_received_size: f64,
pub cover_packets_received: f64,
pub cover_packets_received_size: f64,

pub total_acks_received: f64,
pub total_acks_received_size: f64,
pub real_acks_received: f64,
pub real_acks_received_size: f64,
pub cover_acks_received: f64,
pub cover_acks_received_size: f64,

pub real_packets_queued: f64,
pub retransmissions_queued: f64,
pub reply_surbs_queued: f64,
pub additional_reply_surbs_queued: f64,
}

impl fmt::Display for PacketRates {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"down: {}/s, up: {}/s (cover down: {}/s, cover up: {}/s)",
bibytes2(self.real_packets_received_size),
bibytes2(self.real_packets_sent_size),
bibytes2(self.cover_packets_received_size),
bibytes2(self.cover_packets_sent_size),
)
}
}

impl From<PacketStatistics> for PacketRates {
Expand Down Expand Up @@ -456,11 +492,13 @@ impl PacketStatisticsControl {
}
}

fn report_rates(&self) {
fn report_rates(&self) -> Option<PacketRates> {
if let Some((_, rates)) = self.rates.back() {
log::debug!("{}", rates.summary());
log::debug!("{}", rates.detailed_summary());
return Some(rates.clone());
}
None
}

fn report_counters(&self) {
Expand Down Expand Up @@ -499,7 +537,7 @@ impl PacketStatisticsControl {
// IDEA: if there is a burst of acks, that could indicate tokio task starvation.
}

pub(crate) async fn run_with_shutdown(&mut self, mut shutdown: nym_task::TaskClient) {
pub(crate) async fn run_with_shutdown(&mut self, mut task_client: nym_task::TaskClient) {
log::debug!("Started PacketStatisticsControl with graceful shutdown support");

let report_interval = Duration::from_secs(PACKET_REPORT_INTERVAL_SECS);
Expand Down Expand Up @@ -583,11 +621,16 @@ impl PacketStatisticsControl {
self.update_rates();
}
_ = report_interval.tick() => {
self.report_rates();
let rates = self.report_rates();
self.check_for_notable_events();
self.report_counters();

// Report our current bandwidth used to e.g a GUI client
if let Some(rates) = rates {
task_client.send_status_msg(Box::new(MixnetBandwidthStatisticsEvent::new(rates)));
}
}
_ = shutdown.recv_with_delay() => {
_ = task_client.recv_with_delay() => {
log::trace!("PacketStatisticsControl: Received shutdown");
break;
},
Expand Down

0 comments on commit 08e6821

Please sign in to comment.