From 22b582eeb4ab6a90cf09a74d531e21334e52fe6c Mon Sep 17 00:00:00 2001 From: Morgan McCauley Date: Fri, 23 Feb 2024 15:18:45 +1300 Subject: [PATCH] feat: Add metrics to Block Streamer (#579) This PR adds the following metrics to Block Streamer: - `LAST_PROCESSED_BLOCK` - height of last block seen, will be used to calculate the lag from tip of network - `PROCESSED_BLOCKS_COUNT` - count of blocks seen, used to calculate BPS - `PUBLISHED_BLOCKS_COUNT` - count of blocks published to Redis Stream, to determine whether the block stream is publishing messages - `LOGS_COUNT` - count logs by level, so we can alert on number of error logs etc. I've also slightly refactored `RedisClient` to support this change. --- block-streamer/Cargo.lock | 399 ++++++++++++++++++++++++++- block-streamer/Cargo.toml | 3 + block-streamer/src/block_stream.rs | 141 +++------- block-streamer/src/indexer_config.rs | 5 + block-streamer/src/main.rs | 23 +- block-streamer/src/metrics.rs | 72 +++++ block-streamer/src/redis.rs | 44 +++ block-streamer/src/server/mod.rs | 2 +- 8 files changed, 579 insertions(+), 110 deletions(-) create mode 100644 block-streamer/src/metrics.rs diff --git a/block-streamer/Cargo.lock b/block-streamer/Cargo.lock index 6198e3dfd..70b4a03ef 100644 --- a/block-streamer/Cargo.lock +++ b/block-streamer/Cargo.lock @@ -2,6 +2,191 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "actix-codec" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f7b0a21988c1bf877cf4759ef5ddaac04c1c9fe808c9142ecb78ba97d97a28a" +dependencies = [ + "bitflags 2.4.1", + "bytes", + "futures-core", + "futures-sink", + "memchr", + "pin-project-lite", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "actix-http" +version = "3.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d223b13fd481fc0d1f83bb12659ae774d9e3601814c68a0bc539731698cca743" +dependencies = [ + "actix-codec", + "actix-rt", + "actix-service", + "actix-utils", + "ahash", + "base64", + "bitflags 2.4.1", + "brotli", + "bytes", + "bytestring", + "derive_more", + "encoding_rs", + "flate2", + "futures-core", + "h2", + "http", + "httparse", + "httpdate", + "itoa", + "language-tags", + "local-channel", + "mime", + "percent-encoding", + "pin-project-lite", + "rand 0.8.5", + "sha1 0.10.6", + "smallvec", + "tokio", + "tokio-util", + "tracing", + "zstd", +] + +[[package]] +name = "actix-macros" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e01ed3140b2f8d422c68afa1ed2e85d996ea619c988ac834d255db32138655cb" +dependencies = [ + "quote", + "syn 2.0.39", +] + +[[package]] +name = "actix-router" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d22475596539443685426b6bdadb926ad0ecaefdfc5fb05e5e3441f15463c511" +dependencies = [ + "bytestring", + "http", + "regex", + "serde", + "tracing", +] + +[[package]] +name = "actix-rt" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28f32d40287d3f402ae0028a9d54bef51af15c8769492826a69d28f81893151d" +dependencies = [ + "futures-core", + "tokio", +] + +[[package]] +name = "actix-server" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3eb13e7eef0423ea6eab0e59f6c72e7cb46d33691ad56a726b3cd07ddec2c2d4" +dependencies = [ + "actix-rt", + "actix-service", + "actix-utils", + "futures-core", + "futures-util", + "mio", + "socket2 0.5.5", + "tokio", + "tracing", +] + +[[package]] +name = "actix-service" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b894941f818cfdc7ccc4b9e60fa7e53b5042a2e8567270f9147d5591893373a" +dependencies = [ + "futures-core", + "paste", + "pin-project-lite", +] + +[[package]] +name = "actix-utils" +version = "3.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88a1dcdff1466e3c2488e1cb5c36a71822750ad43839937f85d2f4d9f8b705d8" +dependencies = [ + "local-waker", + "pin-project-lite", +] + +[[package]] +name = "actix-web" +version = "4.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43a6556ddebb638c2358714d853257ed226ece6023ef9364f23f0c70737ea984" +dependencies = [ + "actix-codec", + "actix-http", + "actix-macros", + "actix-router", + "actix-rt", + "actix-server", + "actix-service", + "actix-utils", + "actix-web-codegen", + "ahash", + "bytes", + "bytestring", + "cfg-if", + "cookie", + "derive_more", + "encoding_rs", + "futures-core", + "futures-util", + "itoa", + "language-tags", + "log", + "mime", + "once_cell", + "pin-project-lite", + "regex", + "serde", + "serde_json", + "serde_urlencoded", + "smallvec", + "socket2 0.5.5", + "time", + "url", +] + +[[package]] +name = "actix-web-codegen" +version = "4.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb1f50ebbb30eca122b188319a4398b3f7bb4a8cdf50ecfb73bfc6a3c3ce54f5" +dependencies = [ + "actix-router", + "proc-macro2", + "quote", + "syn 2.0.39", +] + +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + [[package]] name = "ahash" version = "0.8.6" @@ -9,6 +194,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "91429305e9f0a25f6205c5b8e0d2db09e0708a7a6df0f42212bb56c32c8ac97a" dependencies = [ "cfg-if", + "getrandom 0.2.11", "once_cell", "version_check", "zerocopy", @@ -23,6 +209,21 @@ dependencies = [ "memchr", ] +[[package]] +name = "alloc-no-stdlib" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3" + +[[package]] +name = "alloc-stdlib" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece" +dependencies = [ + "alloc-no-stdlib", +] + [[package]] name = "android-tzdata" version = "0.1.1" @@ -645,6 +846,7 @@ dependencies = [ name = "block-streamer" version = "0.1.0" dependencies = [ + "actix-web", "anyhow", "async-trait", "aws-config", @@ -655,8 +857,10 @@ dependencies = [ "chrono", "futures", "http", + "lazy_static", "mockall", "near-lake-framework", + "prometheus", "prost", "redis", "registry-types", @@ -741,6 +945,27 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "brotli" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "516074a47ef4bce09577a3b379392300159ce5b1ba2e501ff1c819950066100f" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", + "brotli-decompressor", +] + +[[package]] +name = "brotli-decompressor" +version = "2.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e2e4afe60d7dd600fdd3de8d0f08c2b7ec039712e3b6137ff98b7004e82de4f" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", +] + [[package]] name = "bs58" version = "0.4.0" @@ -784,6 +1009,15 @@ dependencies = [ "serde", ] +[[package]] +name = "bytestring" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74d80203ea6b29df88012294f62733de21cfeab47f17b41af3a38bc30a03ee72" +dependencies = [ + "bytes", +] + [[package]] name = "c2-chacha" version = "0.3.3" @@ -800,6 +1034,7 @@ version = "1.0.83" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0" dependencies = [ + "jobserver", "libc", ] @@ -865,6 +1100,17 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" +[[package]] +name = "cookie" +version = "0.16.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e859cd57d0710d9e06c381b550c06e76992472a8c6d527aecd2fc673dcc231fb" +dependencies = [ + "percent-encoding", + "time", + "version_check", +] + [[package]] name = "core-foundation" version = "0.9.3" @@ -1219,6 +1465,15 @@ dependencies = [ "zeroize", ] +[[package]] +name = "encoding_rs" +version = "0.8.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7268b386296a025e474d5140678f75d6de9493ae55a5d709eeb9dd08149945e1" +dependencies = [ + "cfg-if", +] + [[package]] name = "enum-map" version = "2.7.2" @@ -1298,6 +1553,16 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" +[[package]] +name = "flate2" +version = "1.0.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46303f565772937ffe1d394a4fac6f411c6013172fadde9dcdb1e147a086940e" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + [[package]] name = "float-cmp" version = "0.9.0" @@ -1472,9 +1737,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.3.22" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d6250322ef6e60f93f9a2162799302cd6f68f79f6e5d85c8c16f14d1d958178" +checksum = "bb2c4422095b67ee78da96fbb51a4cc413b3b25883c7717ff7ca1ab31022c9c9" dependencies = [ "bytes", "fnv", @@ -1600,7 +1865,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2", + "socket2 0.4.10", "tokio", "tower-service", "tracing", @@ -1711,6 +1976,15 @@ version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38" +[[package]] +name = "jobserver" +version = "0.1.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab46a6e9526ddef3ae7f787c06f0f2600639ba80ea3eade3d8e670a2230f51d6" +dependencies = [ + "libc", +] + [[package]] name = "js-sys" version = "0.3.65" @@ -1726,6 +2000,12 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9dbbfed4e59ba9750e15ba154fdfd9329cee16ff3df539c2666b70f58cc32105" +[[package]] +name = "language-tags" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4345964bb142484797b161f473a503a434de77149dd8c7427788c6e13379388" + [[package]] name = "lazy_static" version = "1.4.0" @@ -1744,6 +2024,23 @@ version = "0.4.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "969488b55f8ac402214f3f5fd243ebb7206cf82de60d3172994707a4bcc2b829" +[[package]] +name = "local-channel" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6cbc85e69b8df4b8bb8b89ec634e7189099cea8927a276b7384ce5488e53ec8" +dependencies = [ + "futures-core", + "futures-sink", + "local-waker", +] + +[[package]] +name = "local-waker" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d873d7c67ce09b42110d801813efbc9364414e356be9935700d368351657487" + [[package]] name = "lock_api" version = "0.4.11" @@ -1797,6 +2094,15 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "miniz_oxide" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d811f3e15f28568be3407c8e7fdb6514c1cda3cb30683f15b6a1a1dc4ea14a7" +dependencies = [ + "adler", +] + [[package]] name = "mio" version = "0.8.9" @@ -1804,6 +2110,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3dce281c5e46beae905d4de1870d8b1509a9142b62eedf18b443b011ca8343d0" dependencies = [ "libc", + "log", "wasi 0.11.0+wasi-snapshot-preview1", "windows-sys", ] @@ -2169,6 +2476,12 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "paste" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de3145af08024dea9fa9914f381a17b8fc6034dfb00f3a84013f7ff43f29ed4c" + [[package]] name = "percent-encoding" version = "2.3.1" @@ -2227,6 +2540,12 @@ dependencies = [ "spki", ] +[[package]] +name = "pkg-config" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" + [[package]] name = "powerfmt" version = "0.2.0" @@ -2351,6 +2670,21 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prometheus" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "449811d15fbdf5ceb5c1144416066429cf82316e2ec8ce0c1f6f8a02e7bbcf8c" +dependencies = [ + "cfg-if", + "fnv", + "lazy_static", + "memchr", + "parking_lot", + "protobuf", + "thiserror", +] + [[package]] name = "prost" version = "0.12.3" @@ -2405,6 +2739,12 @@ dependencies = [ "prost", ] +[[package]] +name = "protobuf" +version = "2.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" + [[package]] name = "quote" version = "1.0.33" @@ -2824,6 +3164,18 @@ dependencies = [ "syn 2.0.39", ] +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "serde_with" version = "3.4.0" @@ -2980,6 +3332,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "socket2" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9" +dependencies = [ + "libc", + "windows-sys", +] + [[package]] name = "spin" version = "0.9.8" @@ -3183,7 +3545,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2", + "socket2 0.4.10", "tokio-macros", "windows-sys", ] @@ -3348,6 +3710,7 @@ version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -3764,3 +4127,31 @@ dependencies = [ "quote", "syn 2.0.39", ] + +[[package]] +name = "zstd" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bffb3309596d527cfcba7dfc6ed6052f1d39dfbd7c867aa2e865e4a449c10110" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "7.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43747c7422e2924c11144d5229878b98180ef8b06cca4ab5af37afc8a8d8ea3e" +dependencies = [ + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "2.0.9+zstd.1.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e16efa8a874a0481a574084d34cc26fdb3b99627480f785888deb6386506656" +dependencies = [ + "cc", + "pkg-config", +] diff --git a/block-streamer/Cargo.toml b/block-streamer/Cargo.toml index 1d393b667..9805907a5 100644 --- a/block-streamer/Cargo.toml +++ b/block-streamer/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] +actix-web = "4.5.1" anyhow = "1.0.57" async-trait = "0.1.74" aws-config = { version = "1.0.0", features = ["behavior-version-latest"]} @@ -11,7 +12,9 @@ aws-sdk-s3 = "0.39.1" borsh = "0.10.2" chrono = "0.4.25" futures = "0.3.5" +lazy_static = "1.4.0" mockall = "0.11.4" +prometheus = "0.13.3" prost = "0.12.3" redis = { version = "0.21.5", features = ["tokio-comp", "connection-manager"] } serde = { version = "1", features = ["derive"] } diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index cc6cd24c9..bc4605c73 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -3,6 +3,7 @@ use near_lake_framework::near_indexer_primitives; use tokio::task::JoinHandle; use crate::indexer_config::IndexerConfig; +use crate::metrics; use crate::rules::types::ChainId; use registry_types::Rule; @@ -135,6 +136,10 @@ pub(crate) async fn start_block_stream( ) -> anyhow::Result<()> { tracing::info!("Starting block stream",); + metrics::PUBLISHED_BLOCKS_COUNT + .with_label_values(&[&indexer.get_full_name()]) + .reset(); + let last_indexed_delta_lake_block = process_delta_lake_blocks( start_block_height, delta_lake_client, @@ -219,19 +224,14 @@ async fn process_delta_lake_blocks( blocks_from_index.len(), ); - for block in &blocks_from_index { - let block = block.to_owned(); + for block_height in &blocks_from_index { + let block_height = block_height.to_owned(); redis_client - .xadd(redis_stream.clone(), &[("block_height".to_string(), block)]) - .await - .context("Failed to add block to Redis Stream")?; + .publish_block(indexer, redis_stream.clone(), block_height) + .await?; redis_client - .set( - format!("{}:last_published_block", indexer.get_full_name()), - block, - ) - .await - .context("Failed to set last_published_block")?; + .set_last_processed_block(indexer, block_height) + .await?; } let last_indexed_block = @@ -275,12 +275,8 @@ async fn process_near_lake_blocks( last_indexed_block = block_height; redis_client - .set( - format!("{}:last_published_block", indexer.get_full_name()), - last_indexed_block, - ) - .await - .context("Failed to set last_published_block")?; + .set_last_processed_block(indexer, block_height) + .await?; let matches = crate::rules::reduce_indexer_rule_matches( &indexer.rule, @@ -290,12 +286,8 @@ async fn process_near_lake_blocks( if !matches.is_empty() { redis_client - .xadd( - redis_stream.clone(), - &[("block_height".to_string(), block_height.to_owned())], - ) - .await - .context("Failed to add block to Redis Stream")?; + .publish_block(indexer, redis_stream.clone(), block_height) + .await?; } } @@ -330,15 +322,20 @@ mod tests { let mut mock_redis_client = crate::redis::RedisClient::default(); mock_redis_client - .expect_xadd::() - .with(predicate::eq("stream key".to_string()), predicate::always()) - .returning(|_, fields| { - assert!(vec![107503702, 107503703, 107503705].contains(&fields[0].1)); - Ok(()) - }) + .expect_publish_block() + .with( + predicate::always(), + predicate::eq("stream key".to_string()), + predicate::in_iter([107503702, 107503703, 107503705]), + ) + .returning(|_, _, _| Ok(())) .times(3); mock_redis_client - .expect_set::() + .expect_set_last_processed_block() + .with( + predicate::always(), + predicate::in_iter([107503702, 107503703, 107503704, 107503705]), + ) .returning(|_, _| Ok(())) .times(4); @@ -388,24 +385,9 @@ mod tests { .expect_list_matching_block_heights() .never(); - let mock_lake_s3_config = - crate::test_utils::create_mock_lake_s3_config(&[107503704, 107503705]); - let mut mock_redis_client = crate::redis::RedisClient::default(); - mock_redis_client - .expect_set::() - .returning(|_, fields| { - assert!(vec![107503704, 107503705].contains(&fields)); - Ok(()) - }) - .times(2); - mock_redis_client - .expect_xadd::() - .returning(|_, fields| { - assert!(vec![107503704, 107503705].contains(&fields[0].1)); - Ok(()) - }) - .times(2); + mock_redis_client.expect_publish_block().never(); + mock_redis_client.expect_set_last_processed_block().never(); let indexer_config = crate::indexer_config::IndexerConfig { account_id: near_indexer_primitives::types::AccountId::try_from( @@ -419,14 +401,11 @@ mod tests { }, }; - start_block_stream( + process_delta_lake_blocks( 107503704, - &indexer_config, - std::sync::Arc::new(mock_redis_client), std::sync::Arc::new(mock_delta_lake_client), - mock_lake_s3_config, - &ChainId::Mainnet, - 1, + std::sync::Arc::new(mock_redis_client), + &indexer_config, "stream key".to_string(), ) .await @@ -451,24 +430,9 @@ mod tests { .expect_list_matching_block_heights() .never(); - let mock_lake_s3_config = - crate::test_utils::create_mock_lake_s3_config(&[107503704, 107503705]); - let mut mock_redis_client = crate::redis::RedisClient::default(); - mock_redis_client - .expect_set::() - .returning(|_, fields| { - assert!(vec![107503704, 107503705].contains(&fields)); - Ok(()) - }) - .times(2); - mock_redis_client - .expect_xadd::() - .returning(|_, fields| { - assert!(vec![107503704, 107503705].contains(&fields[0].1)); - Ok(()) - }) - .times(2); + mock_redis_client.expect_publish_block().never(); + mock_redis_client.expect_set_last_processed_block().never(); let indexer_config = crate::indexer_config::IndexerConfig { account_id: near_indexer_primitives::types::AccountId::try_from( @@ -482,14 +446,11 @@ mod tests { }, }; - start_block_stream( + process_delta_lake_blocks( 107503704, - &indexer_config, - std::sync::Arc::new(mock_redis_client), std::sync::Arc::new(mock_delta_lake_client), - mock_lake_s3_config, - &ChainId::Mainnet, - 1, + std::sync::Arc::new(mock_redis_client), + &indexer_config, "stream key".to_string(), ) .await @@ -514,24 +475,9 @@ mod tests { .expect_list_matching_block_heights() .never(); - let mock_lake_s3_config = - crate::test_utils::create_mock_lake_s3_config(&[107503704, 107503705]); - let mut mock_redis_client = crate::redis::RedisClient::default(); - mock_redis_client - .expect_set::() - .returning(|_, fields| { - assert!(vec![107503704, 107503705].contains(&fields)); - Ok(()) - }) - .times(2); - mock_redis_client - .expect_xadd::() - .returning(|_, fields| { - assert!(vec![107503704, 107503705].contains(&fields[0].1)); - Ok(()) - }) - .times(2); + mock_redis_client.expect_publish_block().never(); + mock_redis_client.expect_set_last_processed_block().never(); let indexer_config = crate::indexer_config::IndexerConfig { account_id: near_indexer_primitives::types::AccountId::try_from( @@ -545,14 +491,11 @@ mod tests { }, }; - start_block_stream( + process_delta_lake_blocks( 107503704, - &indexer_config, - std::sync::Arc::new(mock_redis_client), std::sync::Arc::new(mock_delta_lake_client), - mock_lake_s3_config, - &ChainId::Mainnet, - 1, + std::sync::Arc::new(mock_redis_client), + &indexer_config, "stream key".to_string(), ) .await diff --git a/block-streamer/src/indexer_config.rs b/block-streamer/src/indexer_config.rs index f56eb5c54..bc55d1434 100644 --- a/block-streamer/src/indexer_config.rs +++ b/block-streamer/src/indexer_config.rs @@ -21,4 +21,9 @@ impl IndexerConfig { self.get_full_name().hash(&mut hasher); hasher.finish().to_string() } + + pub fn last_processed_block_key(&self) -> String { + // TODO: rename to `last_processed_block` + format!("{}:last_published_block", self.get_full_name()) + } } diff --git a/block-streamer/src/main.rs b/block-streamer/src/main.rs index 1d5a3ac2c..1ca9beca8 100644 --- a/block-streamer/src/main.rs +++ b/block-streamer/src/main.rs @@ -3,6 +3,7 @@ use tracing_subscriber::prelude::*; mod block_stream; mod delta_lake_client; mod indexer_config; +mod metrics; mod redis; mod rules; mod s3_client; @@ -15,26 +16,36 @@ mod test_utils; async fn main() -> anyhow::Result<()> { tracing_subscriber::registry() .with(tracing_subscriber::fmt::layer()) + .with(metrics::LogCounter) .with(tracing_subscriber::EnvFilter::from_default_env()) .init(); let redis_url = std::env::var("REDIS_URL").expect("REDIS_URL is not set"); - let server_port = std::env::var("SERVER_PORT").expect("SERVER_PORT is not set"); + let grpc_port = std::env::var("GRPC_PORT").expect("GRPC_PORT is not set"); + let metrics_port = std::env::var("METRICS_PORT") + .expect("METRICS_PORT is not set") + .parse() + .expect("METRICS_PORT is not a valid number"); + + tracing::info!( + redis_url, + grpc_port, + metrics_port, + "Starting Block Streamer" + ); - tracing::info!("Starting Block Streamer Service..."); - - tracing::info!("Connecting to Redis..."); let redis_client = std::sync::Arc::new(redis::RedisClient::connect(&redis_url).await?); let aws_config = aws_config::from_env().load().await; let s3_config = aws_sdk_s3::Config::from(&aws_config); let s3_client = crate::s3_client::S3Client::new(s3_config.clone()); - tracing::info!("Connecting to Delta Lake..."); let delta_lake_client = std::sync::Arc::new(crate::delta_lake_client::DeltaLakeClient::new(s3_client)); - server::init(&server_port, redis_client, delta_lake_client, s3_config).await?; + tokio::spawn(metrics::init_server(metrics_port).expect("Failed to start metrics server")); + + server::init(&grpc_port, redis_client, delta_lake_client, s3_config).await?; Ok(()) } diff --git a/block-streamer/src/metrics.rs b/block-streamer/src/metrics.rs new file mode 100644 index 000000000..0c691f17e --- /dev/null +++ b/block-streamer/src/metrics.rs @@ -0,0 +1,72 @@ +use actix_web::{get, App, HttpServer, Responder}; +use lazy_static::lazy_static; +use prometheus::{ + register_int_counter_vec, register_int_gauge_vec, Encoder, IntCounterVec, IntGaugeVec, +}; +use tracing_subscriber::layer::Context; +use tracing_subscriber::Layer; + +lazy_static! { + pub static ref LAST_PROCESSED_BLOCK: IntGaugeVec = register_int_gauge_vec!( + "queryapi_block_streamer_last_processed_block", + "Height of last block seen", + &["indexer"] + ) + .unwrap(); + pub static ref PROCESSED_BLOCKS_COUNT: IntCounterVec = register_int_counter_vec!( + "queryapi_block_streamer_processed_blocks_count", + "Number of blocks processed by block stream", + &["indexer"] + ) + .unwrap(); + pub static ref PUBLISHED_BLOCKS_COUNT: IntCounterVec = register_int_counter_vec!( + "queryapi_block_streamer_published_blocks_count", + "Number of blocks published to redis stream", + &["indexer"] + ) + .unwrap(); + pub static ref LOGS_COUNT: IntCounterVec = register_int_counter_vec!( + "queryapi_block_streamer_logs_count", + "Number of messages logged", + &["level"] + ) + .unwrap(); +} + +pub struct LogCounter; + +impl Layer for LogCounter +where + S: tracing::Subscriber, +{ + fn on_event(&self, event: &tracing::Event, _ctx: Context) { + LOGS_COUNT + .with_label_values(&[event.metadata().level().as_str()]) + .inc(); + } +} + +#[get("/metrics")] +async fn get_metrics() -> impl Responder { + let mut buffer = Vec::::new(); + let encoder = prometheus::TextEncoder::new(); + loop { + match encoder.encode(&prometheus::gather(), &mut buffer) { + Ok(_) => break, + Err(err) => { + tracing::error!("Error encoding metrics: {}", err); + } + } + } + String::from_utf8(buffer).unwrap() +} + +pub(crate) fn init_server(port: u16) -> anyhow::Result { + tracing::info!("Starting metrics server on 0.0.0.0:{port}"); + + Ok(HttpServer::new(|| App::new().service(get_metrics)) + .bind(("0.0.0.0", port))? + .disable_signals() + .workers(1) + .run()) +} diff --git a/block-streamer/src/redis.rs b/block-streamer/src/redis.rs index aee9fe667..0cd193644 100644 --- a/block-streamer/src/redis.rs +++ b/block-streamer/src/redis.rs @@ -2,8 +2,12 @@ use std::fmt::Debug; +use anyhow::Context; use redis::{aio::ConnectionManager, RedisError, ToRedisArgs}; +use crate::indexer_config::IndexerConfig; +use crate::metrics; + #[cfg(test)] pub use MockRedisClientImpl as RedisClient; #[cfg(not(test))] @@ -56,4 +60,44 @@ impl RedisClientImpl { Ok(()) } + + pub async fn set_last_processed_block( + &self, + indexer_config: &IndexerConfig, + height: u64, + ) -> anyhow::Result<()> { + let indexer = indexer_config.get_full_name(); + metrics::PROCESSED_BLOCKS_COUNT + .with_label_values(&[&indexer]) + .inc(); + metrics::LAST_PROCESSED_BLOCK + .with_label_values(&[&indexer]) + .set( + height + .try_into() + .context("Failed to convert block height (u64) to metrics type (i64)")?, + ); + + self.set(indexer_config.last_processed_block_key(), height) + .await + .context("Failed to set last processed block") + } + + pub async fn publish_block( + &self, + indexer: &IndexerConfig, + stream: String, + block_height: u64, + ) -> anyhow::Result<()> { + metrics::PUBLISHED_BLOCKS_COUNT + .with_label_values(&[&indexer.get_full_name()]) + .inc(); + + self.xadd( + stream.clone(), + &[(String::from("block_height"), block_height)], + ) + .await + .context("Failed to add block to Redis Stream") + } } diff --git a/block-streamer/src/server/mod.rs b/block-streamer/src/server/mod.rs index 10099b3f3..c3699bb98 100644 --- a/block-streamer/src/server/mod.rs +++ b/block-streamer/src/server/mod.rs @@ -12,7 +12,7 @@ pub async fn init( ) -> anyhow::Result<()> { let addr = format!("0.0.0.0:{}", port).parse()?; - tracing::info!("Starting RPC server at {}", addr); + tracing::info!("Starting gRPC server on {}", addr); let block_streamer_service = block_streamer_service::BlockStreamerService::new( redis_client,