Skip to content

Commit

Permalink
Add keep-alive timeout to coordinator
Browse files Browse the repository at this point in the history
The Heartbeat was meant to serve for this, yet no Heartbeats are fired when we
don't have active tributaries.

libp2p does offer an explicit KeepAlive protocol, yet it's not recommended in
prod. While this likely has the same pit falls as LibP2p's KeepAlive protocol,
it's at least tailored to our timing.
  • Loading branch information
kayabaNerve committed Aug 21, 2023
1 parent 45ea805 commit dc88b29
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 9 deletions.
4 changes: 3 additions & 1 deletion coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ mod substrate;
#[cfg(test)]
pub mod tests;

// This is a static to satisfy lifetime expectations
lazy_static::lazy_static! {
// This is a static to satisfy lifetime expectations
static ref NEW_TRIBUTARIES: RwLock<VecDeque<TributarySpec>> = RwLock::new(VecDeque::new());
}

Expand Down Expand Up @@ -271,6 +271,8 @@ pub async fn handle_p2p<D: Db, P: P2p>(
loop {
let mut msg = p2p.receive().await;
match msg.kind {
P2pMessageKind::KeepAlive => {}

P2pMessageKind::Tributary(genesis) => {
let tributaries = tributaries.read().await;
let Some(tributary) = tributaries.get(&genesis) else {
Expand Down
46 changes: 38 additions & 8 deletions coordinator/src/p2p.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use core::{time::Duration, fmt, task::Poll};
use std::{sync::Arc, collections::VecDeque, io::Read};
use std::{sync::Arc, time::Instant, collections::VecDeque, io::Read};

use async_trait::async_trait;

Expand Down Expand Up @@ -27,6 +27,7 @@ const LIBP2P_TOPIC: &str = "serai-coordinator";

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub enum P2pMessageKind {
KeepAlive,
Tributary([u8; 32]),
Heartbeat([u8; 32]),
Block([u8; 32]),
Expand All @@ -35,18 +36,19 @@ pub enum P2pMessageKind {
impl P2pMessageKind {
fn serialize(&self) -> Vec<u8> {
match self {
P2pMessageKind::KeepAlive => vec![0],
P2pMessageKind::Tributary(genesis) => {
let mut res = vec![0];
let mut res = vec![1];
res.extend(genesis);
res
}
P2pMessageKind::Heartbeat(genesis) => {
let mut res = vec![1];
let mut res = vec![2];
res.extend(genesis);
res
}
P2pMessageKind::Block(genesis) => {
let mut res = vec![2];
let mut res = vec![3];
res.extend(genesis);
res
}
Expand All @@ -57,17 +59,18 @@ impl P2pMessageKind {
let mut kind = [0; 1];
reader.read_exact(&mut kind).ok()?;
match kind[0] {
0 => Some({
0 => Some(P2pMessageKind::KeepAlive),
1 => Some({
let mut genesis = [0; 32];
reader.read_exact(&mut genesis).ok()?;
P2pMessageKind::Tributary(genesis)
}),
1 => Some({
2 => Some({
let mut genesis = [0; 32];
reader.read_exact(&mut genesis).ok()?;
P2pMessageKind::Heartbeat(genesis)
}),
2 => Some({
3 => Some({
let mut genesis = [0; 32];
reader.read_exact(&mut genesis).ok()?;
P2pMessageKind::Block(genesis)
Expand Down Expand Up @@ -103,6 +106,7 @@ pub trait P2p: Send + Sync + Clone + fmt::Debug + TributaryP2p {
log::trace!(
"broadcasting p2p message (kind {})",
match kind {
P2pMessageKind::KeepAlive => "KeepAlive".to_string(),
P2pMessageKind::Tributary(genesis) => format!("Tributary({})", hex::encode(genesis)),
P2pMessageKind::Heartbeat(genesis) => format!("Heartbeat({})", hex::encode(genesis)),
P2pMessageKind::Block(genesis) => format!("Block({})", hex::encode(genesis)),
Expand All @@ -128,6 +132,7 @@ pub trait P2p: Send + Sync + Clone + fmt::Debug + TributaryP2p {
log::trace!(
"received p2p message (kind {})",
match kind {
P2pMessageKind::KeepAlive => "KeepAlive".to_string(),
P2pMessageKind::Tributary(genesis) => format!("Tributary({})", hex::encode(genesis)),
P2pMessageKind::Heartbeat(genesis) => format!("Heartbeat({})", hex::encode(genesis)),
P2pMessageKind::Block(genesis) => format!("Block({})", hex::encode(genesis)),
Expand All @@ -144,6 +149,10 @@ struct Behavior {
mdns: libp2p::mdns::tokio::Behaviour,
}

lazy_static::lazy_static! {
static ref TIME_OF_LAST_P2P_MESSAGE: Mutex<Instant> = Mutex::new(Instant::now());
}

#[allow(clippy::type_complexity)]
#[derive(Clone)]
pub struct LibP2p(Arc<Mutex<Swarm<Behavior>>>, Arc<Mutex<VecDeque<(PeerId, Vec<u8>)>>>);
Expand Down Expand Up @@ -178,6 +187,8 @@ impl LibP2p {
use blake2::{Digest, Blake2s256};
let config = ConfigBuilder::default()
.max_transmit_size(MAX_LIBP2P_MESSAGE_SIZE)
// We send KeepAlive after 80s
.idle_timeout(Duration::from_secs(85))
.validation_mode(ValidationMode::Strict)
// Uses a content based message ID to avoid duplicates as much as possible
.message_id_fn(|msg| {
Expand Down Expand Up @@ -210,7 +221,7 @@ impl LibP2p {
// TODO: We do tests on release binaries as of right now...
//#[cfg(debug_assertions)]
mdns: {
log::info!("spawning mdns");
log::info!("creating mdns service");
libp2p::mdns::tokio::Behaviour::new(libp2p::mdns::Config::default(), throwaway_peer_id)
.unwrap()
},
Expand All @@ -227,7 +238,21 @@ impl LibP2p {
async move {
// Run this task ad-infinitum
loop {
// If it's been >80s since we've published a message, publish a KeepAlive since we're
// still an active service
// This is useful when we have no active tributaries and accordingly aren't sending
// heartbeats
// If we are sending heartbeats, we should've sent one after 60s of no finalized blocks
// (where a finalized block only occurs due to network activity), meaning this won't be
// run
let time_since_last =
Instant::now().duration_since(*TIME_OF_LAST_P2P_MESSAGE.lock().await);
if time_since_last > Duration::from_secs(80) {
p2p.broadcast_raw(P2pMessageKind::KeepAlive.serialize()).await;
}

// Maintain this lock until it's out of events
// TODO: Is there a less contentious way to run this poll?
let mut p2p_lock = p2p.0.lock().await;
loop {
match futures::poll!(p2p_lock.next()) {
Expand All @@ -236,6 +261,7 @@ impl LibP2p {
libp2p::mdns::Event::Discovered(list),
)))) => {
for (peer, mut addr) in list {
// Check the port is as expected to prevent trying to peer with Substrate nodes
if addr.pop() == Some(libp2p::multiaddr::Protocol::Tcp(PORT)) {
log::info!("found peer via mdns");
p2p_lock.behaviour_mut().gossipsub.add_explicit_peer(&peer);
Expand All @@ -255,6 +281,7 @@ impl LibP2p {
Poll::Ready(Some(SwarmEvent::Behaviour(BehaviorEvent::Gossipsub(
GsEvent::Message { propagation_source, message, .. },
)))) => {
*TIME_OF_LAST_P2P_MESSAGE.lock().await = Instant::now();
p2p.1.lock().await.push_back((propagation_source, message.data));
}
Poll::Ready(Some(_)) => {}
Expand All @@ -281,6 +308,9 @@ impl P2p for LibP2p {
}

async fn broadcast_raw(&self, msg: Vec<u8>) {
// Update the time of last message
*TIME_OF_LAST_P2P_MESSAGE.lock().await = Instant::now();

match self
.0
.lock()
Expand Down

0 comments on commit dc88b29

Please sign in to comment.