From 202cbd5225f592b9533967b3e45f5eaa38c7057d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Patrick=20Jos=C3=A9=20Pereira?= Date: Fri, 8 Nov 2024 13:32:19 -0300 Subject: [PATCH] Update src/lib/drivers/zenoh/mod.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: João Antônio Cardoso --- src/lib/drivers/zenoh/mod.rs | 49 +++++++++++++++++++++--------------- 1 file changed, 29 insertions(+), 20 deletions(-) diff --git a/src/lib/drivers/zenoh/mod.rs b/src/lib/drivers/zenoh/mod.rs index cabf081..867b10c 100644 --- a/src/lib/drivers/zenoh/mod.rs +++ b/src/lib/drivers/zenoh/mod.rs @@ -90,33 +90,42 @@ impl Zenoh { }; while let Ok(sample) = subscriber.recv_async().await { - if let Ok(content) = json5::from_str::>( + let Ok(content) = json5::from_str::>( std::str::from_utf8(&sample.payload().to_bytes()).unwrap(), - ) { + ) else { + debug!("Failed to parse message, not a valid MAVLinkMessage: {sample:?}"); + continue; + }; + + let bus_message = { let mut message_raw = mavlink::MAVLinkV2MessageRaw::new(); message_raw.serialize_message(content.header, &content.message); - let bus_message = Arc::new(Protocol::new("zenoh", Packet::from(message_raw))); - context.stats.write().await.stats.update_input(&bus_message); - - for future in context.on_message_input.call_all(bus_message.clone()) { - if let Err(error) = future.await { - debug!( - "Dropping message: on_message_input callback returned error: {error:?}" - ); - continue; - } - } - if let Err(error) = context.hub_sender.send(bus_message) { - error!("Failed to send message to hub: {error:?}"); + Arc::new(Protocol::new("zenoh", Packet::from(message_raw))) + }; + + trace!("Received message: {bus_message:?}"); + + context.stats.write().await.stats.update_input(&bus_message); + + for future in context.on_message_input.call_all(bus_message.clone()) { + if let Err(error) = future.await { + debug!("Dropping message: on_message_input callback returned error: {error:?}"); + continue; } - return Ok(()); } - return Err(anyhow::anyhow!( - "Failed to parse message, not a valid MAVLinkMessage: {sample:?}" - )); + + if let Err(error) = context.hub_sender.send(bus_message) { + error!("Failed to send message to hub: {error:?}"); + continue; + } + + trace!("Message sent to hub"); } - return Err(anyhow::anyhow!("Failed to receive message from zenoh")); + + debug!("Driver receiver task stopped!"); + + Ok(()) } #[instrument(level = "debug", skip_all)]