Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
joaoantoniocardoso committed Sep 8, 2024
1 parent 5627a82 commit 875c121
Show file tree
Hide file tree
Showing 13 changed files with 279 additions and 45 deletions.
57 changes: 43 additions & 14 deletions src/drivers/fake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,29 @@ use std::sync::Arc;

use anyhow::Result;
use mavlink_server::callbacks::{Callbacks, MessageCallback};
use tokio::sync::broadcast;
use tokio::sync::{broadcast, RwLock};
use tracing::*;

use crate::{
drivers::{Driver, DriverInfo},
drivers::{
stats::{DriverStats, DriverStatsInfo},
Driver, DriverInfo,
},
protocol::{read_all_messages, Protocol},
};

pub struct FakeSink {
on_message_input: Callbacks<Arc<Protocol>>,
on_message_output: Callbacks<Arc<Protocol>>,
print: bool,
stats: Arc<RwLock<DriverStatsInfo>>,
}

impl FakeSink {
pub fn builder() -> FakeSinkBuilder {
FakeSinkBuilder(Self {
on_message_input: Callbacks::new(),
on_message_output: Callbacks::new(),
print: false,
stats: Arc::new(RwLock::new(DriverStatsInfo::default())),
})
}
}
Expand All @@ -45,14 +48,6 @@ impl FakeSinkBuilder {
self.0.on_message_input.add_callback(callback.into_boxed());
self
}

pub fn on_message_output<C>(self, callback: C) -> Self
where
C: MessageCallback<Arc<Protocol>>,
{
self.0.on_message_output.add_callback(callback.into_boxed());
self
}
}

