Skip to content

Commit

Permalink
feat: Add metrics to Block Streamer (#579)
Browse files Browse the repository at this point in the history
This PR adds the following metrics to Block Streamer:
- `LAST_PROCESSED_BLOCK` - height of last block seen, will be used to
calculate the lag from tip of network
- `PROCESSED_BLOCKS_COUNT` - count of blocks seen, used to calculate BPS
- `PUBLISHED_BLOCKS_COUNT` - count of blocks published to Redis Stream,
to determine whether the block stream is publishing messages
- `LOGS_COUNT` - count logs by level, so we can alert on number of error
logs etc.

I've also slightly refactored `RedisClient` to support this change.
  • Loading branch information
morgsmccauley committed Feb 23, 2024
1 parent 92d5526 commit 22b582e
Show file tree
Hide file tree
Showing 8 changed files with 579 additions and 110 deletions.
399 changes: 395 additions & 4 deletions block-streamer/Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions block-streamer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@ version = "0.1.0"
edition = "2021"

[dependencies]
actix-web = "4.5.1"
anyhow = "1.0.57"
async-trait = "0.1.74"
aws-config = { version = "1.0.0", features = ["behavior-version-latest"]}
aws-sdk-s3 = "0.39.1"
borsh = "0.10.2"
chrono = "0.4.25"
futures = "0.3.5"
lazy_static = "1.4.0"
mockall = "0.11.4"
prometheus = "0.13.3"
prost = "0.12.3"
redis = { version = "0.21.5", features = ["tokio-comp", "connection-manager"] }
serde = { version = "1", features = ["derive"] }
Expand Down
141 changes: 42 additions & 99 deletions block-streamer/src/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use near_lake_framework::near_indexer_primitives;
use tokio::task::JoinHandle;

use crate::indexer_config::IndexerConfig;
use crate::metrics;
use crate::rules::types::ChainId;
use registry_types::Rule;

Expand Down Expand Up @@ -135,6 +136,10 @@ pub(crate) async fn start_block_stream(
) -> anyhow::Result<()> {
tracing::info!("Starting block stream",);

metrics::PUBLISHED_BLOCKS_COUNT
.with_label_values(&[&indexer.get_full_name()])
.reset();

let last_indexed_delta_lake_block = process_delta_lake_blocks(
start_block_height,
delta_lake_client,
Expand Down Expand Up @@ -219,19 +224,14 @@ async fn process_delta_lake_blocks(
blocks_from_index.len(),
);

for block in &blocks_from_index {
let block = block.to_owned();
for block_height in &blocks_from_index {
let block_height = block_height.to_owned();
redis_client
.xadd(redis_stream.clone(), &[("block_height".to_string(), block)])
.await
.context("Failed to add block to Redis Stream")?;
.publish_block(indexer, redis_stream.clone(), block_height)
.await?;
redis_client
.set(
format!("{}:last_published_block", indexer.get_full_name()),
block,
)
.await
.context("Failed to set last_published_block")?;
.set_last_processed_block(indexer, block_height)
.await?;
}

let last_indexed_block =
Expand Down Expand Up @@ -275,12 +275,8 @@ async fn process_near_lake_blocks(
last_indexed_block = block_height;

redis_client
.set(
format!("{}:last_published_block", indexer.get_full_name()),
last_indexed_block,
)
.await
.context("Failed to set last_published_block")?;
.set_last_processed_block(indexer, block_height)
.await?;

let matches = crate::rules::reduce_indexer_rule_matches(
&indexer.rule,
Expand All @@ -290,12 +286,8 @@ async fn process_near_lake_blocks(

if !matches.is_empty() {
redis_client
.xadd(
redis_stream.clone(),
&[("block_height".to_string(), block_height.to_owned())],
)
.await
.context("Failed to add block to Redis Stream")?;
.publish_block(indexer, redis_stream.clone(), block_height)
.await?;
}
}

Expand Down Expand Up @@ -330,15 +322,20 @@ mod tests {

let mut mock_redis_client = crate::redis::RedisClient::default();
mock_redis_client
.expect_xadd::<String, u64>()
.with(predicate::eq("stream key".to_string()), predicate::always())
.returning(|_, fields| {
assert!(vec![107503702, 107503703, 107503705].contains(&fields[0].1));
Ok(())
})
.expect_publish_block()
.with(
predicate::always(),
predicate::eq("stream key".to_string()),
predicate::in_iter([107503702, 107503703, 107503705]),
)
.returning(|_, _, _| Ok(()))
.times(3);
mock_redis_client
.expect_set::<String, u64>()
.expect_set_last_processed_block()
.with(
predicate::always(),
predicate::in_iter([107503702, 107503703, 107503704, 107503705]),
)
.returning(|_, _| Ok(()))
.times(4);

Expand Down Expand Up @@ -388,24 +385,9 @@ mod tests {
.expect_list_matching_block_heights()
.never();

let mock_lake_s3_config =
crate::test_utils::create_mock_lake_s3_config(&[107503704, 107503705]);

let mut mock_redis_client = crate::redis::RedisClient::default();
mock_redis_client
.expect_set::<String, u64>()
.returning(|_, fields| {
assert!(vec![107503704, 107503705].contains(&fields));
Ok(())
})
.times(2);
mock_redis_client
.expect_xadd::<String, u64>()
.returning(|_, fields| {
assert!(vec![107503704, 107503705].contains(&fields[0].1));
Ok(())
})
.times(2);
mock_redis_client.expect_publish_block().never();
mock_redis_client.expect_set_last_processed_block().never();

let indexer_config = crate::indexer_config::IndexerConfig {
account_id: near_indexer_primitives::types::AccountId::try_from(
Expand All @@ -419,14 +401,11 @@ mod tests {
},
};

start_block_stream(
process_delta_lake_blocks(
107503704,
&indexer_config,
std::sync::Arc::new(mock_redis_client),
std::sync::Arc::new(mock_delta_lake_client),
mock_lake_s3_config,
&ChainId::Mainnet,
1,
std::sync::Arc::new(mock_redis_client),
&indexer_config,
"stream key".to_string(),
)
.await
Expand All @@ -451,24 +430,9 @@ mod tests {
.expect_list_matching_block_heights()
.never();

let mock_lake_s3_config =
crate::test_utils::create_mock_lake_s3_config(&[107503704, 107503705]);

let mut mock_redis_client = crate::redis::RedisClient::default();
mock_redis_client
.expect_set::<String, u64>()
.returning(|_, fields| {
assert!(vec![107503704, 107503705].contains(&fields));
Ok(())
})
.times(2);
mock_redis_client
.expect_xadd::<String, u64>()
.returning(|_, fields| {
assert!(vec![107503704, 107503705].contains(&fields[0].1));
Ok(())
})
.times(2);
mock_redis_client.expect_publish_block().never();
mock_redis_client.expect_set_last_processed_block().never();

let indexer_config = crate::indexer_config::IndexerConfig {
account_id: near_indexer_primitives::types::AccountId::try_from(
Expand All @@ -482,14 +446,11 @@ mod tests {
},
};

start_block_stream(
process_delta_lake_blocks(
107503704,
&indexer_config,
std::sync::Arc::new(mock_redis_client),
std::sync::Arc::new(mock_delta_lake_client),
mock_lake_s3_config,
&ChainId::Mainnet,
1,
std::sync::Arc::new(mock_redis_client),
&indexer_config,
"stream key".to_string(),
)
.await
Expand All @@ -514,24 +475,9 @@ mod tests {
.expect_list_matching_block_heights()
.never();

let mock_lake_s3_config =
crate::test_utils::create_mock_lake_s3_config(&[107503704, 107503705]);

let mut mock_redis_client = crate::redis::RedisClient::default();
mock_redis_client
.expect_set::<String, u64>()
.returning(|_, fields| {
assert!(vec![107503704, 107503705].contains(&fields));
Ok(())
})
.times(2);
mock_redis_client
.expect_xadd::<String, u64>()
.returning(|_, fields| {
assert!(vec![107503704, 107503705].contains(&fields[0].1));
Ok(())
})
.times(2);
mock_redis_client.expect_publish_block().never();
mock_redis_client.expect_set_last_processed_block().never();

let indexer_config = crate::indexer_config::IndexerConfig {
account_id: near_indexer_primitives::types::AccountId::try_from(
Expand All @@ -545,14 +491,11 @@ mod tests {
},
};

start_block_stream(
process_delta_lake_blocks(
107503704,
&indexer_config,
std::sync::Arc::new(mock_redis_client),
std::sync::Arc::new(mock_delta_lake_client),
mock_lake_s3_config,
&ChainId::Mainnet,
1,
std::sync::Arc::new(mock_redis_client),
&indexer_config,
"stream key".to_string(),
)
.await
Expand Down
5 changes: 5 additions & 0 deletions block-streamer/src/indexer_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,9 @@ impl IndexerConfig {
self.get_full_name().hash(&mut hasher);
hasher.finish().to_string()
}

pub fn last_processed_block_key(&self) -> String {
// TODO: rename to `last_processed_block`
format!("{}:last_published_block", self.get_full_name())
}
}
23 changes: 17 additions & 6 deletions block-streamer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use tracing_subscriber::prelude::*;
mod block_stream;
mod delta_lake_client;
mod indexer_config;
mod metrics;
mod redis;
mod rules;
mod s3_client;
Expand All @@ -15,26 +16,36 @@ mod test_utils;
async fn main() -> anyhow::Result<()> {
tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer())
.with(metrics::LogCounter)
.with(tracing_subscriber::EnvFilter::from_default_env())
.init();

let redis_url = std::env::var("REDIS_URL").expect("REDIS_URL is not set");
let server_port = std::env::var("SERVER_PORT").expect("SERVER_PORT is not set");
let grpc_port = std::env::var("GRPC_PORT").expect("GRPC_PORT is not set");
let metrics_port = std::env::var("METRICS_PORT")
.expect("METRICS_PORT is not set")
.parse()
.expect("METRICS_PORT is not a valid number");

tracing::info!(
redis_url,
grpc_port,
metrics_port,
"Starting Block Streamer"
);

tracing::info!("Starting Block Streamer Service...");

tracing::info!("Connecting to Redis...");
let redis_client = std::sync::Arc::new(redis::RedisClient::connect(&redis_url).await?);

let aws_config = aws_config::from_env().load().await;
let s3_config = aws_sdk_s3::Config::from(&aws_config);
let s3_client = crate::s3_client::S3Client::new(s3_config.clone());

tracing::info!("Connecting to Delta Lake...");
let delta_lake_client =
std::sync::Arc::new(crate::delta_lake_client::DeltaLakeClient::new(s3_client));

server::init(&server_port, redis_client, delta_lake_client, s3_config).await?;
tokio::spawn(metrics::init_server(metrics_port).expect("Failed to start metrics server"));

server::init(&grpc_port, redis_client, delta_lake_client, s3_config).await?;

Ok(())
}
72 changes: 72 additions & 0 deletions block-streamer/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use actix_web::{get, App, HttpServer, Responder};
use lazy_static::lazy_static;
use prometheus::{
register_int_counter_vec, register_int_gauge_vec, Encoder, IntCounterVec, IntGaugeVec,
};
use tracing_subscriber::layer::Context;
use tracing_subscriber::Layer;

lazy_static! {
pub static ref LAST_PROCESSED_BLOCK: IntGaugeVec = register_int_gauge_vec!(
"queryapi_block_streamer_last_processed_block",
"Height of last block seen",
&["indexer"]
)
.unwrap();
pub static ref PROCESSED_BLOCKS_COUNT: IntCounterVec = register_int_counter_vec!(
"queryapi_block_streamer_processed_blocks_count",
"Number of blocks processed by block stream",
&["indexer"]
)
.unwrap();
pub static ref PUBLISHED_BLOCKS_COUNT: IntCounterVec = register_int_counter_vec!(
"queryapi_block_streamer_published_blocks_count",
"Number of blocks published to redis stream",
&["indexer"]
)
.unwrap();
pub static ref LOGS_COUNT: IntCounterVec = register_int_counter_vec!(
"queryapi_block_streamer_logs_count",
"Number of messages logged",
&["level"]
)
.unwrap();
}

pub struct LogCounter;

impl<S> Layer<S> for LogCounter
where
S: tracing::Subscriber,
{
fn on_event(&self, event: &tracing::Event, _ctx: Context<S>) {
LOGS_COUNT
.with_label_values(&[event.metadata().level().as_str()])
.inc();
}
}

#[get("/metrics")]
async fn get_metrics() -> impl Responder {
let mut buffer = Vec::<u8>::new();
let encoder = prometheus::TextEncoder::new();
loop {
match encoder.encode(&prometheus::gather(), &mut buffer) {
Ok(_) => break,
Err(err) => {
tracing::error!("Error encoding metrics: {}", err);
}
}
}
String::from_utf8(buffer).unwrap()
}

pub(crate) fn init_server(port: u16) -> anyhow::Result<actix_web::dev::Server> {
tracing::info!("Starting metrics server on 0.0.0.0:{port}");

Ok(HttpServer::new(|| App::new().service(get_metrics))
.bind(("0.0.0.0", port))?
.disable_signals()
.workers(1)
.run())
}
Loading

0 comments on commit 22b582e

Please sign in to comment.