From dc88b29b922cfa7719333370914f725c6098a270 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Mon, 21 Aug 2023 02:36:03 -0400 Subject: [PATCH] Add keep-alive timeout to coordinator The Heartbeat was meant to serve for this, yet no Heartbeats are fired when we don't have active tributaries. libp2p does offer an explicit KeepAlive protocol, yet it's not recommended in prod. While this likely has the same pit falls as LibP2p's KeepAlive protocol, it's at least tailored to our timing. --- coordinator/src/main.rs | 4 +++- coordinator/src/p2p.rs | 46 ++++++++++++++++++++++++++++++++++------- 2 files changed, 41 insertions(+), 9 deletions(-) diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index bc91fdd62..a664b49cf 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -53,8 +53,8 @@ mod substrate; #[cfg(test)] pub mod tests; -// This is a static to satisfy lifetime expectations lazy_static::lazy_static! { + // This is a static to satisfy lifetime expectations static ref NEW_TRIBUTARIES: RwLock> = RwLock::new(VecDeque::new()); } @@ -271,6 +271,8 @@ pub async fn handle_p2p( loop { let mut msg = p2p.receive().await; match msg.kind { + P2pMessageKind::KeepAlive => {} + P2pMessageKind::Tributary(genesis) => { let tributaries = tributaries.read().await; let Some(tributary) = tributaries.get(&genesis) else { diff --git a/coordinator/src/p2p.rs b/coordinator/src/p2p.rs index 25147d3f1..11db03b03 100644 --- a/coordinator/src/p2p.rs +++ b/coordinator/src/p2p.rs @@ -1,5 +1,5 @@ use core::{time::Duration, fmt, task::Poll}; -use std::{sync::Arc, collections::VecDeque, io::Read}; +use std::{sync::Arc, time::Instant, collections::VecDeque, io::Read}; use async_trait::async_trait; @@ -27,6 +27,7 @@ const LIBP2P_TOPIC: &str = "serai-coordinator"; #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] pub enum P2pMessageKind { + KeepAlive, Tributary([u8; 32]), Heartbeat([u8; 32]), Block([u8; 32]), @@ -35,18 +36,19 @@ pub enum P2pMessageKind { impl P2pMessageKind { fn serialize(&self) -> Vec { match self { + P2pMessageKind::KeepAlive => vec![0], P2pMessageKind::Tributary(genesis) => { - let mut res = vec![0]; + let mut res = vec![1]; res.extend(genesis); res } P2pMessageKind::Heartbeat(genesis) => { - let mut res = vec![1]; + let mut res = vec![2]; res.extend(genesis); res } P2pMessageKind::Block(genesis) => { - let mut res = vec![2]; + let mut res = vec![3]; res.extend(genesis); res } @@ -57,17 +59,18 @@ impl P2pMessageKind { let mut kind = [0; 1]; reader.read_exact(&mut kind).ok()?; match kind[0] { - 0 => Some({ + 0 => Some(P2pMessageKind::KeepAlive), + 1 => Some({ let mut genesis = [0; 32]; reader.read_exact(&mut genesis).ok()?; P2pMessageKind::Tributary(genesis) }), - 1 => Some({ + 2 => Some({ let mut genesis = [0; 32]; reader.read_exact(&mut genesis).ok()?; P2pMessageKind::Heartbeat(genesis) }), - 2 => Some({ + 3 => Some({ let mut genesis = [0; 32]; reader.read_exact(&mut genesis).ok()?; P2pMessageKind::Block(genesis) @@ -103,6 +106,7 @@ pub trait P2p: Send + Sync + Clone + fmt::Debug + TributaryP2p { log::trace!( "broadcasting p2p message (kind {})", match kind { + P2pMessageKind::KeepAlive => "KeepAlive".to_string(), P2pMessageKind::Tributary(genesis) => format!("Tributary({})", hex::encode(genesis)), P2pMessageKind::Heartbeat(genesis) => format!("Heartbeat({})", hex::encode(genesis)), P2pMessageKind::Block(genesis) => format!("Block({})", hex::encode(genesis)), @@ -128,6 +132,7 @@ pub trait P2p: Send + Sync + Clone + fmt::Debug + TributaryP2p { log::trace!( "received p2p message (kind {})", match kind { + P2pMessageKind::KeepAlive => "KeepAlive".to_string(), P2pMessageKind::Tributary(genesis) => format!("Tributary({})", hex::encode(genesis)), P2pMessageKind::Heartbeat(genesis) => format!("Heartbeat({})", hex::encode(genesis)), P2pMessageKind::Block(genesis) => format!("Block({})", hex::encode(genesis)), @@ -144,6 +149,10 @@ struct Behavior { mdns: libp2p::mdns::tokio::Behaviour, } +lazy_static::lazy_static! { + static ref TIME_OF_LAST_P2P_MESSAGE: Mutex = Mutex::new(Instant::now()); +} + #[allow(clippy::type_complexity)] #[derive(Clone)] pub struct LibP2p(Arc>>, Arc)>>>); @@ -178,6 +187,8 @@ impl LibP2p { use blake2::{Digest, Blake2s256}; let config = ConfigBuilder::default() .max_transmit_size(MAX_LIBP2P_MESSAGE_SIZE) + // We send KeepAlive after 80s + .idle_timeout(Duration::from_secs(85)) .validation_mode(ValidationMode::Strict) // Uses a content based message ID to avoid duplicates as much as possible .message_id_fn(|msg| { @@ -210,7 +221,7 @@ impl LibP2p { // TODO: We do tests on release binaries as of right now... //#[cfg(debug_assertions)] mdns: { - log::info!("spawning mdns"); + log::info!("creating mdns service"); libp2p::mdns::tokio::Behaviour::new(libp2p::mdns::Config::default(), throwaway_peer_id) .unwrap() }, @@ -227,7 +238,21 @@ impl LibP2p { async move { // Run this task ad-infinitum loop { + // If it's been >80s since we've published a message, publish a KeepAlive since we're + // still an active service + // This is useful when we have no active tributaries and accordingly aren't sending + // heartbeats + // If we are sending heartbeats, we should've sent one after 60s of no finalized blocks + // (where a finalized block only occurs due to network activity), meaning this won't be + // run + let time_since_last = + Instant::now().duration_since(*TIME_OF_LAST_P2P_MESSAGE.lock().await); + if time_since_last > Duration::from_secs(80) { + p2p.broadcast_raw(P2pMessageKind::KeepAlive.serialize()).await; + } + // Maintain this lock until it's out of events + // TODO: Is there a less contentious way to run this poll? let mut p2p_lock = p2p.0.lock().await; loop { match futures::poll!(p2p_lock.next()) { @@ -236,6 +261,7 @@ impl LibP2p { libp2p::mdns::Event::Discovered(list), )))) => { for (peer, mut addr) in list { + // Check the port is as expected to prevent trying to peer with Substrate nodes if addr.pop() == Some(libp2p::multiaddr::Protocol::Tcp(PORT)) { log::info!("found peer via mdns"); p2p_lock.behaviour_mut().gossipsub.add_explicit_peer(&peer); @@ -255,6 +281,7 @@ impl LibP2p { Poll::Ready(Some(SwarmEvent::Behaviour(BehaviorEvent::Gossipsub( GsEvent::Message { propagation_source, message, .. }, )))) => { + *TIME_OF_LAST_P2P_MESSAGE.lock().await = Instant::now(); p2p.1.lock().await.push_back((propagation_source, message.data)); } Poll::Ready(Some(_)) => {} @@ -281,6 +308,9 @@ impl P2p for LibP2p { } async fn broadcast_raw(&self, msg: Vec) { + // Update the time of last message + *TIME_OF_LAST_P2P_MESSAGE.lock().await = Instant::now(); + match self .0 .lock()