Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
joaoantoniocardoso committed Sep 19, 2024
1 parent d6d109d commit fa34620
Show file tree
Hide file tree
Showing 17 changed files with 382 additions and 195 deletions.
49 changes: 34 additions & 15 deletions src/drivers/fake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,28 @@ use tracing::*;
use crate::{
drivers::{Driver, DriverInfo},
protocol::{read_all_messages, Protocol},
stats::accumulated::driver::{AccumulatedDriverStats, AccumulatedDriverStatsProvider},
stats::{
accumulated::driver::{AccumulatedDriverStatsInner, AccumulatedDriverStatsProvider},
driver::DriverId,
},
};

pub struct FakeSink {
name: String,
uuid: DriverId,
on_message_input: Callbacks<Arc<Protocol>>,
print: bool,
stats: Arc<RwLock<AccumulatedDriverStats>>,
stats: Arc<RwLock<AccumulatedDriverStatsInner>>,
}

impl FakeSink {
pub fn builder() -> FakeSinkBuilder {
pub fn builder(name: &str) -> FakeSinkBuilder {
FakeSinkBuilder(Self {
name: name.to_string(),
uuid: Self::generate_uuid(name),
on_message_input: Callbacks::new(),
print: false,
stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())),
stats: Arc::new(RwLock::new(AccumulatedDriverStatsInner::default())),
})
}
}
Expand Down Expand Up @@ -80,16 +87,20 @@ impl Driver for FakeSink {
fn info(&self) -> Box<dyn DriverInfo> {
Box::new(FakeSinkInfo)
}

fn uuid(&self) -> &DriverId {
&self.uuid
}
}

