Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
joaoantoniocardoso committed Oct 10, 2024
1 parent cefb8fe commit efa7cf7
Show file tree
Hide file tree
Showing 10 changed files with 4,145 additions and 53 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ json5 = "0.4.1"
lazy_static = "1.5.0"
# mavlink = { default-features = false, features = ["ardupilotmega", "std", "tokio-1"], path = "../rust-mavlink/mavlink" }
# mavlink = { version = "0.13.1", default-features = false, features = ["ardupilotmega", "std"] }
mavlink = { default-features = false, features = ["ardupilotmega", "serde", "std", "tokio-1"], git = "https://github.com/joaoantoniocardoso/rust-mavlink", branch = "add-tokio" }
mavlink = { default-features = false, features = ["ardupilotmega", "serde", "std", "tokio-1"], git = "https://github.com/joaoantoniocardoso/rust-mavlink", branch = "impl_invalid_crc" }
regex = "1.10.6"
serde = { version = "1", features = ["rc"] }
serde_derive = "1.0.210"
Expand Down
68 changes: 34 additions & 34 deletions src/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,40 +68,40 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
)?
.emit()?;

if std::env::var("SKIP_FRONTEND").is_ok() {
return Ok(());
}

if !is_wasm_target_installed() {
install_wasm_target();
}

if get_trunk_version().is_none() {
info!("trunk not found");
install_trunk().unwrap_or_else(|e| {
eprintln!("Error: {}", e);
exit(1);
});
}

let mut trunk_command = Command::new("trunk");
trunk_command.args(["build", "./src/webpage/index.html"]);

// Add --release argument if not in debug mode
if cfg!(not(debug_assertions)) {
trunk_command.arg("--release");
}

let trunk_output = trunk_command.output().expect("Failed to execute trunk");

if !trunk_output.status.success() {
eprintln!(
"Trunk build failed: {}",
String::from_utf8_lossy(&trunk_output.stderr)
);
exit(1);
}
info!("{}", String::from_utf8_lossy(&trunk_output.stdout));
// if std::env::var("SKIP_FRONTEND").is_ok() {
// return Ok(());
// }

// if !is_wasm_target_installed() {
// install_wasm_target();
// }

// if get_trunk_version().is_none() {
// info!("trunk not found");
// install_trunk().unwrap_or_else(|e| {
// eprintln!("Error: {}", e);
// exit(1);
// });
// }

// let mut trunk_command = Command::new("trunk");
// trunk_command.args(["build", "./src/webpage/index.html"]);

// // Add --release argument if not in debug mode
// if cfg!(not(debug_assertions)) {
// trunk_command.arg("--release");
// }

// let trunk_output = trunk_command.output().expect("Failed to execute trunk");

// if !trunk_output.status.success() {
// eprintln!(
// "Trunk build failed: {}",
// String::from_utf8_lossy(&trunk_output.stderr)
// );
// exit(1);
// }
// info!("{}", String::from_utf8_lossy(&trunk_output.stdout));

Ok(())
}
2 changes: 1 addition & 1 deletion src/lib/drivers/fake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ impl Driver for FakeSource {
buf.clear();
mavlink::write_v2_msg(&mut buf, header, &data).expect("Failed to write message");

read_all_messages("FakeSource", &mut buf, {
read_all_messages("FakeSource", &mut buf, true, {
let hub_sender = hub_sender.clone();
move |message| {
let message = Arc::new(message);
Expand Down
6 changes: 4 additions & 2 deletions src/lib/drivers/serial/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl Serial {
match port.lock().await.read(&mut buf).await {
// We got something
Ok(bytes_received) if bytes_received > 0 => {
read_all_messages("serial", &mut buf, |message| async {
read_all_messages("serial", &mut buf, false, |message| async {
let message = Arc::new(message);

for future in on_message_input.call_all(message.clone()) {
Expand All @@ -92,6 +92,8 @@ impl Serial {
}
}

debug!("Message accepted: {message:?}");

if let Err(error) = hub_sender.send(message) {
error!("Failed to send message to hub: {error:?}, from {port_name:?}");
}
Expand Down Expand Up @@ -130,7 +132,7 @@ impl Serial {
}
}

if let Err(error) = port.lock().await.write_all(&message.raw_bytes()).await {
if let Err(error) = port.lock().await.write_all(message.raw_bytes()).await {
error!("Failed to send serial message: {error:?}");
break;
}
Expand Down
2 changes: 1 addition & 1 deletion src/lib/drivers/tcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async fn tcp_receive_task(

trace!("Received TCP packet: {buf:?}");

read_all_messages(remote_addr, &mut buf, |message| async {
read_all_messages(remote_addr, &mut buf, true, |message| async {
let message = Arc::new(message);

stats.write().await.stats.update_input(&message);
Expand Down
2 changes: 1 addition & 1 deletion src/lib/drivers/udp/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl UdpClient {
Ok((bytes_received, client_addr)) if bytes_received > 0 => {
let client_addr = &client_addr.to_string();

read_all_messages(client_addr, &mut buf, |message| async {
read_all_messages(client_addr, &mut buf, true, |message| async {
let message = Arc::new(message);

self.stats
Expand Down
2 changes: 1 addition & 1 deletion src/lib/drivers/udp/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl UdpServer {
Ok((bytes_received, client_addr)) if bytes_received > 0 => {
let client_addr = &client_addr.to_string();

read_all_messages(client_addr, &mut buf, |message| async {
read_all_messages(client_addr, &mut buf, true, |message| async {
let message = Arc::new(message);

self.stats
Expand Down
44 changes: 35 additions & 9 deletions src/lib/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,32 +35,58 @@ impl Protocol {
}
}

pub async fn read_all_messages<F, Fut>(origin: &str, buf: &mut Vec<u8>, process_message: F)
where
pub async fn read_all_messages<F, Fut>(
origin: &str,
buf: &mut Vec<u8>,
discard_invalid_checksum: bool,
process_message: F,
) where
F: Fn(Protocol) -> Fut,
Fut: Future<Output = ()>,
{
let reader = Cursor::new(buf.as_slice());
let mut reader: mavlink::async_peek_reader::AsyncPeekReader<Cursor<&[u8]>, 280> =
mavlink::async_peek_reader::AsyncPeekReader::new(reader);

loop {
let message = match mavlink::read_v2_raw_message_async::<MavMessage, _>(&mut reader).await {
Ok(message) => Protocol::new(origin, message),
let message = loop {
match mavlink::read_v2_raw_message_async::<MavMessage, _>(&mut reader).await {
Ok(message) => {
break Some(message);
}
Err(error) => {
match error {
mavlink::error::MessageReadError::Io(_) => (),
mavlink::error::MessageReadError::Parse(_) => {
mavlink::error::MessageReadError::Parse(ref parse_error) => {
match parse_error {
mavlink::error::ParserError::InvalidCRC {
crc: _,
calculated_crc: _,
message,
} => {
if !discard_invalid_checksum {
let mavlink::MAVLinkMessageRaw::V2(message) =
message.as_ref().to_owned()
else {
continue;
};
break Some(message);
}
}
_ => (),
}
error!("Failed to parse MAVLink message: {error:?}")
}
}

break;
break None;
}
};
}
};

trace!("Parsed message: {:?}", message.raw_bytes());
if let Some(message) = message {
let message = Protocol::new(origin, message);

trace!("Parsed message: {:?}", message.raw_bytes());
process_message(message).await;
}

Expand Down
Loading

0 comments on commit efa7cf7

Please sign in to comment.