Skip to content

Commit

Permalink
Add per-chain aggregate software/hardware telemetry (paritytech#464)
Browse files Browse the repository at this point in the history
* Add per-chain aggregate software/hardware telemetry

* Fix tests' compilation

* Add extra comments for the `Counter` struct

* Replace the boolean argument with an enum

* Rename `replace_hwbench` to `update_hwbench`

* Move `Counter` into a separate file

* Move `ChainStatsCollator` to `chain_stats.rs`

* Fix incorrect key on the unknown table

* Improve types for the stats component; get rid of `any`
  • Loading branch information
koute authored Apr 27, 2022
1 parent 978c070 commit 45878f9
Show file tree
Hide file tree
Showing 22 changed files with 1,034 additions and 18 deletions.
13 changes: 13 additions & 0 deletions backend/common/src/node_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub enum Payload {
BlockImport(Block),
NotifyFinalized(Finalized),
AfgAuthoritySet(AfgAuthoritySet),
HwBench(NodeHwBench),
}

#[derive(Serialize, Deserialize, Debug, Clone)]
Expand Down Expand Up @@ -93,6 +94,14 @@ pub struct AfgAuthoritySet {
pub authority_set_id: Box<str>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct NodeHwBench {
pub cpu_hashrate_score: u64,
pub memory_memcpy_score: u64,
pub disk_sequential_write_score: Option<u64>,
pub disk_random_write_score: Option<u64>,
}

impl Payload {
pub fn best_block(&self) -> Option<&Block> {
match self {
Expand Down Expand Up @@ -145,9 +154,13 @@ mod tests {
name: "foo".into(),
implementation: "foo".into(),
version: "foo".into(),
target_arch: Some("x86_64".into()),
target_os: Some("linux".into()),
target_env: Some("env".into()),
validator: None,
network_id: ArrayString::new(),
startup_time: None,
sysinfo: None,
},
}),
});
Expand Down
34 changes: 34 additions & 0 deletions backend/common/src/node_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,40 @@ pub struct NodeDetails {
pub validator: Option<Box<str>>,
pub network_id: NetworkId,
pub startup_time: Option<Box<str>>,
pub target_os: Option<Box<str>>,
pub target_arch: Option<Box<str>>,
pub target_env: Option<Box<str>>,
pub sysinfo: Option<NodeSysInfo>,
}

/// Hardware and software information for the node.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct NodeSysInfo {
/// The exact CPU model.
pub cpu: Option<Box<str>>,
/// The total amount of memory, in bytes.
pub memory: Option<u64>,
/// The number of physical CPU cores.
pub core_count: Option<u32>,
/// The Linux kernel version.
pub linux_kernel: Option<Box<str>>,
/// The exact Linux distribution used.
pub linux_distro: Option<Box<str>>,
/// Whether the node's running under a virtual machine.
pub is_virtual_machine: Option<bool>,
}

/// Hardware benchmark results for the node.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct NodeHwBench {
/// The CPU speed, as measured in how many MB/s it can hash using the BLAKE2b-256 hash.
pub cpu_hashrate_score: u64,
/// Memory bandwidth in MB/s, calculated by measuring the throughput of `memcpy`.
pub memory_memcpy_score: u64,
/// Sequential disk write speed in MB/s.
pub disk_sequential_write_score: Option<u64>,
/// Random disk write speed in MB/s.
pub disk_random_write_score: Option<u64>,
}

/// A couple of node statistics.
Expand Down
1 change: 1 addition & 0 deletions backend/telemetry_core/src/aggregator/inner_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ impl InnerLoop {
new_chain.finalized_block().height,
new_chain.finalized_block().hash,
));
feed_serializer.push(feed_message::ChainStatsUpdate(new_chain.stats()));
if let Some(bytes) = feed_serializer.into_finalized() {
let _ = feed_channel.send(ToFeedWebsocket::Bytes(bytes));
}
Expand Down
28 changes: 28 additions & 0 deletions backend/telemetry_core/src/feed_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ actions! {
// We maintain existing IDs for backward compatibility.
20: StaleNode,
21: NodeIOUpdate<'_>,
22: ChainStatsUpdate<'_>,
}

#[derive(Serialize)]
Expand Down Expand Up @@ -202,3 +203,30 @@ impl FeedMessageWrite for AddedNode<'_> {
));
}
}

#[derive(Serialize)]
pub struct ChainStatsUpdate<'a>(pub &'a ChainStats);

#[derive(Serialize, PartialEq, Eq, Default)]
pub struct Ranking<K> {
pub list: Vec<(K, u64)>,
pub other: u64,
pub unknown: u64,
}

