Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
joaoantoniocardoso committed Sep 19, 2024
1 parent d6d109d commit f9602a5
Show file tree
Hide file tree
Showing 19 changed files with 519 additions and 279 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ shellexpand = "3.1"
tokio = { version = "1", features = ["full"] }
tokio-serial = "5.4.4"
url = { version = "2.5.2", features = ["serde"] }
uuid = { version = "1", features = ["v5"] }

tracing = { version = "0.1.40", features = ["log", "async-await"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
Expand Down
78 changes: 48 additions & 30 deletions src/drivers/fake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,28 @@ use tracing::*;
use crate::{
drivers::{Driver, DriverInfo},
protocol::{read_all_messages, Protocol},
stats::accumulated::driver::{AccumulatedDriverStats, AccumulatedDriverStatsProvider},
stats::{
accumulated::driver::{AccumulatedDriverStatsInner, AccumulatedDriverStatsProvider},
driver::DriverUuid,
},
};

pub struct FakeSink {
name: Arc<std::sync::RwLock<String>>,
uuid: DriverUuid,
on_message_input: Callbacks<Arc<Protocol>>,
print: bool,
stats: Arc<RwLock<AccumulatedDriverStats>>,
stats: Arc<RwLock<AccumulatedDriverStatsInner>>,
}

impl FakeSink {
pub fn builder() -> FakeSinkBuilder {
pub fn builder(name: &str) -> FakeSinkBuilder {
FakeSinkBuilder(Self {
name: Arc::new(std::sync::RwLock::new(name.to_string())),
uuid: Self::generate_uuid(name),
on_message_input: Callbacks::new(),
print: false,
stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())),
stats: Arc::new(RwLock::new(AccumulatedDriverStatsInner::default())),
})
}
}
Expand Down Expand Up @@ -80,16 +87,24 @@ impl Driver for FakeSink {
fn info(&self) -> Box<dyn DriverInfo> {
Box::new(FakeSinkInfo)
}

fn name(&self) -> Arc<std::sync::RwLock<String>> {
self.name.clone()
}

fn uuid(&self) -> &DriverUuid {
&self.uuid
}
}

#[async_trait::async_trait]
impl AccumulatedDriverStatsProvider for FakeSink {
async fn stats(&self) -> AccumulatedDriverStats {
async fn stats(&self) -> AccumulatedDriverStatsInner {
self.stats.read().await.clone()
}

async fn reset_stats(&self) {
*self.stats.write().await = AccumulatedDriverStats {
*self.stats.write().await = AccumulatedDriverStatsInner {
input: None,
output: None,
}
Expand All @@ -98,16 +113,12 @@ impl AccumulatedDriverStatsProvider for FakeSink {

pub struct FakeSinkInfo;
impl DriverInfo for FakeSinkInfo {
fn name(&self) -> &str {
fn name(&self) -> &'static str {
"FakeSink"
}

fn valid_schemes(&self) -> Vec<String> {
vec![
"fakeclient".to_string(),
"fakesink".to_string(),
"fakec".to_string(),
]
fn valid_schemes(&self) -> &'static [&'static str] {
&["fakeclient", "fakesink", "fakec"]
}

fn cli_example_legacy(&self) -> Vec<String> {
Expand All @@ -129,22 +140,26 @@ impl DriverInfo for FakeSinkInfo {
}

fn create_endpoint_from_url(&self, _url: &url::Url) -> Option<Arc<dyn Driver>> {
Some(Arc::new(FakeSink::builder().build()))
None
}
}

pub struct FakeSource {
name: Arc<std::sync::RwLock<String>>,
uuid: DriverUuid,
period: std::time::Duration,
on_message_output: Callbacks<Arc<Protocol>>,
stats: Arc<RwLock<AccumulatedDriverStats>>,
stats: Arc<RwLock<AccumulatedDriverStatsInner>>,
}

impl FakeSource {
pub fn builder(period: std::time::Duration) -> FakeSourceBuilder {
pub fn builder(name: &str, period: std::time::Duration) -> FakeSourceBuilder {
FakeSourceBuilder(Self {
name: Arc::new(std::sync::RwLock::new(name.to_string())),
uuid: Self::generate_uuid(name),
period,
on_message_output: Callbacks::new(),
stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())),
stats: Arc::new(RwLock::new(AccumulatedDriverStatsInner::default())),
})
}
}
Expand Down Expand Up @@ -234,16 +249,24 @@ impl Driver for FakeSource {
fn info(&self) -> Box<dyn DriverInfo> {
Box::new(FakeSourceInfo)
}

fn name(&self) -> Arc<std::sync::RwLock<String>> {
self.name.clone()
}

fn uuid(&self) -> &DriverUuid {
&self.uuid
}
}

#[async_trait::async_trait]
impl AccumulatedDriverStatsProvider for FakeSource {
async fn stats(&self) -> AccumulatedDriverStats {
async fn stats(&self) -> AccumulatedDriverStatsInner {
self.stats.read().await.clone()
}

async fn reset_stats(&self) {
*self.stats.write().await = AccumulatedDriverStats {
*self.stats.write().await = AccumulatedDriverStatsInner {
input: None,
output: None,
}
Expand All @@ -252,17 +275,12 @@ impl AccumulatedDriverStatsProvider for FakeSource {

pub struct FakeSourceInfo;
impl DriverInfo for FakeSourceInfo {
fn name(&self) -> &str {
fn name(&self) -> &'static str {
"FakeSource"
}

fn valid_schemes(&self) -> Vec<String> {
vec![
"fakesource".to_string(),
"fakeserver".to_string(),
"fakesrc".to_string(),
"fakes".to_string(),
]
fn valid_schemes(&self) -> &'static [&'static str] {
&["fakesource", "fakeserver", "fakesrc", "fakes"]
}

fn cli_example_legacy(&self) -> Vec<String> {
Expand All @@ -284,7 +302,7 @@ impl DriverInfo for FakeSourceInfo {
}

fn create_endpoint_from_url(&self, _url: &url::Url) -> Option<Arc<dyn Driver>> {
Some(Arc::new(FakeSink::builder().print().build()))
None
}
}

Expand All @@ -309,7 +327,7 @@ mod test {
let sink_messages = Arc::new(RwLock::new(Vec::<Arc<Protocol>>::with_capacity(1000)));

// FakeSink and task
let sink = FakeSink::builder()
let sink = FakeSink::builder("test")
.on_message_input({
let sink_messages = sink_messages.clone();

Expand All @@ -330,7 +348,7 @@ mod test {
});

// FakeSource and task
let source = FakeSource::builder(message_period)
let source = FakeSource::builder("test", message_period)
.on_message_output({
let source_messages = source_messages.clone();
move |message: Arc<Protocol>| {
Expand Down
75 changes: 49 additions & 26 deletions src/drivers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,38 @@ pub struct DriverDescriptionLegacy {
pub trait Driver: Send + Sync + AccumulatedDriverStatsProvider {
async fn run(&self, hub_sender: broadcast::Sender<Arc<Protocol>>) -> Result<()>;
fn info(&self) -> Box<dyn DriverInfo>;

fn name(&self) -> Arc<std::sync::RwLock<String>>;

fn uuid(&self) -> &uuid::Uuid;

fn generate_uuid(name: &str) -> uuid::Uuid
where
Self: Sized,
{
uuid::Uuid::new_v5(
&uuid::Uuid::NAMESPACE_DNS,
format!(
"{typ}:{name}",
typ = std::any::type_name::<Self>(),
name = name
)
.as_bytes(),
)
}
}

pub trait DriverInfo: Sync + Send {
fn name(&self) -> &str;

fn valid_schemes(&self) -> Vec<String>;
fn name(&self) -> &'static str;
fn valid_schemes(&self) -> &'static [&'static str];
// CLI helpers
fn cli_example_legacy(&self) -> Vec<String>;
fn cli_example_url(&self) -> Vec<String>;

fn create_endpoint_from_url(&self, url: &Url) -> Option<Arc<dyn Driver>>;

fn default_scheme(&self) -> String {
self.valid_schemes().first().unwrap().clone()
fn default_scheme(&self) -> &'static str {
self.valid_schemes().first().unwrap()
}

// This is mostly used by network based endpoints, other endpoints can overwrite it
Expand Down Expand Up @@ -127,12 +145,9 @@ fn process_old_format(entry: &str) -> Option<DriverDescriptionLegacy> {
let arg2 = captures.name("arg2").map(|m| m.as_str());

let endpoints = endpoints();
let endpoint = endpoints.iter().find(|endpoint| {
endpoint
.driver_ext
.valid_schemes()
.contains(&prefix.to_string())
})?;
let endpoint = endpoints
.iter()
.find(|endpoint| endpoint.driver_ext.valid_schemes().contains(&prefix))?;

Some(DriverDescriptionLegacy {
typ: endpoint.typ.clone(),
Expand All @@ -155,10 +170,7 @@ pub fn create_driver_from_entry(entry: &str) -> Result<Arc<dyn Driver>, String>
}

let url = Url::parse(entry).map_err(|e| e.to_string()).ok()?;
if driver_ext
.valid_schemes()
.contains(&url.scheme().to_string())
{
if driver_ext.valid_schemes().contains(&url.scheme()) {
return driver_ext.create_endpoint_from_url(&url);
}
None
Expand Down Expand Up @@ -224,7 +236,7 @@ mod tests {
use tokio::sync::RwLock;
use tracing::*;

use crate::stats::accumulated::driver::AccumulatedDriverStats;
use crate::stats::{accumulated::driver::AccumulatedDriverStatsInner, driver::DriverUuid};

use super::*;

Expand All @@ -242,17 +254,21 @@ mod tests {

// Example struct implementing Driver
pub struct ExampleDriver {
name: Arc<std::sync::RwLock<String>>,
uuid: DriverUuid,
on_message_input: Callbacks<Arc<Protocol>>,
on_message_output: Callbacks<Arc<Protocol>>,
stats: Arc<RwLock<AccumulatedDriverStats>>,
stats: Arc<RwLock<AccumulatedDriverStatsInner>>,
}

impl ExampleDriver {
pub fn new() -> ExampleDriverBuilder {
pub fn new(name: &str, id: &str) -> ExampleDriverBuilder {
ExampleDriverBuilder(Self {
name: Arc::new(std::sync::RwLock::new(name.to_string())),
uuid: Self::generate_uuid(&format!("{name}:{id}")),
on_message_input: Callbacks::new(),
on_message_output: Callbacks::new(),
stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())),
stats: Arc::new(RwLock::new(AccumulatedDriverStatsInner::default())),
})
}
}
Expand Down Expand Up @@ -299,16 +315,24 @@ mod tests {
fn info(&self) -> Box<dyn DriverInfo> {
Box::new(ExampleDriverInfo)
}

fn name(&self) -> Arc<std::sync::RwLock<String>> {
self.name.clone()
}

fn uuid(&self) -> &DriverUuid {
&self.uuid
}
}

#[async_trait::async_trait]
impl AccumulatedDriverStatsProvider for ExampleDriver {
async fn stats(&self) -> AccumulatedDriverStats {
async fn stats(&self) -> AccumulatedDriverStatsInner {
self.stats.read().await.clone()
}

async fn reset_stats(&self) {
*self.stats.write().await = AccumulatedDriverStats {
*self.stats.write().await = AccumulatedDriverStatsInner {
input: None,
output: None,
}
Expand All @@ -317,12 +341,11 @@ mod tests {

pub struct ExampleDriverInfo;
impl DriverInfo for ExampleDriverInfo {
fn name(&self) -> &str {
fn name(&self) -> &'static str {
"ExampleDriver"
}

fn valid_schemes(&self) -> Vec<String> {
vec!["exampledriver".to_string()]
fn valid_schemes(&self) -> &'static [&'static str] {
&["exampledriver"]
}

fn cli_example_legacy(&self) -> Vec<String> {
Expand All @@ -343,7 +366,7 @@ mod tests {
let (sender, _receiver) = tokio::sync::broadcast::channel(1);

let called = Arc::new(RwLock::new(false));
let driver = ExampleDriver::new()
let driver = ExampleDriver::new("test", "test")
.on_message_input({
let called = called.clone();
move |_msg| {
Expand Down
Loading

0 comments on commit f9602a5

Please sign in to comment.