diff --git a/src/lib/drivers/zenoh/mod.rs b/src/lib/drivers/zenoh/mod.rs index cabf081..867b10c 100644 --- a/src/lib/drivers/zenoh/mod.rs +++ b/src/lib/drivers/zenoh/mod.rs @@ -90,33 +90,42 @@ impl Zenoh { }; while let Ok(sample) = subscriber.recv_async().await { - if let Ok(content) = json5::from_str::>( + let Ok(content) = json5::from_str::>( std::str::from_utf8(&sample.payload().to_bytes()).unwrap(), - ) { + ) else { + debug!("Failed to parse message, not a valid MAVLinkMessage: {sample:?}"); + continue; + }; + + let bus_message = { let mut message_raw = mavlink::MAVLinkV2MessageRaw::new(); message_raw.serialize_message(content.header, &content.message); - let bus_message = Arc::new(Protocol::new("zenoh", Packet::from(message_raw))); - context.stats.write().await.stats.update_input(&bus_message); - - for future in context.on_message_input.call_all(bus_message.clone()) { - if let Err(error) = future.await { - debug!( - "Dropping message: on_message_input callback returned error: {error:?}" - ); - continue; - } - } - if let Err(error) = context.hub_sender.send(bus_message) { - error!("Failed to send message to hub: {error:?}"); + Arc::new(Protocol::new("zenoh", Packet::from(message_raw))) + }; + + trace!("Received message: {bus_message:?}"); + + context.stats.write().await.stats.update_input(&bus_message); + + for future in context.on_message_input.call_all(bus_message.clone()) { + if let Err(error) = future.await { + debug!("Dropping message: on_message_input callback returned error: {error:?}"); + continue; } - return Ok(()); } - return Err(anyhow::anyhow!( - "Failed to parse message, not a valid MAVLinkMessage: {sample:?}" - )); + + if let Err(error) = context.hub_sender.send(bus_message) { + error!("Failed to send message to hub: {error:?}"); + continue; + } + + trace!("Message sent to hub"); } - return Err(anyhow::anyhow!("Failed to receive message from zenoh")); + + debug!("Driver receiver task stopped!"); + + Ok(()) } #[instrument(level = "debug", skip_all)]