diff --git a/crates/topos-p2p/src/behaviour/gossip.rs b/crates/topos-p2p/src/behaviour/gossip.rs index eaf6991ef..9da6cab4f 100644 --- a/crates/topos-p2p/src/behaviour/gossip.rs +++ b/crates/topos-p2p/src/behaviour/gossip.rs @@ -1,13 +1,9 @@ use std::collections::hash_map::DefaultHasher; use std::collections::HashSet; use std::hash::{Hash, Hasher}; -use std::{ - collections::{HashMap, VecDeque}, - env, - task::Poll, - time::Duration, -}; +use std::{collections::HashMap, task::Poll}; +use libp2p::gossipsub::MessageId; use libp2p::swarm::{ConnectionClosed, FromSwarm}; use libp2p::PeerId; use libp2p::{ @@ -15,23 +11,15 @@ use libp2p::{ identity::Keypair, swarm::{NetworkBehaviour, THandlerInEvent, ToSwarm}, }; -use prost::Message as ProstMessage; -use topos_core::api::grpc::tce::v1::Batch; -use topos_metrics::P2P_GOSSIP_BATCH_SIZE; -use tracing::{debug, error, warn}; +use tracing::{debug, trace, warn}; use crate::error::P2PError; use crate::{constants, event::ComposedEvent, TOPOS_ECHO, TOPOS_GOSSIP, TOPOS_READY}; use super::HealthStatus; -const MAX_BATCH_SIZE: usize = 10; - pub struct Behaviour { - batch_size: usize, gossipsub: gossipsub::Behaviour, - pending: HashMap<&'static str, VecDeque>>, - tick: tokio::time::Interval, /// List of connected peers per topic. connected_peer: HashMap<&'static str, HashSet>, /// The health status of the gossip behaviour @@ -43,18 +31,22 @@ impl Behaviour { &mut self, topic: &'static str, message: Vec, - ) -> Result { + ) -> Result { match topic { - TOPOS_GOSSIP => { - if let Ok(msg_id) = self.gossipsub.publish(IdentTopic::new(topic), message) { - debug!("Published on topos_gossip: {:?}", msg_id); + TOPOS_GOSSIP | TOPOS_ECHO | TOPOS_READY => { + let topic = IdentTopic::new(topic); + let topic_hash = topic.hash(); + let msg_id = self.gossipsub.publish(topic, message)?; + trace!("Published on topos_gossip: {:?}", msg_id); + + for p in self.gossipsub.mesh_peers(&topic_hash) { + debug!("Sent gossipsub message({}) to {} peer", msg_id, p); } + + Ok(msg_id) } - TOPOS_ECHO | TOPOS_READY => self.pending.entry(topic).or_default().push_back(message), - _ => return Err("Invalid topic"), + _ => Err(P2PError::InvalidGossipTopic(topic)), } - - Ok(0) } pub fn subscribe(&mut self) -> Result<(), P2PError> { @@ -71,10 +63,6 @@ impl Behaviour { } pub async fn new(peer_key: Keypair) -> Self { - let batch_size = env::var("TOPOS_GOSSIP_BATCH_SIZE") - .map(|v| v.parse::()) - .unwrap_or(Ok(MAX_BATCH_SIZE)) - .unwrap(); let gossipsub = gossipsub::ConfigBuilder::default() .max_transmit_size(2 * 1024 * 1024) .validation_mode(gossipsub::ValidationMode::Strict) @@ -99,21 +87,7 @@ impl Behaviour { .unwrap(); Self { - batch_size, gossipsub, - pending: [ - (TOPOS_ECHO, VecDeque::new()), - (TOPOS_READY, VecDeque::new()), - ] - .into_iter() - .collect(), - tick: tokio::time::interval(Duration::from_millis( - env::var("TOPOS_GOSSIP_INTERVAL") - .map(|v| v.parse::()) - .unwrap_or(Ok(100)) - .unwrap(), - )), - connected_peer: Default::default(), health_status: Default::default(), } @@ -191,26 +165,6 @@ impl NetworkBehaviour for Behaviour { &mut self, cx: &mut std::task::Context<'_>, ) -> Poll>> { - if self.tick.poll_tick(cx).is_ready() { - // Publish batch - for (topic, queue) in self.pending.iter_mut() { - if !queue.is_empty() { - let num_of_message = queue.len().min(self.batch_size); - let batch = Batch { - messages: queue.drain(0..num_of_message).collect(), - }; - - debug!("Publishing {} {}", batch.messages.len(), topic); - let msg = batch.encode_to_vec(); - P2P_GOSSIP_BATCH_SIZE.observe(batch.messages.len() as f64); - match self.gossipsub.publish(IdentTopic::new(*topic), msg) { - Ok(message_id) => debug!("Published {} {}", topic, message_id), - Err(error) => error!("Failed to publish {}: {}", topic, error), - } - } - } - } - match self.gossipsub.poll(cx) { Poll::Pending => return Poll::Pending, Poll::Ready(ToSwarm::GenerateEvent(event)) => match event { @@ -227,29 +181,35 @@ impl NetworkBehaviour for Behaviour { } => match topic.as_str() { TOPOS_GOSSIP => { return Poll::Ready(ToSwarm::GenerateEvent(ComposedEvent::Gossipsub( - crate::event::GossipEvent::Message { + Box::new(crate::event::GossipEvent::Message { + propagated_by: propagation_source, topic: TOPOS_GOSSIP, message: data, source, - }, + id: message_id, + }), ))) } TOPOS_ECHO => { return Poll::Ready(ToSwarm::GenerateEvent(ComposedEvent::Gossipsub( - crate::event::GossipEvent::Message { + Box::new(crate::event::GossipEvent::Message { + propagated_by: propagation_source, topic: TOPOS_ECHO, message: data, source, - }, + id: message_id, + }), ))) } TOPOS_READY => { return Poll::Ready(ToSwarm::GenerateEvent(ComposedEvent::Gossipsub( - crate::event::GossipEvent::Message { + Box::new(crate::event::GossipEvent::Message { + propagated_by: propagation_source, topic: TOPOS_READY, message: data, source, - }, + id: message_id, + }), ))) } _ => {} diff --git a/crates/topos-p2p/src/client.rs b/crates/topos-p2p/src/client.rs index f33eee9e0..018bb69ef 100644 --- a/crates/topos-p2p/src/client.rs +++ b/crates/topos-p2p/src/client.rs @@ -1,4 +1,4 @@ -use futures::future::BoxFuture; +use futures::TryFutureExt; use libp2p::PeerId; use tokio::sync::{ mpsc::{self, error::SendError}, @@ -39,21 +39,24 @@ impl NetworkClient { .await } - pub fn publish( + pub async fn publish( &self, topic: &'static str, message: T, - ) -> BoxFuture<'static, Result<(), SendError>> { + ) -> Result { let network = self.sender.clone(); + let (sender, receiver) = oneshot::channel(); + + network + .send(Command::Gossip { + topic, + data: message.encode_to_vec(), + sender, + }) + .map_err(CommandExecutionError::from) + .await?; - Box::pin(async move { - network - .send(Command::Gossip { - topic, - data: message.encode_to_vec(), - }) - .await - }) + receiver.await?.map(|id| id.to_string()) } async fn send_command_with_receiver< diff --git a/crates/topos-p2p/src/command.rs b/crates/topos-p2p/src/command.rs index 63432c42c..8bd675504 100644 --- a/crates/topos-p2p/src/command.rs +++ b/crates/topos-p2p/src/command.rs @@ -1,6 +1,6 @@ use std::fmt::Display; -use libp2p::PeerId; +use libp2p::{gossipsub::MessageId, PeerId}; use tokio::sync::oneshot; use crate::{behaviour::grpc::connection::OutboundConnection, error::P2PError}; @@ -15,6 +15,7 @@ pub enum Command { Gossip { topic: &'static str, data: Vec, + sender: oneshot::Sender>, }, /// Ask for the creation of a new proxy connection for a gRPC query. diff --git a/crates/topos-p2p/src/error.rs b/crates/topos-p2p/src/error.rs index 3fe69f0f9..050a5920d 100644 --- a/crates/topos-p2p/src/error.rs +++ b/crates/topos-p2p/src/error.rs @@ -1,8 +1,11 @@ use std::io; use libp2p::{ - gossipsub::SubscriptionError, kad::NoKnownPeers, noise::Error as NoiseError, - request_response::OutboundFailure, TransportError, + gossipsub::{PublishError, SubscriptionError}, + kad::NoKnownPeers, + noise::Error as NoiseError, + request_response::OutboundFailure, + TransportError, }; use thiserror::Error; use tokio::sync::{mpsc, oneshot}; @@ -49,6 +52,12 @@ pub enum P2PError { #[error("Gossip topics subscription failed")] GossipTopicSubscriptionFailure, + + #[error("Gossipsub publish failure: {0}")] + GossipsubPublishFailure(#[from] PublishError), + + #[error("Invalid gossipsub topics: {0}")] + InvalidGossipTopic(&'static str), } #[derive(Error, Debug)] diff --git a/crates/topos-p2p/src/event.rs b/crates/topos-p2p/src/event.rs index a9ea4e2b4..d9a2a75f8 100644 --- a/crates/topos-p2p/src/event.rs +++ b/crates/topos-p2p/src/event.rs @@ -1,4 +1,4 @@ -use libp2p::{identify, kad, PeerId}; +use libp2p::{gossipsub::MessageId, identify, kad, PeerId}; use crate::behaviour::{grpc, HealthStatus}; @@ -7,9 +7,11 @@ use crate::behaviour::{grpc, HealthStatus}; pub enum GossipEvent { /// A message has been received from a peer on one of the subscribed topics Message { + propagated_by: PeerId, source: Option, topic: &'static str, message: Vec, + id: MessageId, }, } @@ -17,7 +19,7 @@ pub enum GossipEvent { pub enum ComposedEvent { Kademlia(Box), PeerInfo(Box), - Gossipsub(GossipEvent), + Gossipsub(Box), Grpc(grpc::Event), Void, } @@ -48,9 +50,15 @@ impl From for ComposedEvent { /// Represents the events that the p2p layer can emit #[derive(Debug)] +#[allow(clippy::large_enum_variant)] pub enum Event { /// An event emitted when a gossip message is received - Gossip { from: PeerId, data: Vec }, + Gossip { + propagated_by: PeerId, + from: PeerId, + data: Vec, + id: String, + }, /// An event emitted when the p2p layer becomes healthy Healthy, /// An event emitted when the p2p layer becomes unhealthy diff --git a/crates/topos-p2p/src/runtime/handle_command.rs b/crates/topos-p2p/src/runtime/handle_command.rs index 572a83798..6af9f09af 100644 --- a/crates/topos-p2p/src/runtime/handle_command.rs +++ b/crates/topos-p2p/src/runtime/handle_command.rs @@ -5,7 +5,7 @@ use crate::{ use rand::{thread_rng, Rng}; use topos_metrics::P2P_MESSAGE_SENT_ON_GOSSIPSUB_TOTAL; -use tracing::{debug, error, warn}; +use tracing::{error, trace, warn}; impl Runtime { pub(crate) async fn handle_command(&mut self, command: Command) { @@ -64,12 +64,17 @@ impl Runtime { Command::Gossip { topic, data: message, + sender, } => match self.swarm.behaviour_mut().gossipsub.publish(topic, message) { Ok(message_id) => { - debug!("Published message to {topic}"); + trace!("Published message to {topic}"); P2P_MESSAGE_SENT_ON_GOSSIPSUB_TOTAL.inc(); + _ = sender.send(Ok(message_id)); + } + Err(err) => { + error!("Failed to publish message to {topic}: {err}"); + _ = sender.send(Err(err)); } - Err(err) => error!("Failed to publish message to {topic}: {err}"), }, } } diff --git a/crates/topos-p2p/src/runtime/handle_event/gossipsub.rs b/crates/topos-p2p/src/runtime/handle_event/gossipsub.rs index ddb2aa9ff..04ed3bb66 100644 --- a/crates/topos-p2p/src/runtime/handle_event/gossipsub.rs +++ b/crates/topos-p2p/src/runtime/handle_event/gossipsub.rs @@ -1,74 +1,58 @@ use topos_metrics::{ - P2P_EVENT_STREAM_CAPACITY_TOTAL, P2P_MESSAGE_DESERIALIZE_FAILURE_TOTAL, - P2P_MESSAGE_RECEIVED_ON_ECHO_TOTAL, P2P_MESSAGE_RECEIVED_ON_GOSSIP_TOTAL, - P2P_MESSAGE_RECEIVED_ON_READY_TOTAL, + P2P_EVENT_STREAM_CAPACITY_TOTAL, P2P_MESSAGE_RECEIVED_ON_ECHO_TOTAL, + P2P_MESSAGE_RECEIVED_ON_GOSSIP_TOTAL, P2P_MESSAGE_RECEIVED_ON_READY_TOTAL, }; use tracing::{debug, error}; use crate::{constants, event::GossipEvent, Event, Runtime, TOPOS_ECHO, TOPOS_GOSSIP, TOPOS_READY}; -use prost::Message; -use topos_core::api::grpc::tce::v1::Batch; use super::{EventHandler, EventResult}; #[async_trait::async_trait] -impl EventHandler for Runtime { - async fn handle(&mut self, event: GossipEvent) -> EventResult { +impl EventHandler> for Runtime { + async fn handle(&mut self, event: Box) -> EventResult { if let GossipEvent::Message { + propagated_by, source: Some(source), message, topic, - } = event + id, + } = *event { if self.event_sender.capacity() < *constants::CAPACITY_EVENT_STREAM_BUFFER { P2P_EVENT_STREAM_CAPACITY_TOTAL.inc(); } - debug!("Received message from {:?} on topic {:?}", source, topic); - match topic { - TOPOS_GOSSIP => { - P2P_MESSAGE_RECEIVED_ON_GOSSIP_TOTAL.inc(); + debug!( + message_id = id.to_string(), + local_peer_id = self.local_peer_id.to_string(), + propagated_by = propagated_by.to_string(), + "Received message({id}) from source {source} on topic {:?} propagated by {propagated_by}", + topic + ); - if let Err(e) = self - .event_sender - .send(Event::Gossip { - from: source, - data: message, - }) - .await - { - error!("Failed to send gossip event to runtime: {:?}", e); - } - } - TOPOS_ECHO | TOPOS_READY => { - if topic == TOPOS_ECHO { - P2P_MESSAGE_RECEIVED_ON_ECHO_TOTAL.inc(); - } else { - P2P_MESSAGE_RECEIVED_ON_READY_TOTAL.inc(); - } - if let Ok(Batch { messages }) = Batch::decode(&message[..]) { - for message in messages { - if let Err(e) = self - .event_sender - .send(Event::Gossip { - from: source, - data: message, - }) - .await - { - error!("Failed to send gossip {} event to runtime: {:?}", topic, e); - } - } - } else { - P2P_MESSAGE_DESERIALIZE_FAILURE_TOTAL - .with_label_values(&[topic]) - .inc(); - } - } + match topic { + TOPOS_GOSSIP => P2P_MESSAGE_RECEIVED_ON_GOSSIP_TOTAL.inc(), + TOPOS_ECHO => P2P_MESSAGE_RECEIVED_ON_ECHO_TOTAL.inc(), + TOPOS_READY => P2P_MESSAGE_RECEIVED_ON_READY_TOTAL.inc(), _ => { - error!("Received message on unknown topic {:?}", topic); + error!("Received message({id}) on unknown topic {:?}", topic); + return Ok(()); } } + + if let Err(e) = self + .event_sender + .send(Event::Gossip { + propagated_by, + from: source, + data: message, + id: id.to_string(), + }) + .await + { + error!("Failed to send gossip event({id}) to runtime: {:?}", e); + } } Ok(()) diff --git a/crates/topos-tce/src/app_context/network.rs b/crates/topos-tce/src/app_context/network.rs index acfdb569b..4b26b639a 100644 --- a/crates/topos-tce/src/app_context/network.rs +++ b/crates/topos-tce/src/app_context/network.rs @@ -22,7 +22,13 @@ impl AppContext { &evt ); - if let NetEvent::Gossip { data, from } = evt { + if let NetEvent::Gossip { + data, + propagated_by: received_from, + from, + id, + } = evt + { if let Ok(DoubleEchoRequest { request: Some(double_echo_request), }) = DoubleEchoRequest::decode(&data[..]) @@ -38,14 +44,16 @@ impl AppContext { entry.insert(CERTIFICATE_DELIVERY_LATENCY.start_timer()); } info!( - "Received certificate {} from GossipSub from {}", - cert.id, from + message_id = id, + "Received certificate {} from GossipSub message({id}) from {from} | propagated by {received_from}", + cert.id, ); match self.validator_store.insert_pending_certificate(&cert).await { Ok(Some(pending_id)) => { let certificate_id = cert.id; debug!( + message_id = id, "Certificate {} has been inserted into pending pool", certificate_id ); @@ -62,6 +70,7 @@ impl AppContext { .is_err() { error!( + message_id = id, "Unable to send DoubleEchoCommand::Broadcast command \ to double echo for {}", certificate_id @@ -71,15 +80,19 @@ impl AppContext { Ok(None) => { debug!( + message_id = id, "Certificate {} from subnet {} has been inserted into \ precedence pool waiting for {}", - cert.id, cert.source_subnet_id, cert.prev_id + cert.id, + cert.source_subnet_id, + cert.prev_id ); } Err(StorageError::InternalStorage( InternalStorageError::CertificateAlreadyPending, )) => { debug!( + message_id = id, "Certificate {} has been already added to the pending \ pool, skipping", cert.id @@ -89,20 +102,26 @@ impl AppContext { InternalStorageError::CertificateAlreadyExists, )) => { debug!( + message_id = id, "Certificate {} has been already delivered, skipping", cert.id ); } Err(error) => { error!( + message_id = id, "Unable to insert pending certificate {}: {}", - cert.id, error + cert.id, + error ); } } } Err(e) => { - error!("Failed to parse the received Certificate: {e}"); + error!( + message_id = id, + "Failed to parse the received Certificate: {e}" + ); } }, double_echo_request::Request::Echo(Echo { @@ -114,6 +133,7 @@ impl AppContext { spawn(async move { let certificate_id = certificate_id.clone().try_into().map_err(|e| { error!( + message_id = id, "Failed to parse the CertificateId {certificate_id} from \ Echo: {e}" ); @@ -121,6 +141,7 @@ impl AppContext { }); let validator_id = validator_id.clone().try_into().map_err(|e| { error!( + message_id = id, "Failed to parse the ValidatorId {validator_id} from Echo: {e}" ); e @@ -129,11 +150,10 @@ impl AppContext { if let (Ok(certificate_id), Ok(validator_id)) = (certificate_id, validator_id) { - trace!( - "Received Echo message, certificate_id: {certificate_id}, \ - validator_id: {validator_id} from: {from}", - certificate_id = certificate_id, - validator_id = validator_id + debug!( + message_id = id, + "Received Echo message({id}), certificate_id: \ + {certificate_id}, validator_id: {validator_id} from: {from}", ); if let Err(e) = channel @@ -144,10 +164,16 @@ impl AppContext { }) .await { - error!("Unable to pass received Echo message: {:?}", e); + error!( + message_id = id, + "Unable to pass received Echo message: {:?}", e + ); } } else { - error!("Unable to process Echo message due to invalid data"); + error!( + message_id = id, + "Unable to process Echo message due to invalid data" + ); } }); } @@ -160,6 +186,7 @@ impl AppContext { spawn(async move { let certificate_id = certificate_id.clone().try_into().map_err(|e| { error!( + message_id = id, "Failed to parse the CertificateId {certificate_id} from \ Ready: {e}" ); @@ -167,6 +194,7 @@ impl AppContext { }); let validator_id = validator_id.clone().try_into().map_err(|e| { error!( + message_id = id, "Failed to parse the ValidatorId {validator_id} from Ready: \ {e}" ); @@ -175,11 +203,10 @@ impl AppContext { if let (Ok(certificate_id), Ok(validator_id)) = (certificate_id, validator_id) { - trace!( - "Received Ready message, certificate_id: {certificate_id}, \ - validator_id: {validator_id} from: {from}", - certificate_id = certificate_id, - validator_id = validator_id + debug!( + message_id = id, + "Received Ready message({id}), certificate_id: \ + {certificate_id}, validator_id: {validator_id} from: {from}", ); if let Err(e) = channel .send(DoubleEchoCommand::Ready { @@ -189,10 +216,16 @@ impl AppContext { }) .await { - error!("Unable to pass received Ready message: {:?}", e); + error!( + message_id = id, + "Unable to pass received Ready message: {:?}", e + ); } } else { - error!("Unable to process Ready message due to invalid data"); + error!( + message_id = id, + "Unable to process Ready message due to invalid data" + ); } }); } diff --git a/crates/topos-tce/src/app_context/protocol.rs b/crates/topos-tce/src/app_context/protocol.rs index 278a13c0a..6057da9e8 100644 --- a/crates/topos-tce/src/app_context/protocol.rs +++ b/crates/topos-tce/src/app_context/protocol.rs @@ -1,6 +1,6 @@ use topos_core::api::grpc::tce::v1::{double_echo_request, DoubleEchoRequest, Echo, Gossip, Ready}; use topos_tce_broadcast::event::ProtocolEvents; -use tracing::{error, info, warn}; +use tracing::{debug, error, info, warn}; use crate::AppContext; @@ -21,12 +21,16 @@ impl AppContext { }; info!("Sending Gossip for certificate {}", cert_id); - if let Err(e) = self + match self .network_client .publish(topos_p2p::TOPOS_GOSSIP, request) .await { - error!("Unable to send Gossip: {e}"); + Ok(id) => debug!( + message_id = id, + "Sent Gossip message({id}) for certificate {cert_id}" + ), + Err(e) => error!("Unable to send Gossip: {e}"), } } @@ -44,12 +48,16 @@ impl AppContext { })), }; - if let Err(e) = self + match self .network_client .publish(topos_p2p::TOPOS_ECHO, request) .await { - error!("Unable to send Echo: {e}"); + Ok(id) => debug!( + message_id = id, + "Sent Echo message({id}) for certificate {certificate_id}" + ), + Err(e) => error!("Unable to send Echo: {e}"), } } @@ -66,12 +74,16 @@ impl AppContext { })), }; - if let Err(e) = self + match self .network_client .publish(topos_p2p::TOPOS_READY, request) .await { - error!("Unable to send Ready: {e}"); + Ok(id) => debug!( + message_id = id, + "Sent Ready message({id}) for certificate {certificate_id}" + ), + Err(e) => error!("Unable to send Ready: {e}"), } } ProtocolEvents::BroadcastFailed { certificate_id } => { diff --git a/crates/topos-tce/src/tests/mod.rs b/crates/topos-tce/src/tests/mod.rs index 39ea06d8b..d5add42fa 100644 --- a/crates/topos-tce/src/tests/mod.rs +++ b/crates/topos-tce/src/tests/mod.rs @@ -1,12 +1,15 @@ use libp2p::PeerId; use rstest::{fixture, rstest}; -use std::{collections::HashSet, future::IntoFuture, sync::Arc}; +use std::{collections::HashSet, future::IntoFuture, sync::Arc, time::Duration}; use tokio_stream::Stream; use topos_tce_api::RuntimeEvent; use topos_tce_broadcast::event::ProtocolEvents; use topos_tce_gatekeeper::Gatekeeper; -use tokio::sync::{broadcast, mpsc}; +use tokio::{ + spawn, + sync::{broadcast, mpsc}, +}; use topos_crypto::messages::MessageSigner; use topos_p2p::{utils::GrpcOverP2P, NetworkClient}; use topos_tce_broadcast::{ReliableBroadcastClient, ReliableBroadcastConfig}; @@ -24,7 +27,8 @@ mod api; mod network; #[rstest] -#[tokio::test] +#[test_log::test(tokio::test)] +#[timeout(Duration::from_secs(1))] async fn non_validator_publish_gossip( #[future] setup_test: ( AppContext, @@ -34,20 +38,23 @@ async fn non_validator_publish_gossip( ) { let (mut context, mut p2p_receiver, _) = setup_test.await; let certificates = create_certificate_chain(SOURCE_SUBNET_ID_1, &[TARGET_SUBNET_ID_1], 1); - context - .on_protocol_event(ProtocolEvents::Gossip { - cert: certificates[0].certificate.clone(), - }) - .await; + spawn(async move { + _ = context + .on_protocol_event(ProtocolEvents::Gossip { + cert: certificates[0].certificate.clone(), + }) + .await; + }); assert!(matches!( - p2p_receiver.try_recv(), - Ok(topos_p2p::Command::Gossip { topic, .. }) if topic == "topos_gossip" + p2p_receiver.recv().await, + Some(topos_p2p::Command::Gossip { topic, .. }) if topic == "topos_gossip" )); } #[rstest] #[tokio::test] +#[timeout(Duration::from_secs(1))] async fn non_validator_do_not_publish_echo( #[future] setup_test: ( AppContext, @@ -56,19 +63,25 @@ async fn non_validator_do_not_publish_echo( ), ) { let (mut context, mut p2p_receiver, message_signer) = setup_test.await; - context - .on_protocol_event(ProtocolEvents::Echo { - certificate_id: CERTIFICATE_ID_1, - signature: message_signer.sign_message(&[]).ok().unwrap(), - validator_id: message_signer.public_address.into(), - }) - .await; - - assert!(p2p_receiver.try_recv().is_err(),); + spawn(async move { + context + .on_protocol_event(ProtocolEvents::Echo { + certificate_id: CERTIFICATE_ID_1, + signature: message_signer.sign_message(&[]).ok().unwrap(), + validator_id: message_signer.public_address.into(), + }) + .await; + }); + + assert!(matches!( + tokio::time::timeout(Duration::from_millis(10), p2p_receiver.recv()).await, + Ok(None) + )); } #[rstest] #[tokio::test] +#[timeout(Duration::from_secs(1))] async fn non_validator_do_not_publish_ready( #[future] setup_test: ( AppContext, @@ -77,15 +90,20 @@ async fn non_validator_do_not_publish_ready( ), ) { let (mut context, mut p2p_receiver, message_signer) = setup_test.await; - context - .on_protocol_event(ProtocolEvents::Ready { - certificate_id: CERTIFICATE_ID_1, - signature: message_signer.sign_message(&[]).ok().unwrap(), - validator_id: message_signer.public_address.into(), - }) - .await; - - assert!(p2p_receiver.try_recv().is_err(),); + spawn(async move { + context + .on_protocol_event(ProtocolEvents::Ready { + certificate_id: CERTIFICATE_ID_1, + signature: message_signer.sign_message(&[]).ok().unwrap(), + validator_id: message_signer.public_address.into(), + }) + .await; + }); + + assert!(matches!( + tokio::time::timeout(Duration::from_millis(10), p2p_receiver.recv()).await, + Ok(None) + )); } #[fixture] diff --git a/crates/topos-tce/src/tests/network.rs b/crates/topos-tce/src/tests/network.rs index 18b10c615..6e33546fb 100644 --- a/crates/topos-tce/src/tests/network.rs +++ b/crates/topos-tce/src/tests/network.rs @@ -37,8 +37,10 @@ async fn handle_gossip( }; context .on_net_event(topos_p2p::Event::Gossip { + propagated_by: PeerId::random(), from: PeerId::random(), data: msg.encode_to_vec(), + id: "0".to_string(), }) .await; } @@ -66,8 +68,10 @@ async fn handle_echo( }; context .on_net_event(topos_p2p::Event::Gossip { + propagated_by: PeerId::random(), from: PeerId::random(), data: msg.encode_to_vec(), + id: "0".to_string(), }) .await; } @@ -95,8 +99,10 @@ async fn handle_ready( }; context .on_net_event(topos_p2p::Event::Gossip { + propagated_by: PeerId::random(), from: PeerId::random(), data: msg.encode_to_vec(), + id: "0".to_string(), }) .await; } @@ -128,8 +134,10 @@ async fn handle_already_delivered( context .on_net_event(topos_p2p::Event::Gossip { + propagated_by: PeerId::random(), from: PeerId::random(), data: msg.encode_to_vec(), + id: "0".to_string(), }) .await; }