Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
joaoantoniocardoso committed Sep 19, 2024
1 parent 2a0e166 commit 21277a0
Show file tree
Hide file tree
Showing 19 changed files with 649 additions and 466 deletions.
298 changes: 124 additions & 174 deletions Cargo.lock

Large diffs are not rendered by default.

22 changes: 14 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,40 +20,46 @@ bench = false

[dependencies]
anyhow = "1"
arc-swap = "1.7"
async-trait = "0.1.81"
axum = { version = "0.7.5", features = ["ws"] }
byteorder = "1.5.0"
chrono = { version = "0.4", features = ["serde"] }
clap = { version = "4.5", features = ["derive"] }
ctrlc = "3.4"
futures = "0.3"
lazy_static = "1.5.0"
include_dir = "0.7.4"
indexmap = "2.5.0"
# mavlink = { version = "0.13.1", default-features = false, features = ["ardupilotmega", "std"] }
json5 = "0.4.1"
lazy_static = "1.5.0"
# mavlink = { default-features = false, features = ["ardupilotmega", "std", "tokio-1"], path = "../rust-mavlink/mavlink" }
# mavlink = { version = "0.13.1", default-features = false, features = ["ardupilotmega", "std"] }
mavlink = { default-features = false, features = ["ardupilotmega", "serde", "std", "tokio-1"], git = "https://github.com/joaoantoniocardoso/rust-mavlink", branch = "add-tokio" }
regex = "1.10.6"
serde = "1"
serde_derive = "1.0.210"
serde_json = "1.0.128"
shellexpand = "3.1"
tokio = { version = "1", features = ["full"] }
tokio-serial = "5.4.4"
url = { version = "2.5.2", features = ["serde"] }
uuid = { version = "1", features = ["v5", "v4"] }

