Skip to content

Commit

Permalink
Update src/lib/drivers/zenoh/mod.rs
Browse files Browse the repository at this point in the history
Co-authored-by: João Antônio Cardoso <[email protected]>
  • Loading branch information
patrickelectric and joaoantoniocardoso authored Nov 8, 2024
1 parent 74899a9 commit 202cbd5
Showing 1 changed file with 29 additions and 20 deletions.
49 changes: 29 additions & 20 deletions src/lib/drivers/zenoh/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,33 +90,42 @@ impl Zenoh {
};

while let Ok(sample) = subscriber.recv_async().await {
if let Ok(content) = json5::from_str::<MAVLinkMessage<mavlink::ardupilotmega::MavMessage>>(
let Ok(content) = json5::from_str::<MAVLinkMessage<mavlink::ardupilotmega::MavMessage>>(
std::str::from_utf8(&sample.payload().to_bytes()).unwrap(),
) {
) else {
debug!("Failed to parse message, not a valid MAVLinkMessage: {sample:?}");
continue;
};

let bus_message = {
let mut message_raw = mavlink::MAVLinkV2MessageRaw::new();
message_raw.serialize_message(content.header, &content.message);
let bus_message = Arc::new(Protocol::new("zenoh", Packet::from(message_raw)));
context.stats.write().await.stats.update_input(&bus_message);

for future in context.on_message_input.call_all(bus_message.clone()) {
if let Err(error) = future.await {
debug!(
"Dropping message: on_message_input callback returned error: {error:?}"
);
continue;
}
}

if let Err(error) = context.hub_sender.send(bus_message) {
error!("Failed to send message to hub: {error:?}");
Arc::new(Protocol::new("zenoh", Packet::from(message_raw)))
};

trace!("Received message: {bus_message:?}");

context.stats.write().await.stats.update_input(&bus_message);

for future in context.on_message_input.call_all(bus_message.clone()) {
if let Err(error) = future.await {
debug!("Dropping message: on_message_input callback returned error: {error:?}");
continue;
}
return Ok(());
}
return Err(anyhow::anyhow!(
"Failed to parse message, not a valid MAVLinkMessage: {sample:?}"
));

if let Err(error) = context.hub_sender.send(bus_message) {
error!("Failed to send message to hub: {error:?}");
continue;
}

trace!("Message sent to hub");
}
return Err(anyhow::anyhow!("Failed to receive message from zenoh"));

debug!("Driver receiver task stopped!");

Ok(())
}

#[instrument(level = "debug", skip_all)]
Expand Down

0 comments on commit 202cbd5

Please sign in to comment.