From fac01ae6b305e39ee3c68ac9a7f8e3ede31087f6 Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Tue, 8 Aug 2023 14:14:20 +1200 Subject: [PATCH 1/3] feat: Add `UNPROCESSED_STREAM_MESSAGES` prometheus metric --- indexer/queryapi_coordinator/src/metrics.rs | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/indexer/queryapi_coordinator/src/metrics.rs b/indexer/queryapi_coordinator/src/metrics.rs index f106a0624..6ba2d0090 100644 --- a/indexer/queryapi_coordinator/src/metrics.rs +++ b/indexer/queryapi_coordinator/src/metrics.rs @@ -1,6 +1,6 @@ use actix_web::{get, App, HttpServer, Responder}; use lazy_static::lazy_static; -use prometheus::{Encoder, IntCounter, IntGauge, Opts}; +use prometheus::{Encoder, IntCounter, IntGauge, IntGaugeVec, Opts}; use tracing::info; lazy_static! { @@ -14,6 +14,23 @@ lazy_static! { "Number of indexed blocks" ) .unwrap(); + pub(crate) static ref UNPROCESSED_STREAM_MESSAGES: IntGaugeVec = try_create_int_gauge_vec( + "queryapi_coordinator_unprocessed_stream_messages", + "Number of Redis Stream messages not processed by Runner", + &["stream"] + ) + .unwrap(); +} + +fn try_create_int_gauge_vec( + name: &str, + help: &str, + labels: &[&str], +) -> prometheus::Result { + let opts = Opts::new(name, help); + let gauge = IntGaugeVec::new(opts, labels)?; + prometheus::register(Box::new(gauge.clone()))?; + Ok(gauge) } fn try_create_int_gauge(name: &str, help: &str) -> prometheus::Result { From b2ac42279ccbee9e0db83b91f8cd95f09da71df8 Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Tue, 8 Aug 2023 14:16:54 +1200 Subject: [PATCH 2/3] feat: Publish `UNPROCESSED_STREAM_MESSAGES` for each stream --- indexer/queryapi_coordinator/src/utils.rs | 30 ++++++++++++++++ indexer/storage/src/lib.rs | 44 +++++++++++++++++++++++ 2 files changed, 74 insertions(+) diff --git a/indexer/queryapi_coordinator/src/utils.rs b/indexer/queryapi_coordinator/src/utils.rs index 2cf5207ed..601978966 100644 --- a/indexer/queryapi_coordinator/src/utils.rs +++ b/indexer/queryapi_coordinator/src/utils.rs @@ -1,3 +1,5 @@ +use crate::metrics; + pub(crate) async fn stats(redis_connection_manager: storage::ConnectionManager) { let interval_secs = 10; let mut previous_processed_blocks: u64 = @@ -46,6 +48,34 @@ pub(crate) async fn stats(redis_connection_manager: storage::ConnectionManager) alert_rules_count, ); previous_processed_blocks = processed_blocks; + + let streams = storage::smembers(&redis_connection_manager, storage::INDEXER_SET_KEY) + .await + .unwrap_or(Vec::new()); + + for stream in streams { + let latest_id = storage::get::( + &redis_connection_manager, + storage::generate_stream_last_id_key(&stream), + ) + .await + .unwrap_or(storage::STREAM_SMALLEST_ID.to_string()); + + let unprocessed_message_count = storage::xrange( + &redis_connection_manager, + storage::generate_stream_key(&stream), + &latest_id, + storage::STREAM_LARGEST_ID, + ) + .await + .unwrap_or(Vec::new()) + .len() as i64; + + metrics::UNPROCESSED_STREAM_MESSAGES + .with_label_values(&[&stream]) + .set(unprocessed_message_count); + } + tokio::time::sleep(std::time::Duration::from_secs(interval_secs)).await; } } diff --git a/indexer/storage/src/lib.rs b/indexer/storage/src/lib.rs index 3753e186f..183ef0bbe 100644 --- a/indexer/storage/src/lib.rs +++ b/indexer/storage/src/lib.rs @@ -3,6 +3,8 @@ pub use redis::{self, aio::ConnectionManager, FromRedisValue, ToRedisArgs}; const STORAGE: &str = "storage_alertexer"; pub const INDEXER_SET_KEY: &str = "indexers"; +pub const STREAM_SMALLEST_ID: &str = "-"; +pub const STREAM_LARGEST_ID: &str = "+"; pub async fn get_redis_client(redis_connection_str: &str) -> redis::Client { redis::Client::open(redis_connection_str).expect("can create redis client") @@ -16,6 +18,10 @@ pub fn generate_stream_key(name: &str) -> String { format!("{}:stream", name) } +pub fn generate_stream_last_id_key(name: &str) -> String { + format!("{}:stream:lastId", name) +} + pub async fn connect(redis_connection_str: &str) -> anyhow::Result { Ok(get_redis_client(redis_connection_str) .await @@ -105,6 +111,44 @@ pub async fn xadd( Ok(()) } +pub async fn smembers( + redis_connection_manager: &ConnectionManager, + key: &str, +) -> anyhow::Result> { + tracing::debug!(target: STORAGE, "SMEMBERS: {:?}", key); + + let members: Vec = redis::cmd("SMEMBERS") + .arg(key) + .query_async(&mut redis_connection_manager.clone()) + .await?; + + Ok(members) +} + +pub async fn xrange( + redis_connection_manager: &ConnectionManager, + stream_key: impl ToRedisArgs + std::fmt::Debug, + start_id: &str, + end_id: &str, +) -> anyhow::Result> { + tracing::debug!( + target: STORAGE, + "XRANGE: {:?}, {:?}, {:?}", + stream_key, + start_id, + end_id + ); + + let results = redis::cmd("XRANGE") + .arg(stream_key) + .arg(start_id) + .arg(end_id) + .query_async(&mut redis_connection_manager.clone()) + .await?; + + Ok(results) +} + /// Sets the key `receipt_id: &str` with value `transaction_hash: &str` to the Redis storage. /// Increments the counter `receipts_{transaction_hash}` by one. /// The counter holds how many Receipts related to the Transaction are in watching list From 90d8cdef8871497a375d5e7730e2cebb273c1b5e Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Tue, 8 Aug 2023 14:17:15 +1200 Subject: [PATCH 3/3] feat: Decrease stats interval to 5sec --- indexer/queryapi_coordinator/src/utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexer/queryapi_coordinator/src/utils.rs b/indexer/queryapi_coordinator/src/utils.rs index 601978966..a2ec8ab1a 100644 --- a/indexer/queryapi_coordinator/src/utils.rs +++ b/indexer/queryapi_coordinator/src/utils.rs @@ -1,7 +1,7 @@ use crate::metrics; pub(crate) async fn stats(redis_connection_manager: storage::ConnectionManager) { - let interval_secs = 10; + let interval_secs = 5; let mut previous_processed_blocks: u64 = storage::get::(&redis_connection_manager, "blocks_processed") .await