Skip to content

Commit

Permalink
Update src/lib/drivers/zenoh/mod.rs
Browse files Browse the repository at this point in the history
Co-authored-by: João Antônio Cardoso <[email protected]>
  • Loading branch information
patrickelectric and joaoantoniocardoso authored Nov 8, 2024
1 parent 202cbd5 commit 2e97389
Showing 1 changed file with 59 additions and 42 deletions.
101 changes: 59 additions & 42 deletions src/lib/drivers/zenoh/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,55 +133,72 @@ impl Zenoh {
let mut hub_receiver = context.hub_sender.subscribe();

loop {
match hub_receiver.recv().await {
Ok(message) => {
context.stats.write().await.stats.update_output(&message);
for future in context.on_message_output.call_all(message.clone()) {
if let Err(error) = future.await {
debug!("Dropping message: on_message_output callback returned error: {error:?}");
continue;
}
}
let message = match hub_receiver.recv().await {
Ok(message) => message,
Err(broadcast::error::RecvError::Closed) => {
error!("Hub channel closed!");
break;
}
Err(broadcast::error::RecvError::Lagged(count)) => {
warn!("Channel lagged by {count} messages.");
continue;
}
};

if message.origin.eq("zenoh") {
continue; // Don't do loopback
}

let mut bytes =
mavlink::async_peek_reader::AsyncPeekReader::new(message.as_slice());
let Ok((header, message)) = mavlink::read_v2_msg_async::<
mavlink::ardupilotmega::MavMessage,
_,
>(&mut bytes)
.await
else {
continue;
};

let message_name = message.message_name();
let mavlink_message = MAVLinkMessage { header, message };
let json_string = match json5::to_string(&mavlink_message) {
Ok(json) => json,
Err(error) => {
error!(
"Failed to transform mavlink message {message_name} to json: {error:?}"
);
continue;
}
};
let topic_name = "mavlink/out";
if let Err(error) = session.put(topic_name, json_string.clone()).await {
error!("Failed to send message to {topic_name}: {error:?}");
}
let topic_name = format!(
"mavlink/{}/{}/{}",
header.system_id, header.component_id, message_name
context.stats.write().await.stats.update_output(&message);

for future in context.on_message_output.call_all(message.clone()) {
if let Err(error) = future.await {
debug!(
"Dropping message: on_message_output callback returned error: {error:?}"
);
if let Err(error) = session.put(topic_name.clone(), json_string).await {
error!("Failed to send message to {topic_name}: {error:?}");
}
continue;
}
}

let mut bytes = mavlink::async_peek_reader::AsyncPeekReader::new(message.as_slice());
let Ok((header, message)) =
mavlink::read_v2_msg_async::<mavlink::ardupilotmega::MavMessage, _>(&mut bytes)
.await
else {
continue;
};

let message_name = message.message_name();
let mavlink_message = MAVLinkMessage { header, message };
let json_string = &match json5::to_string(&mavlink_message) {
Ok(json) => json,
Err(error) => {
error!("Failed to receive message from hub: {error:?}");
error!("Failed to transform mavlink message {message_name} to json: {error:?}");
continue;
}
};

let topic_name = "mavlink/out";
if let Err(error) = session.put(topic_name, json_string).await {
error!("Failed to send message to {topic_name}: {error:?}");
} else {
trace!("Message sent to {topic_name}: {json_string:?}");
}

let topic_name = &format!(
"mavlink/{}/{}/{}",
header.system_id, header.component_id, message_name
);
if let Err(error) = session.put(topic_name, json_string).await {
error!("Failed to send message to {topic_name}: {error:?}");
} else {
trace!("Message sent to {topic_name}: {json_string:?}");
}
}

debug!("Driver sender task stopped!");

Ok(())
}
}

Expand Down

0 comments on commit 2e97389

Please sign in to comment.