Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
joaoantoniocardoso committed Sep 15, 2024
1 parent 4dde7df commit 692aa43
Show file tree
Hide file tree
Showing 19 changed files with 390 additions and 160 deletions.
22 changes: 11 additions & 11 deletions src/drivers/fake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,21 @@ use tracing::*;
use crate::{
drivers::{Driver, DriverInfo},
protocol::{read_all_messages, Protocol},
stats::driver::{DriverStats, DriverStatsInfo},
stats::driver::{AccumulatedDriverStats, AccumulatedDriverStatsProvider},
};

pub struct FakeSink {
on_message_input: Callbacks<Arc<Protocol>>,
print: bool,
stats: Arc<RwLock<DriverStatsInfo>>,
stats: Arc<RwLock<AccumulatedDriverStats>>,
}

impl FakeSink {
pub fn builder() -> FakeSinkBuilder {
FakeSinkBuilder(Self {
on_message_input: Callbacks::new(),
print: false,
stats: Arc::new(RwLock::new(DriverStatsInfo::default())),
stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())),
})
}
}
Expand Down Expand Up @@ -93,13 +93,13 @@ impl Driver for FakeSink {
}

#[async_trait::async_trait]
impl DriverStats for FakeSink {
async fn stats(&self) -> DriverStatsInfo {
impl AccumulatedDriverStatsProvider for FakeSink {
async fn stats(&self) -> AccumulatedDriverStats {
self.stats.read().await.clone()
}

async fn reset_stats(&self) {
*self.stats.write().await = DriverStatsInfo {
*self.stats.write().await = AccumulatedDriverStats {
input: None,
output: None,
}
Expand Down Expand Up @@ -146,15 +146,15 @@ impl DriverInfo for FakeSinkInfo {
pub struct FakeSource {
period: std::time::Duration,
on_message_output: Callbacks<Arc<Protocol>>,
stats: Arc<RwLock<DriverStatsInfo>>,
stats: Arc<RwLock<AccumulatedDriverStats>>,
}

impl FakeSource {
pub fn builder(period: std::time::Duration) -> FakeSourceBuilder {
FakeSourceBuilder(Self {
period,
on_message_output: Callbacks::new(),
stats: Arc::new(RwLock::new(DriverStatsInfo::default())),
stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())),
})
}
}
Expand Down Expand Up @@ -248,13 +248,13 @@ impl Driver for FakeSource {
}

#[async_trait::async_trait]
impl DriverStats for FakeSource {
async fn stats(&self) -> DriverStatsInfo {
impl AccumulatedDriverStatsProvider for FakeSource {
async fn stats(&self) -> AccumulatedDriverStats {
self.stats.read().await.clone()
}

async fn reset_stats(&self) {
*self.stats.write().await = DriverStatsInfo {
*self.stats.write().await = AccumulatedDriverStats {
input: None,
output: None,
}
Expand Down
16 changes: 8 additions & 8 deletions src/drivers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tokio::sync::broadcast;
use tracing::*;
use url::Url;

use crate::{protocol::Protocol, stats::driver::DriverStats};
use crate::{protocol::Protocol, stats::driver::AccumulatedDriverStatsProvider};

#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub enum Type {
Expand All @@ -36,7 +36,7 @@ pub struct DriverDescriptionLegacy {
}

#[async_trait::async_trait]
pub trait Driver: Send + Sync + DriverStats {
pub trait Driver: Send + Sync + AccumulatedDriverStatsProvider {
async fn run(&self, hub_sender: broadcast::Sender<Arc<Protocol>>) -> Result<()>;
fn info(&self) -> Box<dyn DriverInfo>;
}
Expand Down Expand Up @@ -224,7 +224,7 @@ mod tests {
use tokio::sync::RwLock;
use tracing::*;

use crate::stats::driver::DriverStatsInfo;
use crate::stats::driver::AccumulatedDriverStats;

use super::*;

Expand All @@ -244,15 +244,15 @@ mod tests {
pub struct ExampleDriver {
on_message_input: Callbacks<Arc<Protocol>>,
on_message_output: Callbacks<Arc<Protocol>>,
stats: Arc<RwLock<DriverStatsInfo>>,
stats: Arc<RwLock<AccumulatedDriverStats>>,
}

impl ExampleDriver {
pub fn new() -> ExampleDriverBuilder {
ExampleDriverBuilder(Self {
on_message_input: Callbacks::new(),
on_message_output: Callbacks::new(),
stats: Arc::new(RwLock::new(DriverStatsInfo::default())),
stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())),
})
}
}
Expand Down Expand Up @@ -306,13 +306,13 @@ mod tests {
}

#[async_trait::async_trait]
impl DriverStats for ExampleDriver {
async fn stats(&self) -> DriverStatsInfo {
impl AccumulatedDriverStatsProvider for ExampleDriver {
async fn stats(&self) -> AccumulatedDriverStats {
self.stats.read().await.clone()
}

async fn reset_stats(&self) {
*self.stats.write().await = DriverStatsInfo {
*self.stats.write().await = AccumulatedDriverStats {
input: None,
output: None,
}
Expand Down
12 changes: 6 additions & 6 deletions src/drivers/serial/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ use tracing::*;
use crate::{
drivers::{Driver, DriverInfo},
protocol::{read_all_messages, Protocol},
stats::driver::{DriverStats, DriverStatsInfo},
stats::driver::{AccumulatedDriverStats, AccumulatedDriverStatsProvider},
};

pub struct Serial {
pub port_name: String,
pub baud_rate: u32,
on_message_input: Callbacks<Arc<Protocol>>,
on_message_output: Callbacks<Arc<Protocol>>,
stats: Arc<RwLock<DriverStatsInfo>>,
stats: Arc<RwLock<AccumulatedDriverStats>>,
}

pub struct SerialBuilder(Serial);
Expand Down Expand Up @@ -55,7 +55,7 @@ impl Serial {
baud_rate,
on_message_input: Callbacks::new(),
on_message_output: Callbacks::new(),
stats: Arc::new(RwLock::new(DriverStatsInfo::default())),
stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())),
})
}

Expand Down Expand Up @@ -179,13 +179,13 @@ impl Driver for Serial {
}

#[async_trait::async_trait]
impl DriverStats for Serial {
async fn stats(&self) -> DriverStatsInfo {
impl AccumulatedDriverStatsProvider for Serial {
async fn stats(&self) -> AccumulatedDriverStats {
self.stats.read().await.clone()
}

async fn reset_stats(&self) {
*self.stats.write().await = DriverStatsInfo {
*self.stats.write().await = AccumulatedDriverStats {
input: None,
output: None,
}
Expand Down
12 changes: 6 additions & 6 deletions src/drivers/tcp/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ use crate::{
Driver, DriverInfo,
},
protocol::Protocol,
stats::driver::{DriverStats, DriverStatsInfo},
stats::driver::{AccumulatedDriverStatsProvider, AccumulatedDriverStats},
};

pub struct TcpClient {
pub remote_addr: String,
on_message_input: Callbacks<Arc<Protocol>>,
on_message_output: Callbacks<Arc<Protocol>>,
stats: Arc<RwLock<DriverStatsInfo>>,
stats: Arc<RwLock<AccumulatedDriverStats>>,
}

pub struct TcpClientBuilder(TcpClient);
Expand Down Expand Up @@ -55,7 +55,7 @@ impl TcpClient {
remote_addr: remote_addr.to_string(),
on_message_input: Callbacks::new(),
on_message_output: Callbacks::new(),
stats: Arc::new(RwLock::new(DriverStatsInfo::default())),
stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())),
})
}
}
Expand Down Expand Up @@ -105,13 +105,13 @@ impl Driver for TcpClient {
}

#[async_trait::async_trait]
impl DriverStats for TcpClient {
async fn stats(&self) -> DriverStatsInfo {
impl AccumulatedDriverStatsProvider for TcpClient {
async fn stats(&self) -> AccumulatedDriverStats {
self.stats.read().await.clone()
}

async fn reset_stats(&self) {
*self.stats.write().await = DriverStatsInfo {
*self.stats.write().await = AccumulatedDriverStats {
input: None,
output: None,
}
Expand Down
6 changes: 3 additions & 3 deletions src/drivers/tcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tracing::*;

use crate::{
protocol::{read_all_messages, Protocol},
stats::driver::DriverStatsInfo,
stats::driver::AccumulatedDriverStats,
};

pub mod client;
Expand All @@ -24,7 +24,7 @@ async fn tcp_receive_task(
remote_addr: &str,
hub_sender: Arc<broadcast::Sender<Arc<Protocol>>>,
on_message_input: &Callbacks<Arc<Protocol>>,
stats: &Arc<RwLock<DriverStatsInfo>>,
stats: &Arc<RwLock<AccumulatedDriverStats>>,
) -> Result<()> {
let mut buf = Vec::with_capacity(1024);

Expand Down Expand Up @@ -67,7 +67,7 @@ async fn tcp_send_task(
remote_addr: &str,
mut hub_receiver: broadcast::Receiver<Arc<Protocol>>,
on_message_output: &Callbacks<Arc<Protocol>>,
stats: &Arc<RwLock<DriverStatsInfo>>,
stats: &Arc<RwLock<AccumulatedDriverStats>>,
) -> Result<()> {
loop {
let message = match hub_receiver.recv().await {
Expand Down
14 changes: 7 additions & 7 deletions src/drivers/tcp/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ use crate::{
Driver, DriverInfo,
},
protocol::Protocol,
stats::driver::{DriverStats, DriverStatsInfo},
stats::driver::{AccumulatedDriverStatsProvider, AccumulatedDriverStats},
};

#[derive(Clone)]
pub struct TcpServer {
pub local_addr: String,
on_message_input: Callbacks<Arc<Protocol>>,
on_message_output: Callbacks<Arc<Protocol>>,
stats: Arc<RwLock<DriverStatsInfo>>,
stats: Arc<RwLock<AccumulatedDriverStats>>,
}

pub struct TcpServerBuilder(TcpServer);
Expand Down Expand Up @@ -56,7 +56,7 @@ impl TcpServer {
local_addr: local_addr.to_string(),
on_message_input: Callbacks::new(),
on_message_output: Callbacks::new(),
stats: Arc::new(RwLock::new(DriverStatsInfo::default())),
stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())),
})
}

Expand All @@ -71,7 +71,7 @@ impl TcpServer {
hub_sender: Arc<broadcast::Sender<Arc<Protocol>>>,
on_message_input: Callbacks<Arc<Protocol>>,
on_message_output: Callbacks<Arc<Protocol>>,
stats: Arc<RwLock<DriverStatsInfo>>,
stats: Arc<RwLock<AccumulatedDriverStats>>,
) -> Result<()> {
let hub_receiver = hub_sender.subscribe();

Expand Down Expand Up @@ -132,13 +132,13 @@ impl Driver for TcpServer {
}

#[async_trait::async_trait]
impl DriverStats for TcpServer {
async fn stats(&self) -> DriverStatsInfo {
impl AccumulatedDriverStatsProvider for TcpServer {
async fn stats(&self) -> AccumulatedDriverStats {
self.stats.read().await.clone()
}

async fn reset_stats(&self) {
*self.stats.write().await = DriverStatsInfo {
*self.stats.write().await = AccumulatedDriverStats {
input: None,
output: None,
}
Expand Down
12 changes: 6 additions & 6 deletions src/drivers/tlog/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ use tracing::*;
use crate::{
drivers::{Driver, DriverInfo},
protocol::Protocol,
stats::driver::{DriverStats, DriverStatsInfo},
stats::driver::{AccumulatedDriverStatsProvider, AccumulatedDriverStats},
};

pub struct TlogReader {
pub path: PathBuf,
on_message_input: Callbacks<Arc<Protocol>>,
stats: Arc<RwLock<DriverStatsInfo>>,
stats: Arc<RwLock<AccumulatedDriverStats>>,
}

pub struct TlogReaderBuilder(TlogReader);
Expand All @@ -41,7 +41,7 @@ impl TlogReader {
TlogReaderBuilder(Self {
path,
on_message_input: Callbacks::new(),
stats: Arc::new(RwLock::new(DriverStatsInfo::default())),
stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())),
})
}

Expand Down Expand Up @@ -141,13 +141,13 @@ impl Driver for TlogReader {
}

#[async_trait::async_trait]
impl DriverStats for TlogReader {
async fn stats(&self) -> DriverStatsInfo {
impl AccumulatedDriverStatsProvider for TlogReader {
async fn stats(&self) -> AccumulatedDriverStats {
self.stats.read().await.clone()
}

async fn reset_stats(&self) {
*self.stats.write().await = DriverStatsInfo {
*self.stats.write().await = AccumulatedDriverStats {
input: None,
output: None,
}
Expand Down
12 changes: 6 additions & 6 deletions src/drivers/tlog/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ use tracing::*;
use crate::{
drivers::{Driver, DriverInfo},
protocol::Protocol,
stats::driver::{DriverStats, DriverStatsInfo},
stats::driver::{AccumulatedDriverStatsProvider, AccumulatedDriverStats},
};

pub struct TlogWriter {
pub path: PathBuf,
on_message_output: Callbacks<Arc<Protocol>>,
stats: Arc<RwLock<DriverStatsInfo>>,
stats: Arc<RwLock<AccumulatedDriverStats>>,
}

pub struct TlogWriterBuilder(TlogWriter);
Expand All @@ -42,7 +42,7 @@ impl TlogWriter {
TlogWriterBuilder(Self {
path,
on_message_output: Callbacks::new(),
stats: Arc::new(RwLock::new(DriverStatsInfo::default())),
stats: Arc::new(RwLock::new(AccumulatedDriverStats::default())),
})
}

Expand Down Expand Up @@ -109,13 +109,13 @@ impl Driver for TlogWriter {
}

#[async_trait::async_trait]
impl DriverStats for TlogWriter {
async fn stats(&self) -> DriverStatsInfo {
impl AccumulatedDriverStatsProvider for TlogWriter {
async fn stats(&self) -> AccumulatedDriverStats {
self.stats.read().await.clone()
}

async fn reset_stats(&self) {
*self.stats.write().await = DriverStatsInfo {
*self.stats.write().await = AccumulatedDriverStats {
input: None,
output: None,
}
Expand Down
Loading

0 comments on commit 692aa43

Please sign in to comment.