tracing = { version = "0.1.40", features = ["log", "async-await"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
tracing-log = "0.2.0"
# This is a custom revision adding file name suffix on top of tracing-appender-0.2.2" "0.2.2", but compatible with the current tracing version.
# Reference: https://github.com/tokio-rs/tracing/issues/2441
tracing-appender = { git = "https://github.com/joaoantoniocardoso/tracing", branch = "tracing-appender-0.2.2-with-filename-suffix" }
serde_json = "1.0.128"
serde_derive = "1.0.210"
json5 = "0.4.1"
axum = { version = "0.7.5", features = ["ws"] }
include_dir = "0.7.4"
uuid = { version = "1.10.0", features = ["v4"] }

[dev-dependencies]
criterion = "0.5"
tokio = { version = "1", features = ["full"] }

[build-dependencies]
vergen-gix = { version = "1.0.0-beta.2", default-features = false, features = ["build", "cargo"] }

[profile.profiling]
inherits = "release"
debug = 1
jemalloc = false
78 changes: 48 additions & 30 deletions src/drivers/fake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,28 @@ use tracing::*;
use crate::{
drivers::{Driver, DriverInfo},
protocol::{read_all_messages, Protocol},
stats::accumulated::driver::{AccumulatedDriverStats, AccumulatedDriverStatsProvider},
stats::{
accumulated::driver::{AccumulatedDriverStatsInner, AccumulatedDriverStatsProvider},
driver::DriverUuid,
},
};

pub struct FakeSink {
name: arc_swap::ArcSwap<String>,
uuid: DriverUuid,
on_message_input: Callbacks<Arc<Protocol>>,
print: bool,
stats: Arc<RwLock<AccumulatedDriverStats>>,
stats: Arc<RwLock<AccumulatedDriverStatsInner>>,
}

impl FakeSink {
pub fn builder() -> FakeSinkBuilder {
pub fn builder(name: &str) -> FakeSinkBuilder {
FakeSinkBuilder(Self {
name: arc_swap::ArcSwap::new(Arc::new(name.to_string())),
uuid: Self::generate_uuid(name),
on_message_input: Callbacks::new(),
print: false,
stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())),
stats: Arc::new(RwLock::new(AccumulatedDriverStatsInner::default())),
})
}
}
Expand Down Expand Up @@ -80,16 +87,24 @@ impl Driver for FakeSink {
fn info(&self) -> Box<dyn DriverInfo> {
Box::new(FakeSinkInfo)
}

fn name(&self) -> Arc<String> {
self.name.load_full()
}

fn uuid(&self) -> &DriverUuid {
&self.uuid
}
}

#[async_trait::async_trait]
impl AccumulatedDriverStatsProvider for FakeSink {
async fn stats(&self) -> AccumulatedDriverStats {
async fn stats(&self) -> AccumulatedDriverStatsInner {
self.stats.read().await.clone()
}

async fn reset_stats(&self) {
*self.stats.write().await = AccumulatedDriverStats {
*self.stats.write().await = AccumulatedDriverStatsInner {
input: None,
output: None,
}
Expand All @@ -98,16 +113,12 @@ impl AccumulatedDriverStatsProvider for FakeSink {

pub struct FakeSinkInfo;
impl DriverInfo for FakeSinkInfo {
fn name(&self) -> &str {
fn name(&self) -> &'static str {
"FakeSink"
}

fn valid_schemes(&self) -> Vec<String> {
vec![
"fakeclient".to_string(),
"fakesink".to_string(),
"fakec".to_string(),
]
fn valid_schemes(&self) -> &'static [&'static str] {
&["fakeclient", "fakesink", "fakec"]
}

fn cli_example_legacy(&self) -> Vec<String> {
Expand All @@ -129,22 +140,26 @@ impl DriverInfo for FakeSinkInfo {
}

fn create_endpoint_from_url(&self, _url: &url::Url) -> Option<Arc<dyn Driver>> {
Some(Arc::new(FakeSink::builder().build()))
None
}
}

pub struct FakeSource {
name: arc_swap::ArcSwap<String>,
uuid: DriverUuid,
period: std::time::Duration,
on_message_output: Callbacks<Arc<Protocol>>,
stats: Arc<RwLock<AccumulatedDriverStats>>,
stats: Arc<RwLock<AccumulatedDriverStatsInner>>,
}

impl FakeSource {
pub fn builder(period: std::time::Duration) -> FakeSourceBuilder {
pub fn builder(name: &str, period: std::time::Duration) -> FakeSourceBuilder {
FakeSourceBuilder(Self {
name: arc_swap::ArcSwap::new(Arc::new(name.to_string())),
uuid: Self::generate_uuid(name),
period,
on_message_output: Callbacks::new(),
stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())),
stats: Arc::new(RwLock::new(AccumulatedDriverStatsInner::default())),
})
}
}
Expand Down Expand Up @@ -234,16 +249,24 @@ impl Driver for FakeSource {
fn info(&self) -> Box<dyn DriverInfo> {
Box::new(FakeSourceInfo)
}

fn name(&self) -> Arc<String> {
self.name.load_full()
}

fn uuid(&self) -> &DriverUuid {
&self.uuid
}
}

#[async_trait::async_trait]
impl AccumulatedDriverStatsProvider for FakeSource {
async fn stats(&self) -> AccumulatedDriverStats {
async fn stats(&self) -> AccumulatedDriverStatsInner {
self.stats.read().await.clone()
}

async fn reset_stats(&self) {
*self.stats.write().await = AccumulatedDriverStats {
*self.stats.write().await = AccumulatedDriverStatsInner {
input: None,
output: None,
}
Expand All @@ -252,17 +275,12 @@ impl AccumulatedDriverStatsProvider for FakeSource {

pub struct FakeSourceInfo;
impl DriverInfo for FakeSourceInfo {
fn name(&self) -> &str {
fn name(&self) -> &'static str {
"FakeSource"
}

fn valid_schemes(&self) -> Vec<String> {
vec![
"fakesource".to_string(),
"fakeserver".to_string(),
"fakesrc".to_string(),
"fakes".to_string(),
]
fn valid_schemes(&self) -> &'static [&'static str] {
&["fakesource", "fakeserver", "fakesrc", "fakes"]
}

fn cli_example_legacy(&self) -> Vec<String> {
Expand All @@ -284,7 +302,7 @@ impl DriverInfo for FakeSourceInfo {
}

fn create_endpoint_from_url(&self, _url: &url::Url) -> Option<Arc<dyn Driver>> {
Some(Arc::new(FakeSink::builder().print().build()))
None
}
}

Expand All @@ -309,7 +327,7 @@ mod test {
let sink_messages = Arc::new(RwLock::new(Vec::<Arc<Protocol>>::with_capacity(1000)));

// FakeSink and task
let sink = FakeSink::builder()
let sink = FakeSink::builder("test")
.on_message_input({
let sink_messages = sink_messages.clone();

Expand All @@ -330,7 +348,7 @@ mod test {
});

// FakeSource and task
let source = FakeSource::builder(message_period)
let source = FakeSource::builder("test", message_period)
.on_message_output({
let source_messages = source_messages.clone();
move |message: Arc<Protocol>| {
Expand Down
Loading

0 comments on commit 21277a0

Please sign in to comment.