Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add metrics to Block Streamer #579

Merged
merged 9 commits into from
Feb 23, 2024
399 changes: 395 additions & 4 deletions block-streamer/Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions block-streamer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@ version = "0.1.0"
edition = "2021"

[dependencies]
actix-web = "4.5.1"
anyhow = "1.0.57"
async-trait = "0.1.74"
aws-config = { version = "1.0.0", features = ["behavior-version-latest"]}
aws-sdk-s3 = "0.39.1"
borsh = "0.10.2"
chrono = "0.4.25"
futures = "0.3.5"
lazy_static = "1.4.0"
mockall = "0.11.4"
prometheus = "0.13.3"
prost = "0.12.3"
redis = { version = "0.21.5", features = ["tokio-comp", "connection-manager"] }
serde = { version = "1", features = ["derive"] }
Expand Down
141 changes: 42 additions & 99 deletions block-streamer/src/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use near_lake_framework::near_indexer_primitives;
use tokio::task::JoinHandle;

use crate::indexer_config::IndexerConfig;
use crate::metrics;
use crate::rules::types::ChainId;
use registry_types::Rule;

Expand Down Expand Up @@ -135,6 +136,10 @@ pub(crate) async fn start_block_stream(
) -> anyhow::Result<()> {
tracing::info!("Starting block stream",);

metrics::PUBLISHED_BLOCKS_COUNT
.with_label_values(&[&indexer.get_full_name()])
.reset();

let last_indexed_delta_lake_block = process_delta_lake_blocks(
start_block_height,
delta_lake_client,
Expand Down Expand Up @@ -219,19 +224,14 @@ async fn process_delta_lake_blocks(
blocks_from_index.len(),
);

for block in &blocks_from_index {
let block = block.to_owned();
for block_height in &blocks_from_index {
let block_height = block_height.to_owned();
redis_client
.xadd(redis_stream.clone(), &[("block_height".to_string(), block)])
.await
.context("Failed to add block to Redis Stream")?;
.publish_block(indexer, redis_stream.clone(), block_height)
.await?;
redis_client
.set(
format!("{}:last_published_block", indexer.get_full_name()),
block,
)
.await
.context("Failed to set last_published_block")?;
.set_last_processed_block(indexer, block_height)
.await?;
}

let last_indexed_block =
Expand Down Expand Up @@ -275,12 +275,8 @@ async fn process_near_lake_blocks(
last_indexed_block = block_height;

redis_client
.set(
format!("{}:last_published_block", indexer.get_full_name()),
last_indexed_block,
)
.await
.context("Failed to set last_published_block")?;
.set_last_processed_block(indexer, block_height)
.await?;

let matches = crate::rules::reduce_indexer_rule_matches(
&indexer.rule,
Expand All @@ -290,12 +286,8 @@ async fn process_near_lake_blocks(

if !matches.is_empty() {
redis_client
.xadd(
redis_stream.clone(),
&[("block_height".to_string(), block_height.to_owned())],
)
.await
.context("Failed to add block to Redis Stream")?;
.publish_block(indexer, redis_stream.clone(), block_height)
.await?;
}
}

Expand Down Expand Up @@ -330,15 +322,20 @@ mod tests {

let mut mock_redis_client = crate::redis::RedisClient::default();
mock_redis_client
.expect_xadd::<String, u64>()
.with(predicate::eq("stream key".to_string()), predicate::always())
.returning(|_, fields| {
assert!(vec![107503702, 107503703, 107503705].contains(&fields[0].1));
Ok(())
})
.expect_publish_block()
.with(
predicate::always(),
predicate::eq("stream key".to_string()),
predicate::in_iter([107503702, 107503703, 107503705]),
)
.returning(|_, _, _| Ok(()))
.times(3);
mock_redis_client
.expect_set::<String, u64>()
.expect_set_last_processed_block()
.with(
predicate::always(),
predicate::in_iter([107503702, 107503703, 107503704, 107503705]),
)
.returning(|_, _| Ok(()))
.times(4);

Expand Down Expand Up @@ -388,24 +385,9 @@ mod tests {
.expect_list_matching_block_heights()
.never();

let mock_lake_s3_config =
crate::test_utils::create_mock_lake_s3_config(&[107503704, 107503705]);

let mut mock_redis_client = crate::redis::RedisClient::default();
mock_redis_client
.expect_set::<String, u64>()
.returning(|_, fields| {
assert!(vec![107503704, 107503705].contains(&fields));
Ok(())
})
.times(2);
mock_redis_client
.expect_xadd::<String, u64>()
.returning(|_, fields| {
assert!(vec![107503704, 107503705].contains(&fields[0].1));
Ok(())
})
.times(2);
mock_redis_client.expect_publish_block().never();
mock_redis_client.expect_set_last_processed_block().never();

let indexer_config = crate::indexer_config::IndexerConfig {
account_id: near_indexer_primitives::types::AccountId::try_from(
Expand All @@ -419,14 +401,11 @@ mod tests {
},
};

start_block_stream(
process_delta_lake_blocks(
107503704,
&indexer_config,
std::sync::Arc::new(mock_redis_client),
std::sync::Arc::new(mock_delta_lake_client),
mock_lake_s3_config,
&ChainId::Mainnet,
1,
std::sync::Arc::new(mock_redis_client),
&indexer_config,
"stream key".to_string(),
)
.await
Expand All @@ -451,24 +430,9 @@ mod tests {
.expect_list_matching_block_heights()
.never();

let mock_lake_s3_config =
crate::test_utils::create_mock_lake_s3_config(&[107503704, 107503705]);

let mut mock_redis_client = crate::redis::RedisClient::default();
mock_redis_client
.expect_set::<String, u64>()
.returning(|_, fields| {
assert!(vec![107503704, 107503705].contains(&fields));
Ok(())
})
.times(2);
mock_redis_client
.expect_xadd::<String, u64>()
.returning(|_, fields| {
assert!(vec![107503704, 107503705].contains(&fields[0].1));
Ok(())
})
.times(2);
mock_redis_client.expect_publish_block().never();
mock_redis_client.expect_set_last_processed_block().never();

let indexer_config = crate::indexer_config::IndexerConfig {
account_id: near_indexer_primitives::types::AccountId::try_from(
Expand All @@ -482,14 +446,11 @@ mod tests {
},
};

start_block_stream(
process_delta_lake_blocks(
107503704,
&indexer_config,
std::sync::Arc::new(mock_redis_client),
std::sync::Arc::new(mock_delta_lake_client),
mock_lake_s3_config,
&ChainId::Mainnet,
1,
std::sync::Arc::new(mock_redis_client),
&indexer_config,
"stream key".to_string(),
)
.await
Expand All @@ -514,24 +475,9 @@ mod tests {
.expect_list_matching_block_heights()
.never();

let mock_lake_s3_config =
crate::test_utils::create_mock_lake_s3_config(&[107503704, 107503705]);

let mut mock_redis_client = crate::redis::RedisClient::default();
mock_redis_client
.expect_set::<String, u64>()
.returning(|_, fields| {
assert!(vec![107503704, 107503705].contains(&fields));
Ok(())
})
.times(2);
mock_redis_client
.expect_xadd::<String, u64>()
.returning(|_, fields| {
assert!(vec![107503704, 107503705].contains(&fields[0].1));
Ok(())
})
.times(2);
mock_redis_client.expect_publish_block().never();
mock_redis_client.expect_set_last_processed_block().never();

let indexer_config = crate::indexer_config::IndexerConfig {
account_id: near_indexer_primitives::types::AccountId::try_from(
Expand All @@ -545,14 +491,11 @@ mod tests {
},
};

start_block_stream(
process_delta_lake_blocks(
107503704,
&indexer_config,
std::sync::Arc::new(mock_redis_client),
std::sync::Arc::new(mock_delta_lake_client),
mock_lake_s3_config,
&ChainId::Mainnet,
1,
std::sync::Arc::new(mock_redis_client),
&indexer_config,
"stream key".to_string(),
)
.await
Expand Down
5 changes: 5 additions & 0 deletions block-streamer/src/indexer_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,9 @@ impl IndexerConfig {
self.get_full_name().hash(&mut hasher);
hasher.finish().to_string()
}

pub fn last_processed_block_key(&self) -> String {
// TODO: rename to `last_processed_block`
format!("{}:last_published_block", self.get_full_name())
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"published" should mean it actually got added to the Redis Stream

}
}
23 changes: 17 additions & 6 deletions block-streamer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use tracing_subscriber::prelude::*;
mod block_stream;
mod delta_lake_client;
mod indexer_config;
mod metrics;
mod redis;
mod rules;
mod s3_client;
Expand All @@ -15,26 +16,36 @@ mod test_utils;
async fn main() -> anyhow::Result<()> {
tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer())
.with(metrics::LogCounter)
.with(tracing_subscriber::EnvFilter::from_default_env())
.init();

let redis_url = std::env::var("REDIS_URL").expect("REDIS_URL is not set");
let server_port = std::env::var("SERVER_PORT").expect("SERVER_PORT is not set");
let grpc_port = std::env::var("GRPC_PORT").expect("GRPC_PORT is not set");
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will also make this change in Terraform

let metrics_port = std::env::var("METRICS_PORT")
.expect("METRICS_PORT is not set")
.parse()
.expect("METRICS_PORT is not a valid number");

tracing::info!(
redis_url,
grpc_port,
metrics_port,
"Starting Block Streamer"
);

tracing::info!("Starting Block Streamer Service...");

tracing::info!("Connecting to Redis...");
let redis_client = std::sync::Arc::new(redis::RedisClient::connect(&redis_url).await?);

let aws_config = aws_config::from_env().load().await;
let s3_config = aws_sdk_s3::Config::from(&aws_config);
let s3_client = crate::s3_client::S3Client::new(s3_config.clone());

tracing::info!("Connecting to Delta Lake...");
let delta_lake_client =
std::sync::Arc::new(crate::delta_lake_client::DeltaLakeClient::new(s3_client));

server::init(&server_port, redis_client, delta_lake_client, s3_config).await?;
tokio::spawn(metrics::init_server(metrics_port).expect("Failed to start metrics server"));

server::init(&grpc_port, redis_client, delta_lake_client, s3_config).await?;

Ok(())
}
72 changes: 72 additions & 0 deletions block-streamer/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use actix_web::{get, App, HttpServer, Responder};
use lazy_static::lazy_static;
use prometheus::{
register_int_counter_vec, register_int_gauge_vec, Encoder, IntCounterVec, IntGaugeVec,
};
use tracing_subscriber::layer::Context;
use tracing_subscriber::Layer;

lazy_static! {
    // Per-indexer gauge tracking the height of the most recent block the
    // stream has handled. Labelled by the indexer's full name.
    pub static ref LAST_PROCESSED_BLOCK: IntGaugeVec = register_int_gauge_vec!(
        "queryapi_block_streamer_last_processed_block",
        "Height of last block seen",
        &["indexer"]
    )
    .unwrap();
    // Per-indexer counter of every block the stream has processed,
    // whether or not it matched any rule.
    pub static ref PROCESSED_BLOCKS_COUNT: IntCounterVec = register_int_counter_vec!(
        "queryapi_block_streamer_processed_blocks_count",
        "Number of blocks processed by block stream",
        &["indexer"]
    )
    .unwrap();
    // Per-indexer counter of blocks actually published to the Redis stream.
    // Reset on stream start (see `start_block_stream`) so restarts don't
    // inflate the count.
    pub static ref PUBLISHED_BLOCKS_COUNT: IntCounterVec = register_int_counter_vec!(
        "queryapi_block_streamer_published_blocks_count",
        "Number of blocks published to redis stream",
        &["indexer"]
    )
    .unwrap();
    // Counter of log events by level, incremented by the `LogCounter`
    // tracing layer below. Useful for alerting on error-rate spikes.
    pub static ref LOGS_COUNT: IntCounterVec = register_int_counter_vec!(
        "queryapi_block_streamer_logs_count",
        "Number of messages logged",
        &["level"]
    )
    .unwrap();
    // NOTE: `.unwrap()` here only panics on duplicate/invalid metric
    // registration, which is a programming error caught at startup.
}

/// `tracing_subscriber` layer that counts every emitted log event,
/// bucketed by level, into the `LOGS_COUNT` Prometheus counter.
pub struct LogCounter;

impl<S> Layer<S> for LogCounter
where
    S: tracing::Subscriber,
{
    fn on_event(&self, event: &tracing::Event, _ctx: Context<S>) {
        // Use the event's level string ("ERROR", "WARN", ...) as the label.
        let level = event.metadata().level().as_str();
        LOGS_COUNT.with_label_values(&[level]).inc();
    }
}

/// HTTP handler exposing all registered Prometheus metrics in the
/// text exposition format at `GET /metrics`.
#[get("/metrics")]
async fn get_metrics() -> impl Responder {
    let mut buffer = Vec::<u8>::new();
    let encoder = prometheus::TextEncoder::new();
    // Encode once. The previous implementation retried in a `loop`, which
    // busy-spins forever (logging an error each iteration) if encoding fails
    // persistently. A failed encode is not recoverable by retrying, so log
    // it and return whatever was written (possibly empty) instead.
    if let Err(err) = encoder.encode(&prometheus::gather(), &mut buffer) {
        tracing::error!("Error encoding metrics: {}", err);
    }
    // The Prometheus text encoder only emits valid UTF-8.
    String::from_utf8(buffer).expect("prometheus text output is valid UTF-8")
}

/// Builds the Actix server that serves `GET /metrics` on `0.0.0.0:{port}`.
///
/// The returned `Server` future must be spawned/awaited by the caller;
/// construction itself does not block. Signal handling is disabled so the
/// main gRPC server owns process shutdown, and a single worker is plenty
/// for a scrape endpoint.
pub(crate) fn init_server(port: u16) -> anyhow::Result<actix_web::dev::Server> {
    tracing::info!("Starting metrics server on 0.0.0.0:{port}");

    let server = HttpServer::new(|| App::new().service(get_metrics))
        .bind(("0.0.0.0", port))?
        .disable_signals()
        .workers(1)
        .run();

    Ok(server)
}
Loading
Loading