Skip to content

Commit

Permalink
feat: Test metric for the price - draft
Browse files Browse the repository at this point in the history
  • Loading branch information
czareko committed Sep 27, 2024
1 parent 1d83e0a commit c48bf82
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 44 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ serde_json = "1.0"
serde = { version = "1.0", features = ["derive"] }
once_cell = "1.10"
prometheus = "0.13"
lazy_static = "1.4"
warp = "0.3.7"


Expand Down
11 changes: 4 additions & 7 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ services:
- ./grafana/prometheus.yml:/etc/prometheus/prometheus.yml
ports:
- "9090:9090"
networks:
- oracle
grafana:
container_name: grafana
image: grafana/grafana
Expand All @@ -30,8 +28,7 @@ services:
- "3000:3000"
volumes:
- ./grafana/provisioning/datasources:/etc/grafana/provisioning/datasources
networks:
- oracle
networks:
oracle:
driver: bridge
#networks:
# oracle:
# driver: bridge
# external: false
6 changes: 3 additions & 3 deletions grafana/prometheus.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
scrape_configs:
- job_name: 'RingBenchMonitor'
metrics_path: '/actuator/prometheus'
scrape_interval: 3s
metrics_path: '/metrics'
scrape_interval: 1s
static_configs:
- targets: ['host.docker.internal:8080']
- targets: ['host.docker.internal:3030']
labels:
application: 'RingBenchMonitor Prometheus data'
5 changes: 4 additions & 1 deletion src/domain/services/trade_history_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::time::{SystemTime, UNIX_EPOCH};
use once_cell::sync::Lazy;
use tokio::sync::Mutex;
use crate::domain::entities::trade::{TradeData, TradeSD};
use crate::ports::stats::stats_recorder_service::StatsRecorderService;

// Static singleton for global trade history storage
pub static TRADE_HISTORY: Lazy<Arc<Mutex<VecDeque<TradeData>>>> = Lazy::new(|| {
Expand All @@ -23,9 +24,11 @@ impl TradeHistoryService {
let current_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64;

log::info!("New Trade: {}",&trade_sd.data);

StatsRecorderService::track_price(trade_sd.data.price.parse::<f64>().unwrap_or(0.0));
// Keep trades within a larger window (e.g., 70 seconds)
while let Some(oldest_trade) = trades.front() {
if current_time - oldest_trade.trade_time > 70_000 {
if current_time >= oldest_trade.trade_time && current_time - oldest_trade.trade_time > 70_000 {
trades.pop_front();
} else {
break;
Expand Down
21 changes: 18 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use std::sync::Arc;
use warp::Filter;
use crate::ports::stats::stats_recorder_service::StatsRecorderService;
use crate::ports::ws_client_trade;

mod ports;
Expand All @@ -11,12 +14,24 @@ async fn main() {
// Log the start of the application
log::info!("Starting application...");

//let trade_history_service = Arc::new(TradeHistoryService);

let websocket_trade_handle = tokio::spawn( async{
log::info!("Starting Trade Stream WebSocket client...");
ws_client_trade::start_websocket().await;
});

tokio::try_join!(websocket_trade_handle).unwrap();
// Serve metrics on port 3030
let metrics_server = tokio::spawn(async {

// Metrics route for Prometheus
let metrics_route = warp::path("metrics")
.and(warp::get())
.map(|| {
let metrics = StatsRecorderService::serve_metrics();
warp::reply::with_header(metrics, "Content-Type", "text/plain")
});

warp::serve(metrics_route).run(([0, 0, 0, 0], 3030)).await;
});

tokio::try_join!(websocket_trade_handle,metrics_server).unwrap();
}
3 changes: 2 additions & 1 deletion src/ports/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod ws_client_trade;
pub mod ws_client_trade;
pub mod stats;
2 changes: 1 addition & 1 deletion src/ports/stats/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1 @@
mod stats_recorder;
pub mod stats_recorder_service;
27 changes: 0 additions & 27 deletions src/ports/stats/stats_recorder.rs

This file was deleted.

43 changes: 43 additions & 0 deletions src/ports/stats/stats_recorder_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use std::collections::VecDeque;
use prometheus::{Gauge, Encoder, TextEncoder, register_gauge, default_registry, Registry};
use std::sync::{Arc, Mutex};
use once_cell::sync::Lazy;
use crate::config::CONFIG;
use crate::domain::entities::trade::TradeData;

// Static singleton for global price gauge storage
pub static PRICE_GAUGE: Lazy<Arc<Mutex<Gauge>>> = Lazy::new(|| {
let gauge = register_gauge!(
"crypto_price",
"Current price of the trading pair"
).unwrap();

let registry = Registry::new();

registry.register(Box::new(gauge.clone())).expect("Failed to register gauge");
// Register gauge with the default registry manually
//default_registry().register(Box::new(gauge.clone())).expect("Failed to register gauge");
log::info!("Prometheus Gauge was registered");
Arc::new(Mutex::new(gauge))
});

#[derive(Debug, Clone, Default)]
pub struct StatsRecorderService;

impl StatsRecorderService{
pub fn track_price(price: f64) {
// Update the shared gauge
let gauge = PRICE_GAUGE.lock().unwrap();
//log::info!("PRICE:: {}",price);
gauge.set(price);
}

pub fn serve_metrics() -> String {
// Serve the Prometheus metrics via HTTP
let encoder = TextEncoder::new();
let mut buffer = Vec::new();
let metrics = prometheus::gather();
encoder.encode(&metrics, &mut buffer).unwrap();
String::from_utf8(buffer).unwrap()
}
}

0 comments on commit c48bf82

Please sign in to comment.