#[derive(Serialize, PartialEq, Eq, Default)]
pub struct ChainStats {
pub version: Ranking<String>,
pub target_os: Ranking<String>,
pub target_arch: Ranking<String>,
pub cpu: Ranking<String>,
pub memory: Ranking<(u32, Option<u32>)>,
pub core_count: Ranking<u32>,
pub linux_kernel: Ranking<String>,
pub linux_distro: Ranking<String>,
pub is_virtual_machine: Ranking<bool>,
pub cpu_hashrate_score: Ranking<(u32, Option<u32>)>,
pub memory_memcpy_score: Ranking<(u32, Option<u32>)>,
pub disk_sequential_write_score: Ranking<(u32, Option<u32>)>,
pub disk_random_write_score: Ranking<(u32, Option<u32>)>,
}
57 changes: 55 additions & 2 deletions backend/telemetry_core/src/state/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ use common::{id_type, time, DenseMap, MostSeen, NumStats};
use once_cell::sync::Lazy;
use std::collections::HashSet;
use std::str::FromStr;
use std::time::{Duration, Instant};

use crate::feed_message::{self, FeedMessageSerializer};
use crate::feed_message::{self, ChainStats, FeedMessageSerializer};
use crate::find_location;

use super::chain_stats::ChainStatsCollator;
use super::counter::CounterValue;
use super::node::Node;

id_type! {
Expand All @@ -35,6 +38,7 @@ id_type! {
pub type Label = Box<str>;

const STALE_TIMEOUT: u64 = 2 * 60 * 1000; // 2 minutes
const STATS_UPDATE_INTERVAL: Duration = Duration::from_secs(5);

pub struct Chain {
/// Labels that nodes use for this chain. We keep track of
Expand All @@ -56,6 +60,12 @@ pub struct Chain {
genesis_hash: BlockHash,
/// Maximum number of nodes allowed to connect from this chain
max_nodes: usize,
/// Collator for the stats.
stats_collator: ChainStatsCollator,
/// Stats for this chain.
stats: ChainStats,
/// Timestamp of when the stats were last regenerated.
stats_last_regenerated: Instant,
}

pub enum AddNodeResult {
Expand Down Expand Up @@ -105,6 +115,9 @@ impl Chain {
timestamp: None,
genesis_hash,
max_nodes,
stats_collator: Default::default(),
stats: Default::default(),
stats_last_regenerated: Instant::now(),
}
}

Expand All @@ -119,7 +132,11 @@ impl Chain {
return AddNodeResult::Overquota;
}

let node_chain_label = &node.details().chain;
let details = node.details();
self.stats_collator
.add_or_remove_node(details, None, CounterValue::Increment);

let node_chain_label = &details.chain;
let label_result = self.labels.insert(node_chain_label);
let node_id = self.nodes.add(node);

Expand All @@ -140,6 +157,10 @@ impl Chain {
}
};

let details = node.details();
self.stats_collator
.add_or_remove_node(details, node.hwbench(), CounterValue::Decrement);

let node_chain_label = &node.details().chain;
let label_result = self.labels.remove(node_chain_label);

Expand Down Expand Up @@ -181,6 +202,19 @@ impl Chain {
}
return;
}
Payload::HwBench(ref hwbench) => {
let new_hwbench = common::node_types::NodeHwBench {
cpu_hashrate_score: hwbench.cpu_hashrate_score,
memory_memcpy_score: hwbench.memory_memcpy_score,
disk_sequential_write_score: hwbench.disk_sequential_write_score,
disk_random_write_score: hwbench.disk_random_write_score,
};
let old_hwbench = node.update_hwbench(new_hwbench);
self.stats_collator
.update_hwbench(old_hwbench.as_ref(), CounterValue::Decrement);
self.stats_collator
.update_hwbench(node.hwbench(), CounterValue::Increment);
}
_ => {}
}

Expand Down Expand Up @@ -210,6 +244,7 @@ impl Chain {
let nodes_len = self.nodes.len();

self.update_stale_nodes(now, feed);
self.regenerate_stats_if_necessary(feed);

let node = match self.nodes.get_mut(nid) {
Some(node) => node,
Expand Down Expand Up @@ -300,6 +335,21 @@ impl Chain {
}
}

fn regenerate_stats_if_necessary(&mut self, feed: &mut FeedMessageSerializer) {
let now = Instant::now();
let elapsed = now - self.stats_last_regenerated;
if elapsed < STATS_UPDATE_INTERVAL {
return;
}

self.stats_last_regenerated = now;
let new_stats = self.stats_collator.generate();
if new_stats != self.stats {
self.stats = new_stats;
feed.push(feed_message::ChainStatsUpdate(&self.stats));
}
}

pub fn update_node_location(
&mut self,
node_id: ChainNodeId,
Expand Down Expand Up @@ -340,4 +390,7 @@ impl Chain {
pub fn genesis_hash(&self) -> BlockHash {
self.genesis_hash
}
pub fn stats(&self) -> &ChainStats {
&self.stats
}
}
Loading

0 comments on commit 45878f9

Please sign in to comment.