Skip to content

Commit

Permalink
Merge pull request #51 from EspressoSystems/rm/metric-unpanic
Browse files Browse the repository at this point in the history
Allow metric initialization failure
  • Loading branch information
rob-maron authored Jul 19, 2024
2 parents 4e1a846 + c4afa9e commit 398b77f
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 17 deletions.
16 changes: 8 additions & 8 deletions cdn-proto/src/connection/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,18 @@ use prometheus::{register_gauge, register_histogram, Gauge, Histogram};

lazy_static! {
// The total number of bytes sent
pub static ref BYTES_SENT: Gauge =
register_gauge!("total_bytes_sent", "the total number of bytes sent").unwrap();
pub static ref BYTES_SENT: Option<Gauge> =
register_gauge!("total_bytes_sent", "the total number of bytes sent").map_err(|e| eprintln!("Could not register metric: {:?}", e)).ok();

// The total number of bytes received
pub static ref BYTES_RECV: Gauge =
register_gauge!("total_bytes_recv", "the total number of bytes received").unwrap();
pub static ref BYTES_RECV: Option<Gauge> =
register_gauge!("total_bytes_recv", "the total number of bytes received").map_err(|e| eprintln!("Could not register metric: {:?}", e)).ok();

// Per-message latency
pub static ref LATENCY: Histogram =
register_histogram!("latency", "message delivery latency").unwrap();
pub static ref LATENCY: Option<Histogram> =
register_histogram!("latency", "message delivery latency").map_err(|e| eprintln!("Could not register metric: {:?}", e)).ok();

// The average per-message latency over the last 30 seconds
pub static ref RUNNING_LATENCY: Gauge =
register_gauge!("running_latency", "average tail latency over the last 30s").unwrap();
pub static ref RUNNING_LATENCY: Option<Gauge> =
register_gauge!("running_latency", "average tail latency over the last 30s").map_err(|e| eprintln!("Could not register metric: {:?}", e)).ok();
}
5 changes: 4 additions & 1 deletion cdn-proto/src/connection/middleware/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,11 @@ pub struct AllocationPermit(OwnedSemaphorePermit, #[cfg(feature = "metrics")] In
/// as latency.
impl Drop for AllocationPermit {
fn drop(&mut self) {
// Log the latency of the allocation (if available)
#[cfg(feature = "metrics")]
metrics::LATENCY.observe(self.1.elapsed().as_secs_f64());
if let Some(latency) = metrics::LATENCY.as_ref() {
latency.observe(self.1.elapsed().as_secs_f64());
}
}
}

Expand Down
12 changes: 8 additions & 4 deletions cdn-proto/src/connection/protocols/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,9 +335,11 @@ async fn read_length_delimited<R: AsyncReadExt + Unpin + Send>(
"failed to read message"
);

// Add to our metrics, if desired
// Add to our metrics, if desired and available
#[cfg(feature = "metrics")]
metrics::BYTES_RECV.add(f64::from(message_size));
if let Some(bytes_recv) = metrics::BYTES_RECV.as_ref() {
bytes_recv.add(f64::from(message_size));
}

Ok(Bytes::from(buffer, permit))
}
Expand Down Expand Up @@ -376,9 +378,11 @@ async fn write_length_delimited<W: AsyncWriteExt + Unpin + Send>(
"failed to send message"
);

// Increment the number of bytes we've sent by this amount
// Increment the number of bytes we've sent by this amount, if available
#[cfg(feature = "metrics")]
metrics::BYTES_SENT.add(f64::from(message_len));
if let Some(bytes_sent) = metrics::BYTES_SENT.as_ref() {
bytes_sent.add(f64::from(message_len));
}

Ok(())
}
Expand Down
16 changes: 12 additions & 4 deletions cdn-proto/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use std::{net::SocketAddr, time::Duration};

use tokio::time::sleep;
use tracing::error;
use tracing::{error, warn};
use warp::Filter;

use crate::connection::metrics;
Expand Down Expand Up @@ -39,22 +39,30 @@ pub async fn running_latency_calculator() {
let mut latency_sum = 0.0;
let mut latency_count = 0;

// Exit if the latency metrics could not be initialized
let (Some(latency_metric), Some(running_latency_metric)) =
(metrics::LATENCY.as_ref(), metrics::RUNNING_LATENCY.as_ref())
else {
warn!("Running latency calculator exiting: metrics could not be initialized");
return;
};

// Start calculating the latency
loop {
// Sleep for 30s
sleep(Duration::from_secs(30)).await;

// Fetch the current sum and count
let current_sum = metrics::LATENCY.get_sample_sum();
let current_count = metrics::LATENCY.get_sample_count();
let current_sum = latency_metric.get_sample_sum();
let current_count = latency_metric.get_sample_count();

// Calculate the running latency by subtracting the previous sum and count
latency_sum = current_sum - latency_sum;
latency_count = current_count - latency_count;

// Set the running latency if the new count is not 0
if latency_count != 0 {
metrics::RUNNING_LATENCY.set(latency_sum / latency_count as f64);
running_latency_metric.set(latency_sum / latency_count as f64);
}

// Update the previous sum and count for the next iteration
Expand Down

0 comments on commit 398b77f

Please sign in to comment.