diff --git a/sn_networking/src/bootstrap.rs b/sn_networking/src/bootstrap.rs index 5320c01e67..9692584987 100644 --- a/sn_networking/src/bootstrap.rs +++ b/sn_networking/src/bootstrap.rs @@ -36,7 +36,7 @@ impl SwarmDriver { ) -> Option { let (should_bootstrap, new_interval) = self .bootstrap - .should_we_bootstrap(self.connected_peers as u32, current_bootstrap_interval) + .should_we_bootstrap(self.peers_in_rt as u32, current_bootstrap_interval) .await; if should_bootstrap { self.trigger_network_discovery(); diff --git a/sn_networking/src/cmd.rs b/sn_networking/src/cmd.rs index e7c0686e4d..fdd49e5cca 100644 --- a/sn_networking/src/cmd.rs +++ b/sn_networking/src/cmd.rs @@ -827,12 +827,7 @@ impl SwarmDriver { if *is_bad { warn!("Cleaning out bad_peer {peer_id:?}"); if let Some(dead_peer) = self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id) { - self.connected_peers = self.connected_peers.saturating_sub(1); - self.send_event(NetworkEvent::PeerRemoved( - *dead_peer.node.key.preimage(), - self.connected_peers, - )); - self.log_kbuckets(&peer_id); + self.update_on_peer_removal(*dead_peer.node.key.preimage()); let _ = self.check_for_change_in_our_close_group(); } diff --git a/sn_networking/src/driver.rs b/sn_networking/src/driver.rs index e79766db2b..f190296cd6 100644 --- a/sn_networking/src/driver.rs +++ b/sn_networking/src/driver.rs @@ -589,7 +589,7 @@ impl NetworkBuilder { listen_port: self.listen_addr.map(|addr| addr.port()), is_client, is_behind_home_network: self.is_behind_home_network, - connected_peers: 0, + peers_in_rt: 0, bootstrap, relay_manager, close_group: Default::default(), @@ -637,7 +637,7 @@ pub struct SwarmDriver { pub(crate) is_behind_home_network: bool, /// The port that was set by the user pub(crate) listen_port: Option, - pub(crate) connected_peers: usize, + pub(crate) peers_in_rt: usize, pub(crate) bootstrap: ContinuousBootstrap, pub(crate) relay_manager: RelayManager, /// The peers that are closer to our PeerId. Includes self. 
diff --git a/sn_networking/src/event/kad.rs b/sn_networking/src/event/kad.rs index 7f3e111751..7790fce4a7 100644 --- a/sn_networking/src/event/kad.rs +++ b/sn_networking/src/event/kad.rs @@ -8,7 +8,7 @@ use crate::{ driver::PendingGetClosestType, get_quorum_value, GetRecordCfg, GetRecordError, NetworkError, - NetworkEvent, Result, SwarmDriver, CLOSE_GROUP_SIZE, + Result, SwarmDriver, CLOSE_GROUP_SIZE, }; use itertools::Itertools; use libp2p::kad::{ @@ -245,26 +245,19 @@ impl SwarmDriver { } => { event_string = "kad_event::RoutingUpdated"; if is_new_peer { - self.connected_peers = self.connected_peers.saturating_add(1); - - info!("New peer added to routing table: {peer:?}, now we have #{} connected peers", self.connected_peers); - self.log_kbuckets(&peer); + self.update_on_peer_addition(peer); // This should only happen once if self.bootstrap.notify_new_peer() { info!("Performing the first bootstrap"); self.trigger_network_discovery(); } - self.send_event(NetworkEvent::PeerAdded(peer, self.connected_peers)); } - info!("kad_event::RoutingUpdated {:?}: {peer:?}, is_new_peer: {is_new_peer:?} old_peer: {old_peer:?}", self.connected_peers); - if old_peer.is_some() { - self.connected_peers = self.connected_peers.saturating_sub(1); - + info!("kad_event::RoutingUpdated {:?}: {peer:?}, is_new_peer: {is_new_peer:?} old_peer: {old_peer:?}", self.peers_in_rt); + if let Some(old_peer) = old_peer { info!("Evicted old peer on new peer join: {old_peer:?}"); - self.send_event(NetworkEvent::PeerRemoved(peer, self.connected_peers)); - self.log_kbuckets(&peer); + self.update_on_peer_removal(old_peer); } let _ = self.check_for_change_in_our_close_group(); } diff --git a/sn_networking/src/event/mod.rs b/sn_networking/src/event/mod.rs index fc4b4b51ac..05d8bafa45 100644 --- a/sn_networking/src/event/mod.rs +++ b/sn_networking/src/event/mod.rs @@ -259,6 +259,38 @@ impl SwarmDriver { } } + /// Update state on addition of a peer to the routing table. 
+ pub(crate) fn update_on_peer_addition(&mut self, added_peer: PeerId) { + self.peers_in_rt = self.peers_in_rt.saturating_add(1); + info!( + "New peer added to routing table: {added_peer:?}, now we have #{} connected peers", + self.peers_in_rt + ); + self.log_kbuckets(&added_peer); + self.send_event(NetworkEvent::PeerAdded(added_peer, self.peers_in_rt)); + + #[cfg(feature = "open-metrics")] + if let Some(metrics) = &self.network_metrics { + metrics.peers_in_routing_table.set(self.peers_in_rt as i64); + } + } + + /// Update state on removal of a peer from the routing table. + pub(crate) fn update_on_peer_removal(&mut self, removed_peer: PeerId) { + self.peers_in_rt = self.peers_in_rt.saturating_sub(1); + info!( + "Peer removed from routing table: {removed_peer:?}, now we have #{} connected peers", + self.peers_in_rt + ); + self.log_kbuckets(&removed_peer); + self.send_event(NetworkEvent::PeerRemoved(removed_peer, self.peers_in_rt)); + + #[cfg(feature = "open-metrics")] + if let Some(metrics) = &self.network_metrics { + metrics.peers_in_routing_table.set(self.peers_in_rt as i64); + } + } + /// Logs the kbuckets also records the bucket info. pub(crate) fn log_kbuckets(&mut self, peer: &PeerId) { let distance = NetworkAddress::from_peer(self.self_peer_id) @@ -301,6 +333,16 @@ impl SwarmDriver { .set(estimated_network_size as i64); } + // Just to warn if our tracking goes out of sync with libp2p. Can happen if someone forgets to call + // update_on_peer_addition or update_on_peer_removal when adding or removing a peer. + // Only checked when the peer count is a multiple of 10, to avoid spamming the logs. 
+ if total_peers % 10 == 0 && total_peers != self.peers_in_rt { + warn!( + "Total peers in routing table: {}, but kbucket table has {total_peers} peers", + self.peers_in_rt + ); + } + info!("kBucketTable has {index:?} kbuckets {total_peers:?} peers, {kbucket_table_stats:?}, estimated network size: {estimated_network_size:?}"); } diff --git a/sn_networking/src/event/swarm.rs b/sn_networking/src/event/swarm.rs index d7879954d3..4f7c9bc462 100644 --- a/sn_networking/src/event/swarm.rs +++ b/sn_networking/src/event/swarm.rs @@ -381,6 +381,15 @@ impl SwarmDriver { connection_id, (peer_id, Instant::now() + Duration::from_secs(60)), ); + #[cfg(feature = "open-metrics")] + if let Some(metrics) = &self.network_metrics { + metrics + .open_connections + .set(self.live_connected_peers.len() as i64); + metrics + .connected_peers + .set(self.swarm.connected_peers().count() as i64); + } if endpoint.is_dialer() { self.dialed_peers.push(peer_id); @@ -396,6 +405,15 @@ impl SwarmDriver { event_string = "ConnectionClosed"; trace!(%peer_id, ?connection_id, ?cause, num_established, "ConnectionClosed: {}", endpoint_str(&endpoint)); let _ = self.live_connected_peers.remove(&connection_id); + #[cfg(feature = "open-metrics")] + if let Some(metrics) = &self.network_metrics { + metrics + .open_connections + .set(self.live_connected_peers.len() as i64); + metrics + .connected_peers + .set(self.swarm.connected_peers().count() as i64); + } } SwarmEvent::OutgoingConnectionError { connection_id, @@ -445,7 +463,7 @@ impl SwarmDriver { .any(|(_ilog2, peers)| peers.contains(&failed_peer_id)); if is_bootstrap_peer - && self.connected_peers < self.bootstrap_peers.len() + && self.peers_in_rt < self.bootstrap_peers.len() { warn!("OutgoingConnectionError: On bootstrap peer {failed_peer_id:?}, while still in bootstrap mode, ignoring"); there_is_a_serious_issue = false; @@ -514,19 +532,13 @@ impl SwarmDriver { .kademlia .remove_peer(&failed_peer_id) { - self.connected_peers = 
self.connected_peers.saturating_sub(1); + self.update_on_peer_removal(*dead_peer.node.key.preimage()); self.handle_cmd(SwarmCmd::RecordNodeIssue { peer_id: failed_peer_id, issue: crate::NodeIssue::ConnectionIssue, })?; - self.send_event(NetworkEvent::PeerRemoved( - *dead_peer.node.key.preimage(), - self.connected_peers, - )); - - self.log_kbuckets(&failed_peer_id); let _ = self.check_for_change_in_our_close_group(); } } @@ -630,12 +642,16 @@ impl SwarmDriver { } } if let Some(to_be_removed_bootstrap) = shall_removed { - trace!("Bootstrap node {to_be_removed_bootstrap:?} to be replaced by peer {peer_id:?}"); - let _entry = self + info!("Bootstrap node {to_be_removed_bootstrap:?} to be replaced by peer {peer_id:?}"); + let entry = self .swarm .behaviour_mut() .kademlia .remove_peer(&to_be_removed_bootstrap); + if let Some(removed_peer) = entry { + self.update_on_peer_removal(*removed_peer.node.key.preimage()); + let _ = self.check_for_change_in_our_close_group(); + } } } @@ -687,6 +703,15 @@ impl SwarmDriver { for (connection_id, peer_id) in shall_removed { let _ = self.live_connected_peers.remove(&connection_id); let result = self.swarm.close_connection(connection_id); + #[cfg(feature = "open-metrics")] + if let Some(metrics) = &self.network_metrics { + metrics + .open_connections + .set(self.live_connected_peers.len() as i64); + metrics + .connected_peers + .set(self.swarm.connected_peers().count() as i64); + } trace!("Removed outdated connection {connection_id:?} to {peer_id:?} with result: {result:?}"); } } diff --git a/sn_networking/src/metrics/mod.rs b/sn_networking/src/metrics/mod.rs index 8603218202..cdb351407e 100644 --- a/sn_networking/src/metrics/mod.rs +++ b/sn_networking/src/metrics/mod.rs @@ -28,8 +28,11 @@ pub(crate) struct NetworkMetrics { libp2p_metrics: Libp2pMetrics, // metrics from sn_networking - pub(crate) records_stored: Gauge, + pub(crate) connected_peers: Gauge, pub(crate) estimated_network_size: Gauge, + pub(crate) open_connections: Gauge, + 
pub(crate) peers_in_routing_table: Gauge, + pub(crate) records_stored: Gauge, pub(crate) store_cost: Gauge, #[cfg(feature = "upnp")] pub(crate) upnp_events: Family, @@ -51,12 +54,31 @@ impl NetworkMetrics { records_stored.clone(), ); + let connected_peers = Gauge::default(); + sub_registry.register( + "connected_peers", + "The number of peers that we are currently connected to", + connected_peers.clone(), + ); + let estimated_network_size = Gauge::default(); sub_registry.register( "estimated_network_size", "The estimated number of nodes in the network calculated by the peers in our RT", estimated_network_size.clone(), ); + let open_connections = Gauge::default(); + sub_registry.register( + "open_connections", + "The number of active connections to other peers", + open_connections.clone(), + ); + let peers_in_routing_table = Gauge::default(); + sub_registry.register( + "peers_in_routing_table", + "The total number of peers in our routing table", + peers_in_routing_table.clone(), + ); let store_cost = Gauge::default(); sub_registry.register( "store_cost", @@ -91,6 +113,9 @@ impl NetworkMetrics { libp2p_metrics, records_stored, estimated_network_size, + connected_peers, + open_connections, + peers_in_routing_table, store_cost, #[cfg(feature = "upnp")] upnp_events, diff --git a/sn_node/src/metrics.rs b/sn_node/src/metrics.rs index b139d4d265..604b65b2af 100644 --- a/sn_node/src/metrics.rs +++ b/sn_node/src/metrics.rs @@ -17,6 +17,7 @@ use prometheus_client::{ }, registry::Registry, }; +use sn_networking::Instant; #[derive(Clone)] pub(crate) struct NodeMetrics { @@ -35,6 +36,10 @@ pub(crate) struct NodeMetrics { // wallet pub(crate) current_reward_wallet_balance: Gauge, pub(crate) total_forwarded_rewards: Gauge, + + // to track the uptime of the node. 
+ pub(crate) started_instant: Instant, + pub(crate) uptime: Gauge, } #[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)] @@ -109,6 +114,13 @@ impl NodeMetrics { total_forwarded_rewards.clone(), ); + let uptime = Gauge::default(); + sub_registry.register( + "uptime", + "The uptime of the node in seconds", + uptime.clone(), + ); + Self { put_record_ok, put_record_err, @@ -118,6 +130,8 @@ impl NodeMetrics { peer_removed_from_routing_table, current_reward_wallet_balance, total_forwarded_rewards, + started_instant: Instant::now(), + uptime, } } diff --git a/sn_node/src/node.rs b/sn_node/src/node.rs index 2f91875b1a..a5efa5cc7e 100644 --- a/sn_node/src/node.rs +++ b/sn_node/src/node.rs @@ -23,7 +23,7 @@ use prometheus_client::metrics::gauge::Gauge; use prometheus_client::registry::Registry; use rand::{rngs::StdRng, thread_rng, Rng, SeedableRng}; use sn_networking::{ - close_group_majority, Network, NetworkBuilder, NetworkError, NetworkEvent, NodeIssue, + close_group_majority, Instant, Network, NetworkBuilder, NetworkError, NetworkEvent, NodeIssue, SwarmDriver, CLOSE_GROUP_SIZE, }; use sn_protocol::{ @@ -68,6 +68,9 @@ const CHUNK_PROOF_VERIFY_RETRY_INTERVAL: Duration = Duration::from_secs(15); /// Track the forward balance by storing the balance in a file. This is useful to restore the balance between restarts. 
const FORWARDED_BALANCE_FILE_NAME: &str = "forwarded_balance"; +/// Interval to update the node's uptime metric +const UPTIME_METRICS_UPDATE_INTERVAL: Duration = Duration::from_secs(10); + /// Helper to build and run a Node pub struct NodeBuilder { keypair: Keypair, @@ -277,6 +280,10 @@ impl Node { let mut balance_forward_interval = tokio::time::interval(balance_forward_time); let _ = balance_forward_interval.tick().await; // first tick completes immediately + let mut uptime_metrics_update_interval = + tokio::time::interval(UPTIME_METRICS_UPDATE_INTERVAL); + let _ = uptime_metrics_update_interval.tick().await; // first tick completes immediately + loop { let peers_connected = &peers_connected; @@ -284,7 +291,7 @@ impl Node { net_event = network_event_receiver.recv() => { match net_event { Some(event) => { - let start = std::time::Instant::now(); + let start = Instant::now(); let event_string = format!("{event:?}"); self.handle_network_event(event, peers_connected); @@ -300,7 +307,7 @@ impl Node { } // runs every replication_interval time _ = replication_interval.tick() => { - let start = std::time::Instant::now(); + let start = Instant::now(); trace!("Periodic replication triggered"); let network = self.network.clone(); self.record_metrics(Marker::IntervalReplicationTriggered); @@ -312,7 +319,7 @@ impl Node { } // runs every bad_nodes_check_time time _ = bad_nodes_check_interval.tick() => { - let start = std::time::Instant::now(); + let start = Instant::now(); trace!("Periodic bad_nodes check triggered"); let network = self.network.clone(); self.record_metrics(Marker::IntervalBadNodesCheckTriggered); @@ -332,7 +339,7 @@ impl Node { _ = balance_forward_interval.tick() => { if cfg!(feature = "reward-forward") { if let Some(ref owner) = self.owner { - let start = std::time::Instant::now(); + let start = Instant::now(); trace!("Periodic balance forward triggered"); let network = self.network.clone(); let forwarding_reason = owner.clone(); @@ -352,6 +359,12 @@ impl Node 
{ } } + _ = uptime_metrics_update_interval.tick() => { + #[cfg(feature = "open-metrics")] + if let Some(node_metrics) = &self.node_metrics { + let _ = node_metrics.uptime.set(node_metrics.started_instant.elapsed().as_secs() as i64); + } + } node_cmd = cmds_receiver.recv() => { match node_cmd { Ok(cmd) => { @@ -380,7 +393,7 @@ impl Node { /// Handle a network event. /// Spawns a thread for any likely long running tasks fn handle_network_event(&self, event: NetworkEvent, peers_connected: &Arc) { - let start = std::time::Instant::now(); + let start = Instant::now(); let event_string = format!("{event:?}"); let event_header; trace!("Handling NetworkEvent {event_string:?}");