diff --git a/crates/topos-p2p/src/behaviour/gossip.rs b/crates/topos-p2p/src/behaviour/gossip.rs index 220b01c1d..9da6cab4f 100644 --- a/crates/topos-p2p/src/behaviour/gossip.rs +++ b/crates/topos-p2p/src/behaviour/gossip.rs @@ -34,9 +34,15 @@ impl Behaviour { ) -> Result { match topic { TOPOS_GOSSIP | TOPOS_ECHO | TOPOS_READY => { - let msg_id = self.gossipsub.publish(IdentTopic::new(topic), message)?; + let topic = IdentTopic::new(topic); + let topic_hash = topic.hash(); + let msg_id = self.gossipsub.publish(topic, message)?; trace!("Published on topos_gossip: {:?}", msg_id); + for p in self.gossipsub.mesh_peers(&topic_hash) { + debug!("Sent gossipsub message({}) to {} peer", msg_id, p); + } + Ok(msg_id) } _ => Err(P2PError::InvalidGossipTopic(topic)), @@ -175,32 +181,35 @@ impl NetworkBehaviour for Behaviour { } => match topic.as_str() { TOPOS_GOSSIP => { return Poll::Ready(ToSwarm::GenerateEvent(ComposedEvent::Gossipsub( - crate::event::GossipEvent::Message { + Box::new(crate::event::GossipEvent::Message { + propagated_by: propagation_source, topic: TOPOS_GOSSIP, message: data, source, id: message_id, - }, + }), ))) } TOPOS_ECHO => { return Poll::Ready(ToSwarm::GenerateEvent(ComposedEvent::Gossipsub( - crate::event::GossipEvent::Message { + Box::new(crate::event::GossipEvent::Message { + propagated_by: propagation_source, topic: TOPOS_ECHO, message: data, source, id: message_id, - }, + }), ))) } TOPOS_READY => { return Poll::Ready(ToSwarm::GenerateEvent(ComposedEvent::Gossipsub( - crate::event::GossipEvent::Message { + Box::new(crate::event::GossipEvent::Message { + propagated_by: propagation_source, topic: TOPOS_READY, message: data, source, id: message_id, - }, + }), ))) } _ => {} diff --git a/crates/topos-p2p/src/event.rs b/crates/topos-p2p/src/event.rs index bed4d3aec..d9a2a75f8 100644 --- a/crates/topos-p2p/src/event.rs +++ b/crates/topos-p2p/src/event.rs @@ -7,6 +7,7 @@ use crate::behaviour::{grpc, HealthStatus}; pub enum GossipEvent { /// A message has been received from a peer on one of the subscribed topics Message { + propagated_by: PeerId, source: Option, topic: &'static str, message: Vec, @@ -18,7 +19,7 @@ pub enum GossipEvent { pub enum ComposedEvent { Kademlia(Box), PeerInfo(Box), - Gossipsub(GossipEvent), + Gossipsub(Box), Grpc(grpc::Event), Void, } @@ -49,9 +50,11 @@ impl From for ComposedEvent { /// Represents the events that the p2p layer can emit #[derive(Debug)] +#[allow(clippy::large_enum_variant)] pub enum Event { /// An event emitted when a gossip message is received Gossip { + propagated_by: PeerId, from: PeerId, data: Vec, id: String, diff --git a/crates/topos-p2p/src/runtime/handle_event/gossipsub.rs b/crates/topos-p2p/src/runtime/handle_event/gossipsub.rs index b96f35dcb..6a97e8bf9 100644 --- a/crates/topos-p2p/src/runtime/handle_event/gossipsub.rs +++ b/crates/topos-p2p/src/runtime/handle_event/gossipsub.rs @@ -9,27 +9,31 @@ use crate::{constants, event::GossipEvent, Event, Runtime, TOPOS_ECHO, TOPOS_GOS use super::{EventHandler, EventResult}; #[async_trait::async_trait] -impl EventHandler for Runtime { - async fn handle(&mut self, event: GossipEvent) -> EventResult { +impl EventHandler> for Runtime { + async fn handle(&mut self, event: Box) -> EventResult { if let GossipEvent::Message { + propagated_by, source: Some(source), message, topic, id, - } = event + } = *event { if self.event_sender.capacity() < *constants::CAPACITY_EVENT_STREAM_BUFFER { P2P_EVENT_STREAM_CAPACITY_TOTAL.inc(); } - debug!("Received message from {:?} on topic {:?}", source, topic); + debug!( + "Received message({id}) from source {:?} on topic {:?} propagated by {propagated_by}", + source, topic + ); match topic { TOPOS_GOSSIP => P2P_MESSAGE_RECEIVED_ON_GOSSIP_TOTAL.inc(), TOPOS_ECHO => P2P_MESSAGE_RECEIVED_ON_ECHO_TOTAL.inc(), TOPOS_READY => P2P_MESSAGE_RECEIVED_ON_READY_TOTAL.inc(), _ => { - error!("Received message on unknown topic {:?}", topic); + error!("Received message({id}) on unknown topic {:?}", topic); return Ok(()); } } @@ -37,13 +41,14 @@ impl EventHandler for Runtime { if let Err(e) = self .event_sender .send(Event::Gossip { + propagated_by, from: source, data: message, id: id.to_string(), }) .await { - error!("Failed to send gossip event to runtime: {:?}", e); + error!("Failed to send gossip event({id}) to runtime: {:?}", e); } } diff --git a/crates/topos-tce/src/app_context/network.rs b/crates/topos-tce/src/app_context/network.rs index 45cb9a1a4..50764272b 100644 --- a/crates/topos-tce/src/app_context/network.rs +++ b/crates/topos-tce/src/app_context/network.rs @@ -22,7 +22,13 @@ impl AppContext { &evt ); - if let NetEvent::Gossip { data, from, id } = evt { + if let NetEvent::Gossip { + data, + propagated_by: received_from, + from, + id, + } = evt + { if let Ok(DoubleEchoRequest { request: Some(double_echo_request), }) = DoubleEchoRequest::decode(&data[..]) @@ -39,10 +45,8 @@ impl AppContext { } info!( message_id = id, - "Received certificate {} from GossipSub message({}) from {}", + "Received certificate {} from GossipSub message({id}) from {from} | propagated by {received_from}", cert.id, - id, - from ); match self.validator_store.insert_pending_certificate(&cert).await { diff --git a/crates/topos-tce/src/tests/network.rs b/crates/topos-tce/src/tests/network.rs index bdcbb68dc..6e33546fb 100644 --- a/crates/topos-tce/src/tests/network.rs +++ b/crates/topos-tce/src/tests/network.rs @@ -37,6 +37,7 @@ async fn handle_gossip( }; context .on_net_event(topos_p2p::Event::Gossip { + propagated_by: PeerId::random(), from: PeerId::random(), data: msg.encode_to_vec(), id: "0".to_string(), @@ -67,6 +68,7 @@ async fn handle_echo( }; context .on_net_event(topos_p2p::Event::Gossip { + propagated_by: PeerId::random(), from: PeerId::random(), data: msg.encode_to_vec(), id: "0".to_string(), @@ -97,6 +99,7 @@ async fn handle_ready( }; context .on_net_event(topos_p2p::Event::Gossip { + propagated_by: PeerId::random(), from: PeerId::random(), data: msg.encode_to_vec(), id: "0".to_string(), @@ -131,6 +134,7 @@ async fn handle_already_delivered( context .on_net_event(topos_p2p::Event::Gossip { + propagated_by: PeerId::random(), from: PeerId::random(), data: msg.encode_to_vec(), id: "0".to_string(),