#[async_trait::async_trait]
Expand All @@ -61,16 +56,28 @@ impl Driver for FakeSink {
let mut hub_receiver = hub_sender.subscribe();

while let Ok(message) = hub_receiver.recv().await {
self.stats
.write()
.await
.update_input(Arc::clone(&message))
.await;

self.stats
.write()
.await
.update_input(Arc::clone(&message))
.await;

for future in self.on_message_input.call_all(Arc::clone(&message)) {
if let Err(error) = future.await {
debug!("Dropping message: on_message_input callback returned error: {error:?}");
continue;
}
}

let mut bytes = mavlink::peek_reader::PeekReader::new(message.raw_bytes());
let mut bytes = mavlink::async_peek_reader::AsyncPeekReader::new(message.raw_bytes());
let (header, message): (mavlink::MavHeader, mavlink::ardupilotmega::MavMessage) =
mavlink::read_v2_msg(&mut bytes).unwrap();
mavlink::read_v2_msg_async(&mut bytes).await.unwrap();
if self.print {
println!("Message received: {header:?} {message:?}");
} else {
Expand All @@ -87,6 +94,13 @@ impl Driver for FakeSink {
}
}

#[async_trait::async_trait]
impl DriverStats for FakeSink {
async fn stats(&self) -> DriverStatsInfo {
self.stats.read().await.clone()
}
}

pub struct FakeSinkInfo;
impl DriverInfo for FakeSinkInfo {
fn name(&self) -> &str {
Expand Down Expand Up @@ -127,13 +141,15 @@ impl DriverInfo for FakeSinkInfo {
pub struct FakeSource {
period: std::time::Duration,
on_message_output: Callbacks<Arc<Protocol>>,
stats: Arc<RwLock<DriverStatsInfo>>,
}

impl FakeSource {
pub fn builder(period: std::time::Duration) -> FakeSourceBuilder {
FakeSourceBuilder(Self {
period,
on_message_output: Callbacks::new(),
stats: Arc::new(RwLock::new(DriverStatsInfo::default())),
})
}
}
Expand Down Expand Up @@ -194,6 +210,12 @@ impl Driver for FakeSource {
async move {
trace!("Fake message created: {message:?}");

self.stats
.write()
.await
.update_output(Arc::clone(&message))
.await;

for future in self.on_message_output.call_all(Arc::clone(&message)) {
if let Err(error) = future.await {
debug!(
Expand All @@ -219,6 +241,13 @@ impl Driver for FakeSource {
}
}

#[async_trait::async_trait]
impl DriverStats for FakeSource {
async fn stats(&self) -> DriverStatsInfo {
todo!()
}
}

pub struct FakeSourceInfo;
impl DriverInfo for FakeSourceInfo {
fn name(&self) -> &str {
Expand Down
20 changes: 18 additions & 2 deletions src/drivers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ use std::sync::Arc;

use anyhow::Result;
use regex::Regex;
use stats::DriverStats;
use tokio::sync::broadcast;
use tracing::*;
use url::Url;

use crate::protocol::Protocol;
use crate::{drivers::stats::DriverStatsInfo, protocol::Protocol};

#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub enum Type {
Expand All @@ -37,7 +38,7 @@ pub struct DriverDescriptionLegacy {
}

#[async_trait::async_trait]
pub trait Driver: Send + Sync {
pub trait Driver: Send + Sync + DriverStats {
async fn run(&self, hub_sender: broadcast::Sender<Arc<Protocol>>) -> Result<()>;
fn info(&self) -> Box<dyn DriverInfo>;
}
Expand Down Expand Up @@ -243,13 +244,15 @@ mod tests {
pub struct ExampleDriver {
on_message_input: Callbacks<Arc<Protocol>>,
on_message_output: Callbacks<Arc<Protocol>>,
stats: Arc<RwLock<DriverStatsInfo>>,
}

impl ExampleDriver {
pub fn new() -> ExampleDriverBuilder {
ExampleDriverBuilder(Self {
on_message_input: Callbacks::new(),
on_message_output: Callbacks::new(),
stats: Arc::new(RwLock::new(DriverStatsInfo::default())),
})
}
}
Expand All @@ -276,6 +279,12 @@ mod tests {
let mut hub_receiver = hub_sender.subscribe();

while let Ok(message) = hub_receiver.recv().await {
self.stats
.write()
.await
.update_input(Arc::clone(&message))
.await;

for future in self.on_message_input.call_all(Arc::clone(&message)) {
if let Err(error) = future.await {
debug!(
Expand All @@ -296,6 +305,13 @@ mod tests {
}
}

#[async_trait::async_trait]
impl DriverStats for ExampleDriver {
async fn stats(&self) -> DriverStatsInfo {
self.stats.read().await.clone()
}
}

pub struct ExampleDriverInfo;
impl DriverInfo for ExampleDriverInfo {
fn name(&self) -> &str {
Expand Down
16 changes: 14 additions & 2 deletions src/drivers/serial/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@ use anyhow::Result;
use mavlink_server::callbacks::{Callbacks, MessageCallback};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
sync::{broadcast, Mutex},
sync::{broadcast, Mutex, RwLock},
};
use tokio_serial::{self, SerialPortBuilderExt};
use tracing::*;

use crate::{
drivers::{Driver, DriverInfo},
drivers::{
stats::{DriverStats, DriverStatsInfo},
Driver, DriverInfo,
},
protocol::{read_all_messages, Protocol},
};

Expand All @@ -19,6 +22,7 @@ pub struct Serial {
pub baud_rate: u32,
on_message_input: Callbacks<Arc<Protocol>>,
on_message_output: Callbacks<Arc<Protocol>>,
stats: Arc<RwLock<DriverStatsInfo>>,
}

pub struct SerialBuilder(Serial);
Expand Down Expand Up @@ -53,6 +57,7 @@ impl Serial {
baud_rate,
on_message_input: Callbacks::new(),
on_message_output: Callbacks::new(),
stats: Arc::new(RwLock::new(DriverStatsInfo::default())),
})
}

Expand Down Expand Up @@ -175,6 +180,13 @@ impl Driver for Serial {
}
}

#[async_trait::async_trait]
impl DriverStats for Serial {
async fn stats(&self) -> DriverStatsInfo {
self.stats.read().await.clone()
}
}

pub struct SerialInfo;
impl DriverInfo for SerialInfo {
fn name(&self) -> &str {
Expand Down
52 changes: 41 additions & 11 deletions src/drivers/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,59 @@ use std::sync::Arc;

use crate::protocol::Protocol;

struct DriverStats {
timestamp: u64,
#[async_trait::async_trait]
pub trait DriverStats {
async fn stats(&self) -> DriverStatsInfo;
}

#[derive(Default, Debug, Clone)]
pub struct DriverStatsInfo {
input: Option<DriverStatsInfoInner>,
output: Option<DriverStatsInfoInner>,
}

impl DriverStatsInfo {
pub async fn update_input(&mut self, message: Arc<Protocol>) {
if let Some(stats) = self.input.as_mut() {
stats.update(message).await;
} else {
self.input.replace(DriverStatsInfoInner::default());
}
}

pub async fn update_output(&mut self, message: Arc<Protocol>) {
if let Some(stats) = self.output.as_mut() {
stats.update(message).await;
} else {
self.output.replace(DriverStatsInfoInner::default());
}
}
}

#[derive(Clone, Debug)]
struct DriverStatsInfoInner {
last_message: u64,
messages: usize,
bytes: usize,
delay: u64,
}

impl Default for DriverStats {
impl Default for DriverStatsInfoInner {
fn default() -> Self {
Self {
timestamp: chrono::Utc::now().timestamp_micros() as u64,
..Default::default()
last_message: chrono::Utc::now().timestamp_micros() as u64,
messages: 0,
bytes: 0,
delay: 0,
}
}
}

impl DriverStats {
async fn update(mut self, message: Arc<Protocol>) -> Self {
self.timestamp = chrono::Utc::now().timestamp_micros() as u64;
impl DriverStatsInfoInner {
pub async fn update(&mut self, message: Arc<Protocol>) {
self.last_message = chrono::Utc::now().timestamp_micros() as u64;
self.bytes += message.raw_bytes().len();
self.messages += 1;
self.delay = self.timestamp - message.timestamp;

self
self.delay = self.last_message - message.timestamp;
}
}
19 changes: 16 additions & 3 deletions src/drivers/tcp/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@ use std::sync::Arc;

use anyhow::Result;
use mavlink_server::callbacks::{Callbacks, MessageCallback};
use tokio::{net::TcpStream, sync::broadcast};
use tokio::{
net::TcpStream,
sync::{broadcast, RwLock},
};
use tracing::*;

use crate::{
drivers::{
stats::{DriverStats, DriverStatsInfo},
tcp::{tcp_receive_task, tcp_send_task},
Driver, DriverInfo,
},
Expand All @@ -17,6 +21,7 @@ pub struct TcpClient {
pub remote_addr: String,
on_message_input: Callbacks<Arc<Protocol>>,
on_message_output: Callbacks<Arc<Protocol>>,
stats: Arc<RwLock<DriverStatsInfo>>,
}

pub struct TcpClientBuilder(TcpClient);
Expand Down Expand Up @@ -50,6 +55,7 @@ impl TcpClient {
remote_addr: remote_addr.to_string(),
on_message_input: Callbacks::new(),
on_message_output: Callbacks::new(),
stats: Arc::new(RwLock::new(DriverStatsInfo::default())),
})
}
}
Expand Down Expand Up @@ -77,12 +83,12 @@ impl Driver for TcpClient {
let hub_sender_cloned = Arc::clone(&hub_sender);

tokio::select! {
result = tcp_receive_task(read, server_addr, hub_sender_cloned, &self.on_message_input) => {
result = tcp_receive_task(read, server_addr, hub_sender_cloned, &self.on_message_input, &self.stats) => {
if let Err(e) = result {
error!("Error in TCP receive task: {e:?}");
}
}
result = tcp_send_task(write, server_addr, hub_receiver, &self.on_message_output) => {
result = tcp_send_task(write, server_addr, hub_receiver, &self.on_message_output, &self.stats) => {
if let Err(e) = result {
error!("Error in TCP send task: {e:?}");
}
Expand All @@ -99,6 +105,13 @@ impl Driver for TcpClient {
}
}

#[async_trait::async_trait]
impl DriverStats for TcpClient {
async fn stats(&self) -> DriverStatsInfo {
self.stats.read().await.clone()
}
}

pub struct TcpClientInfo;
impl DriverInfo for TcpClientInfo {
fn name(&self) -> &str {
Expand Down
Loading

0 comments on commit 875c121

Please sign in to comment.