Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(p2p/tce): add gossipsub message_id to tce logs #488

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 27 additions & 67 deletions crates/topos-p2p/src/behaviour/gossip.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,25 @@
use std::collections::hash_map::DefaultHasher;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};
use std::{
collections::{HashMap, VecDeque},
env,
task::Poll,
time::Duration,
};
use std::{collections::HashMap, task::Poll};

use libp2p::gossipsub::MessageId;
use libp2p::swarm::{ConnectionClosed, FromSwarm};
use libp2p::PeerId;
use libp2p::{
gossipsub::{self, IdentTopic, Message, MessageAuthenticity},
identity::Keypair,
swarm::{NetworkBehaviour, THandlerInEvent, ToSwarm},
};
use prost::Message as ProstMessage;
use topos_core::api::grpc::tce::v1::Batch;
use topos_metrics::P2P_GOSSIP_BATCH_SIZE;
use tracing::{debug, error, warn};
use tracing::{debug, trace, warn};

use crate::error::P2PError;
use crate::{constants, event::ComposedEvent, TOPOS_ECHO, TOPOS_GOSSIP, TOPOS_READY};

use super::HealthStatus;

const MAX_BATCH_SIZE: usize = 10;

pub struct Behaviour {
batch_size: usize,
gossipsub: gossipsub::Behaviour,
pending: HashMap<&'static str, VecDeque<Vec<u8>>>,
tick: tokio::time::Interval,
/// List of connected peers per topic.
connected_peer: HashMap<&'static str, HashSet<PeerId>>,
/// The health status of the gossip behaviour
Expand All @@ -43,18 +31,22 @@ impl Behaviour {
&mut self,
topic: &'static str,
message: Vec<u8>,
) -> Result<usize, &'static str> {
) -> Result<MessageId, P2PError> {
match topic {
TOPOS_GOSSIP => {
if let Ok(msg_id) = self.gossipsub.publish(IdentTopic::new(topic), message) {
debug!("Published on topos_gossip: {:?}", msg_id);
TOPOS_GOSSIP | TOPOS_ECHO | TOPOS_READY => {
let topic = IdentTopic::new(topic);
let topic_hash = topic.hash();
let msg_id = self.gossipsub.publish(topic, message)?;
trace!("Published on topos_gossip: {:?}", msg_id);

for p in self.gossipsub.mesh_peers(&topic_hash) {
debug!("Sent gossipsub message({}) to {} peer", msg_id, p);
}

Ok(msg_id)
}
TOPOS_ECHO | TOPOS_READY => self.pending.entry(topic).or_default().push_back(message),
_ => return Err("Invalid topic"),
_ => Err(P2PError::InvalidGossipTopic(topic)),
}

Ok(0)
}

pub fn subscribe(&mut self) -> Result<(), P2PError> {
Expand All @@ -71,10 +63,6 @@ impl Behaviour {
}

pub async fn new(peer_key: Keypair) -> Self {
let batch_size = env::var("TOPOS_GOSSIP_BATCH_SIZE")
.map(|v| v.parse::<usize>())
.unwrap_or(Ok(MAX_BATCH_SIZE))
.unwrap();
let gossipsub = gossipsub::ConfigBuilder::default()
.max_transmit_size(2 * 1024 * 1024)
.validation_mode(gossipsub::ValidationMode::Strict)
Expand All @@ -99,21 +87,7 @@ impl Behaviour {
.unwrap();

Self {
batch_size,
gossipsub,
pending: [
(TOPOS_ECHO, VecDeque::new()),
(TOPOS_READY, VecDeque::new()),
]
.into_iter()
.collect(),
tick: tokio::time::interval(Duration::from_millis(
env::var("TOPOS_GOSSIP_INTERVAL")
.map(|v| v.parse::<u64>())
.unwrap_or(Ok(100))
.unwrap(),
)),

connected_peer: Default::default(),
health_status: Default::default(),
}
Expand Down Expand Up @@ -191,26 +165,6 @@ impl NetworkBehaviour for Behaviour {
&mut self,
cx: &mut std::task::Context<'_>,
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
if self.tick.poll_tick(cx).is_ready() {
// Publish batch
for (topic, queue) in self.pending.iter_mut() {
if !queue.is_empty() {
let num_of_message = queue.len().min(self.batch_size);
let batch = Batch {
messages: queue.drain(0..num_of_message).collect(),
};

debug!("Publishing {} {}", batch.messages.len(), topic);
let msg = batch.encode_to_vec();
P2P_GOSSIP_BATCH_SIZE.observe(batch.messages.len() as f64);
match self.gossipsub.publish(IdentTopic::new(*topic), msg) {
Ok(message_id) => debug!("Published {} {}", topic, message_id),
Err(error) => error!("Failed to publish {}: {}", topic, error),
}
}
}
}

match self.gossipsub.poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(ToSwarm::GenerateEvent(event)) => match event {
Expand All @@ -227,29 +181,35 @@ impl NetworkBehaviour for Behaviour {
} => match topic.as_str() {
TOPOS_GOSSIP => {
return Poll::Ready(ToSwarm::GenerateEvent(ComposedEvent::Gossipsub(
crate::event::GossipEvent::Message {
Box::new(crate::event::GossipEvent::Message {
propagated_by: propagation_source,
topic: TOPOS_GOSSIP,
message: data,
source,
},
id: message_id,
}),
)))
}
TOPOS_ECHO => {
return Poll::Ready(ToSwarm::GenerateEvent(ComposedEvent::Gossipsub(
crate::event::GossipEvent::Message {
Box::new(crate::event::GossipEvent::Message {
propagated_by: propagation_source,
topic: TOPOS_ECHO,
message: data,
source,
},
id: message_id,
}),
)))
}
TOPOS_READY => {
return Poll::Ready(ToSwarm::GenerateEvent(ComposedEvent::Gossipsub(
crate::event::GossipEvent::Message {
Box::new(crate::event::GossipEvent::Message {
propagated_by: propagation_source,
topic: TOPOS_READY,
message: data,
source,
},
id: message_id,
}),
)))
}
_ => {}
Expand Down
25 changes: 14 additions & 11 deletions crates/topos-p2p/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use futures::future::BoxFuture;
use futures::TryFutureExt;
use libp2p::PeerId;
use tokio::sync::{
mpsc::{self, error::SendError},
Expand Down Expand Up @@ -39,21 +39,24 @@ impl NetworkClient {
.await
}

pub fn publish<T: std::fmt::Debug + prost::Message + 'static>(
pub async fn publish<T: std::fmt::Debug + prost::Message + 'static>(
&self,
topic: &'static str,
message: T,
) -> BoxFuture<'static, Result<(), SendError<Command>>> {
) -> Result<String, P2PError> {
let network = self.sender.clone();
let (sender, receiver) = oneshot::channel();

network
.send(Command::Gossip {
topic,
data: message.encode_to_vec(),
sender,
})
.map_err(CommandExecutionError::from)
.await?;

Box::pin(async move {
network
.send(Command::Gossip {
topic,
data: message.encode_to_vec(),
})
.await
})
receiver.await?.map(|id| id.to_string())
}

async fn send_command_with_receiver<
Expand Down
3 changes: 2 additions & 1 deletion crates/topos-p2p/src/command.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::fmt::Display;

use libp2p::PeerId;
use libp2p::{gossipsub::MessageId, PeerId};
use tokio::sync::oneshot;

use crate::{behaviour::grpc::connection::OutboundConnection, error::P2PError};
Expand All @@ -15,6 +15,7 @@ pub enum Command {
Gossip {
topic: &'static str,
data: Vec<u8>,
sender: oneshot::Sender<Result<MessageId, P2PError>>,
},

/// Ask for the creation of a new proxy connection for a gRPC query.
Expand Down
13 changes: 11 additions & 2 deletions crates/topos-p2p/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use std::io;

use libp2p::{
gossipsub::SubscriptionError, kad::NoKnownPeers, noise::Error as NoiseError,
request_response::OutboundFailure, TransportError,
gossipsub::{PublishError, SubscriptionError},
kad::NoKnownPeers,
noise::Error as NoiseError,
request_response::OutboundFailure,
TransportError,
};
use thiserror::Error;
use tokio::sync::{mpsc, oneshot};
Expand Down Expand Up @@ -49,6 +52,12 @@ pub enum P2PError {

#[error("Gossip topics subscription failed")]
GossipTopicSubscriptionFailure,

#[error("Gossipsub publish failure: {0}")]
GossipsubPublishFailure(#[from] PublishError),

#[error("Invalid gossipsub topics: {0}")]
InvalidGossipTopic(&'static str),
}

#[derive(Error, Debug)]
Expand Down
14 changes: 11 additions & 3 deletions crates/topos-p2p/src/event.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use libp2p::{identify, kad, PeerId};
use libp2p::{gossipsub::MessageId, identify, kad, PeerId};

use crate::behaviour::{grpc, HealthStatus};

Expand All @@ -7,17 +7,19 @@ use crate::behaviour::{grpc, HealthStatus};
pub enum GossipEvent {
/// A message has been received from a peer on one of the subscribed topics
Message {
propagated_by: PeerId,
source: Option<PeerId>,
topic: &'static str,
message: Vec<u8>,
id: MessageId,
},
}

#[derive(Debug)]
pub enum ComposedEvent {
Kademlia(Box<kad::Event>),
PeerInfo(Box<identify::Event>),
Gossipsub(GossipEvent),
Gossipsub(Box<GossipEvent>),
Grpc(grpc::Event),
Void,
}
Expand Down Expand Up @@ -48,9 +50,15 @@ impl From<void::Void> for ComposedEvent {

/// Represents the events that the p2p layer can emit
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum Event {
/// An event emitted when a gossip message is received
Gossip { from: PeerId, data: Vec<u8> },
Gossip {
propagated_by: PeerId,
from: PeerId,
data: Vec<u8>,
id: String,
},
/// An event emitted when the p2p layer becomes healthy
Healthy,
/// An event emitted when the p2p layer becomes unhealthy
Expand Down
11 changes: 8 additions & 3 deletions crates/topos-p2p/src/runtime/handle_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{

use rand::{thread_rng, Rng};
use topos_metrics::P2P_MESSAGE_SENT_ON_GOSSIPSUB_TOTAL;
use tracing::{debug, error, warn};
use tracing::{error, trace, warn};

impl Runtime {
pub(crate) async fn handle_command(&mut self, command: Command) {
Expand Down Expand Up @@ -64,12 +64,17 @@ impl Runtime {
Command::Gossip {
topic,
data: message,
sender,
} => match self.swarm.behaviour_mut().gossipsub.publish(topic, message) {
Ok(message_id) => {
debug!("Published message to {topic}");
trace!("Published message to {topic}");
P2P_MESSAGE_SENT_ON_GOSSIPSUB_TOTAL.inc();
_ = sender.send(Ok(message_id));
}
Err(err) => {
error!("Failed to publish message to {topic}: {err}");
_ = sender.send(Err(err));
}
Err(err) => error!("Failed to publish message to {topic}: {err}"),
},
}
}
Expand Down
Loading
Loading