#[async_trait::async_trait]
impl AccumulatedDriverStatsProvider for FakeSink {
async fn stats(&self) -> AccumulatedDriverStats {
async fn stats(&self) -> AccumulatedDriverStatsInner {
self.stats.read().await.clone()
}

async fn reset_stats(&self) {
*self.stats.write().await = AccumulatedDriverStats {
*self.stats.write().await = AccumulatedDriverStatsInner {
input: None,
output: None,
}
Expand Down Expand Up @@ -129,22 +140,26 @@ impl DriverInfo for FakeSinkInfo {
}

fn create_endpoint_from_url(&self, _url: &url::Url) -> Option<Arc<dyn Driver>> {
Some(Arc::new(FakeSink::builder().build()))
None
}
}

pub struct FakeSource {
name: String,
uuid: DriverId,
period: std::time::Duration,
on_message_output: Callbacks<Arc<Protocol>>,
stats: Arc<RwLock<AccumulatedDriverStats>>,
stats: Arc<RwLock<AccumulatedDriverStatsInner>>,
}

impl FakeSource {
pub fn builder(period: std::time::Duration) -> FakeSourceBuilder {
pub fn builder(name: &str, period: std::time::Duration) -> FakeSourceBuilder {
FakeSourceBuilder(Self {
name: name.to_string(),
uuid: Self::generate_uuid(name),
period,
on_message_output: Callbacks::new(),
stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())),
stats: Arc::new(RwLock::new(AccumulatedDriverStatsInner::default())),
})
}
}
Expand Down Expand Up @@ -234,16 +249,20 @@ impl Driver for FakeSource {
fn info(&self) -> Box<dyn DriverInfo> {
Box::new(FakeSourceInfo)
}

fn uuid(&self) -> &DriverId {
&self.uuid
}
}

#[async_trait::async_trait]
impl AccumulatedDriverStatsProvider for FakeSource {
async fn stats(&self) -> AccumulatedDriverStats {
async fn stats(&self) -> AccumulatedDriverStatsInner {
self.stats.read().await.clone()
}

async fn reset_stats(&self) {
*self.stats.write().await = AccumulatedDriverStats {
*self.stats.write().await = AccumulatedDriverStatsInner {
input: None,
output: None,
}
Expand Down Expand Up @@ -284,7 +303,7 @@ impl DriverInfo for FakeSourceInfo {
}

fn create_endpoint_from_url(&self, _url: &url::Url) -> Option<Arc<dyn Driver>> {
Some(Arc::new(FakeSink::builder().print().build()))
None
}
}

Expand All @@ -309,7 +328,7 @@ mod test {
let sink_messages = Arc::new(RwLock::new(Vec::<Arc<Protocol>>::with_capacity(1000)));

// FakeSink and task
let sink = FakeSink::builder()
let sink = FakeSink::builder("test")
.on_message_input({
let sink_messages = sink_messages.clone();

Expand All @@ -330,7 +349,7 @@ mod test {
});

// FakeSource and task
let source = FakeSource::builder(message_period)
let source = FakeSource::builder("test", message_period)
.on_message_output({
let source_messages = source_messages.clone();
move |message: Arc<Protocol>| {
Expand Down
39 changes: 32 additions & 7 deletions src/drivers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,23 @@ pub struct DriverDescriptionLegacy {
pub trait Driver: Send + Sync + AccumulatedDriverStatsProvider {
async fn run(&self, hub_sender: broadcast::Sender<Arc<Protocol>>) -> Result<()>;
fn info(&self) -> Box<dyn DriverInfo>;

fn uuid(&self) -> &uuid::Uuid;

fn generate_uuid(name: &str) -> uuid::Uuid
where
Self: Sized,
{
uuid::Uuid::new_v5(
&uuid::Uuid::NAMESPACE_DNS,
format!(
"{typ}:{name}",
typ = std::any::type_name::<Self>(),
name = name
)
.as_bytes(),
)
}
}

pub trait DriverInfo: Sync + Send {
Expand Down Expand Up @@ -224,7 +241,7 @@ mod tests {
use tokio::sync::RwLock;
use tracing::*;

use crate::stats::accumulated::driver::AccumulatedDriverStats;
use crate::stats::{accumulated::driver::AccumulatedDriverStatsInner, driver::DriverId};

use super::*;

Expand All @@ -242,17 +259,21 @@ mod tests {

// Example struct implementing Driver
pub struct ExampleDriver {
name: String,
uuid: uuid::Uuid,
on_message_input: Callbacks<Arc<Protocol>>,
on_message_output: Callbacks<Arc<Protocol>>,
stats: Arc<RwLock<AccumulatedDriverStats>>,
stats: Arc<RwLock<AccumulatedDriverStatsInner>>,
}

impl ExampleDriver {
pub fn new() -> ExampleDriverBuilder {
pub fn new(name: &str, id: &str) -> ExampleDriverBuilder {
ExampleDriverBuilder(Self {
name: name.to_string(),
uuid: Self::generate_uuid(&format!("{name}:{id}")),
on_message_input: Callbacks::new(),
on_message_output: Callbacks::new(),
stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())),
stats: Arc::new(RwLock::new(AccumulatedDriverStatsInner::default())),
})
}
}
Expand Down Expand Up @@ -299,16 +320,20 @@ mod tests {
fn info(&self) -> Box<dyn DriverInfo> {
Box::new(ExampleDriverInfo)
}

fn uuid(&self) -> &DriverId {
&self.uuid
}
}

#[async_trait::async_trait]
impl AccumulatedDriverStatsProvider for ExampleDriver {
async fn stats(&self) -> AccumulatedDriverStats {
async fn stats(&self) -> AccumulatedDriverStatsInner {
self.stats.read().await.clone()
}

async fn reset_stats(&self) {
*self.stats.write().await = AccumulatedDriverStats {
*self.stats.write().await = AccumulatedDriverStatsInner {
input: None,
output: None,
}
Expand Down Expand Up @@ -343,7 +368,7 @@ mod tests {
let (sender, _receiver) = tokio::sync::broadcast::channel(1);

let called = Arc::new(RwLock::new(false));
let driver = ExampleDriver::new()
let driver = ExampleDriver::new("test", "test")
.on_message_input({
let called = called.clone();
move |_msg| {
Expand Down
21 changes: 14 additions & 7 deletions src/drivers/serial/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,19 @@ use tracing::*;
use crate::{
drivers::{Driver, DriverInfo},
protocol::{read_all_messages, Protocol},
stats::accumulated::driver::{AccumulatedDriverStats, AccumulatedDriverStatsProvider},
stats::{
accumulated::driver::{AccumulatedDriverStatsInner, AccumulatedDriverStatsProvider},
driver::DriverId,
},
};

pub struct Serial {
uuid: uuid::Uuid,
pub port_name: String,
pub baud_rate: u32,
on_message_input: Callbacks<Arc<Protocol>>,
on_message_output: Callbacks<Arc<Protocol>>,
stats: Arc<RwLock<AccumulatedDriverStats>>,
stats: Arc<RwLock<AccumulatedDriverStatsInner>>,
}

pub struct SerialBuilder(Serial);
Expand Down Expand Up @@ -51,11 +55,12 @@ impl Serial {
#[instrument(level = "debug")]
pub fn builder(port_name: &str, baud_rate: u32) -> SerialBuilder {
SerialBuilder(Self {
uuid: Self::generate_uuid(&format!("{port_name}:{baud_rate}")),
port_name: port_name.to_string(),
baud_rate,
on_message_input: Callbacks::new(),
on_message_output: Callbacks::new(),
stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())),
stats: Arc::new(RwLock::new(AccumulatedDriverStatsInner::default())),
})
}

Expand Down Expand Up @@ -176,28 +181,30 @@ impl Driver for Serial {
fn info(&self) -> Box<dyn DriverInfo> {
Box::new(SerialInfo)
}

fn uuid(&self) -> &DriverId {
&self.uuid
}
}

#[async_trait::async_trait]
impl AccumulatedDriverStatsProvider for Serial {
async fn stats(&self) -> AccumulatedDriverStats {
async fn stats(&self) -> AccumulatedDriverStatsInner {
self.stats.read().await.clone()
}

async fn reset_stats(&self) {
*self.stats.write().await = AccumulatedDriverStats {
*self.stats.write().await = AccumulatedDriverStatsInner {
input: None,
output: None,
}
}
}

pub struct SerialInfo;
impl DriverInfo for SerialInfo {
fn name(&self) -> &str {
"Serial"
}

fn valid_schemes(&self) -> Vec<String> {
vec!["serial".to_string()]
}
Expand Down
19 changes: 14 additions & 5 deletions src/drivers/tcp/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,18 @@ use crate::{
Driver, DriverInfo,
},
protocol::Protocol,
stats::accumulated::driver::{AccumulatedDriverStats, AccumulatedDriverStatsProvider},
stats::{
accumulated::driver::{AccumulatedDriverStatsInner, AccumulatedDriverStatsProvider},
driver::DriverId,
},
};

pub struct TcpClient {
pub remote_addr: String,
uuid: DriverId,
on_message_input: Callbacks<Arc<Protocol>>,
on_message_output: Callbacks<Arc<Protocol>>,
stats: Arc<RwLock<AccumulatedDriverStats>>,
stats: Arc<RwLock<AccumulatedDriverStatsInner>>,
}

pub struct TcpClientBuilder(TcpClient);
Expand Down Expand Up @@ -53,9 +57,10 @@ impl TcpClient {
pub fn builder(remote_addr: &str) -> TcpClientBuilder {
TcpClientBuilder(Self {
remote_addr: remote_addr.to_string(),
uuid: Self::generate_uuid(remote_addr),
on_message_input: Callbacks::new(),
on_message_output: Callbacks::new(),
stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())),
stats: Arc::new(RwLock::new(AccumulatedDriverStatsInner::default())),
})
}
}
Expand Down Expand Up @@ -102,16 +107,20 @@ impl Driver for TcpClient {
fn info(&self) -> Box<dyn DriverInfo> {
return Box::new(TcpClientInfo);
}

fn uuid(&self) -> &DriverId{
&self.uuid
}
}

#[async_trait::async_trait]
impl AccumulatedDriverStatsProvider for TcpClient {
async fn stats(&self) -> AccumulatedDriverStats {
async fn stats(&self) -> AccumulatedDriverStatsInner {
self.stats.read().await.clone()
}

async fn reset_stats(&self) {
*self.stats.write().await = AccumulatedDriverStats {
*self.stats.write().await = AccumulatedDriverStatsInner {
input: None,
output: None,
}
Expand Down
6 changes: 3 additions & 3 deletions src/drivers/tcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tracing::*;

use crate::{
protocol::{read_all_messages, Protocol},
stats::accumulated::driver::AccumulatedDriverStats,
stats::accumulated::driver::AccumulatedDriverStatsInner,
};

pub mod client;
Expand All @@ -24,7 +24,7 @@ async fn tcp_receive_task(
remote_addr: &str,
hub_sender: Arc<broadcast::Sender<Arc<Protocol>>>,
on_message_input: &Callbacks<Arc<Protocol>>,
stats: &Arc<RwLock<AccumulatedDriverStats>>,
stats: &Arc<RwLock<AccumulatedDriverStatsInner>>,
) -> Result<()> {
let mut buf = Vec::with_capacity(1024);

Expand Down Expand Up @@ -67,7 +67,7 @@ async fn tcp_send_task(
remote_addr: &str,
mut hub_receiver: broadcast::Receiver<Arc<Protocol>>,
on_message_output: &Callbacks<Arc<Protocol>>,
stats: &Arc<RwLock<AccumulatedDriverStats>>,
stats: &Arc<RwLock<AccumulatedDriverStatsInner>>,
) -> Result<()> {
loop {
let message = match hub_receiver.recv().await {
Expand Down
Loading

0 comments on commit fa34620

Please sign in to comment.