From adaadfc3b2c94ad7bfdf54cd7d9d887b0b55899a Mon Sep 17 00:00:00 2001
From: Morgan McCauley
Date: Fri, 28 Jun 2024 08:48:50 +1200
Subject: [PATCH] fix: Add metrics for reliably measuring Block Stream/Executor
 health (#843)

The current methods for determining both Block Stream and Executor health are
flawed. This PR addresses those flaws by adding new, more reliable metrics for
use within Grafana.

### Block Streams

A Block Stream is currently considered healthy if `LAST_PROCESSED_BLOCK` is
continuously incremented, i.e. we are continuously downloading blocks from S3.
This is flawed for the following reasons:
1. When the Redis Stream is full, we halt the Block Stream, preventing it from
   processing more blocks
2. When a Block Stream is intentionally stopped, we no longer process blocks

To address these flaws, I've introduced a new dedicated metric,
`BLOCK_STREAM_UP` (a condensed sketch of the approach is included at the end of
this description), which:
- is incremented every time the Block Stream future is polled, i.e. whenever
  the task is doing work, so a static value means unhealthy
- is removed when the Block Stream is stopped, so that it doesn't trigger the
  false positive described above

### Executors

An Executor is currently considered unhealthy if it has messages in the Redis
Stream but no reported execution durations, the latter only being recorded on
success; the inverse of this is used to determine "healthy". This is flawed for
the following reasons:
1. We can't distinguish between a genuinely broken Indexer and one broken due
   to system failures
2. "Health" is only determined when there are messages in Redis, meaning we
   catch issues later than we could

To address these, I have added the following metrics:
1. `EXECUTOR_UP`, which is incremented on every Executor loop; as above, a
   static value means unhealthy
2. `SUCCESSFUL_EXECUTIONS`/`FAILED_EXECUTIONS`, which track successful/failed
   executions directly, rather than inferring them from durations. This will
   also be useful for tracking the health of specific Indexers, e.g. the
   `staking` indexer should never have failed executions.
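For reference, a minimal, self-contained sketch of the poll-counting technique
behind `BLOCK_STREAM_UP`, condensed from the `PollCounter` wrapper added in
this PR. It assumes the `prometheus`, `lazy_static`, `pin-project`, and `tokio`
crates already used by the block streamer; the `main` function, the stand-in
`async { 42 }` future, and the `example.near/example_indexer` label are
illustrative only:

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use prometheus::{register_int_counter_vec, IntCounterVec};

lazy_static::lazy_static! {
    // Same metric shape as the one registered in block-streamer/src/metrics.rs.
    static ref BLOCK_STREAM_UP: IntCounterVec = register_int_counter_vec!(
        "queryapi_block_streamer_block_stream_up",
        "A continuously increasing counter to indicate the block stream is up",
        &["indexer"]
    )
    .unwrap();
}

// Future wrapper: every poll bumps the per-indexer counter, so a flat counter
// means the task is no longer being driven by the runtime.
#[pin_project::pin_project]
struct PollCounter<F> {
    #[pin]
    inner: F,
    indexer_name: String,
}

impl<F: Future> Future for PollCounter<F> {
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        BLOCK_STREAM_UP
            .with_label_values(&[&self.indexer_name])
            .inc();

        // Delegate to the wrapped future via the pin projection.
        self.project().inner.poll(cx)
    }
}

#[tokio::main]
async fn main() {
    // Wrap any future; in the PR this is the `start_block_stream` future.
    // "example.near/example_indexer" is a hypothetical indexer name.
    let wrapped = PollCounter {
        inner: async { 42 },
        indexer_name: "example.near/example_indexer".to_string(),
    };

    assert_eq!(wrapped.await, 42);
    assert!(
        BLOCK_STREAM_UP
            .with_label_values(&["example.near/example_indexer"])
            .get()
            >= 1
    );
}
```

Because the counter is bumped inside `poll` rather than in the block-processing
loop, it keeps increasing as long as the task is still being polled, even while
the stream is intentionally back-pressured (e.g. when the Redis Stream is
full), which is what lets us tell a paused-but-alive stream apart from a stuck
one.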
---
 block-streamer/Cargo.lock                        |  1 +
 block-streamer/Cargo.toml                        |  1 +
 block-streamer/src/block_stream.rs               | 96 ++++++++++++++-----
 block-streamer/src/metrics.rs                    |  6 ++
 .../src/server/block_streamer_service.rs         | 16 +++-
 coordinator/Cargo.lock                           |  9 +-
 runner/src/metrics.ts                            | 23 ++++-
 runner/src/stream-handler/worker.ts              |  4 +
 8 files changed, 120 insertions(+), 36 deletions(-)

diff --git a/block-streamer/Cargo.lock b/block-streamer/Cargo.lock
index ce9cb859d..26a1e34ff 100644
--- a/block-streamer/Cargo.lock
+++ b/block-streamer/Cargo.lock
@@ -980,6 +980,7 @@ dependencies = [
  "lazy_static",
  "mockall",
  "near-lake-framework",
+ "pin-project",
  "prometheus",
  "prost 0.12.4",
  "redis",
diff --git a/block-streamer/Cargo.toml b/block-streamer/Cargo.toml
index f4becc111..d7d4a42f5 100644
--- a/block-streamer/Cargo.toml
+++ b/block-streamer/Cargo.toml
@@ -19,6 +19,7 @@ graphql_client = { version = "0.14.0", features = ["reqwest"] }
 lazy_static = "1.4.0"
 mockall = "0.11.4"
 near-lake-framework = "0.7.8"
+pin-project = "1.1.5"
 prometheus = "0.13.3"
 prost = "0.12.3"
 redis = { version = "0.21.5", features = ["tokio-comp", "connection-manager"] }
diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs
index 4dbb6b0e7..6698881a9 100644
--- a/block-streamer/src/block_stream.rs
+++ b/block-streamer/src/block_stream.rs
@@ -1,3 +1,7 @@
+use std::future::Future;
+use std::pin::Pin;
+use std::task::Poll;
+
 use anyhow::Context;
 use near_lake_framework::near_indexer_primitives;
 use tokio::task::JoinHandle;
@@ -14,6 +18,35 @@ const MAX_STREAM_SIZE_WITH_CACHE: u64 = 100;
 const DELTA_LAKE_SKIP_ACCOUNTS: [&str; 4] = ["*", "*.near", "*.kaiching", "*.tg"];
 const MAX_STREAM_SIZE: u64 = 100;
 
+#[pin_project::pin_project]
+pub struct PollCounter<F> {
+    #[pin]
+    inner: F,
+    indexer_name: String,
+}
+
+impl<F> PollCounter<F> {
+    pub fn new(inner: F, indexer_name: String) -> Self {
+        Self {
+            inner,
+            indexer_name,
+        }
+    }
+}
+
+impl<F: Future> Future for PollCounter<F> {
+    type Output = F::Output;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
+        metrics::BLOCK_STREAM_UP
+            .with_label_values(&[&self.indexer_name])
+            .inc();
+
+        let this = self.project();
+        this.inner.poll(cx)
+    }
+}
+
 pub struct Task {
     handle: JoinHandle<anyhow::Result<()>>,
     cancellation_token: tokio_util::sync::CancellationToken,
@@ -55,24 +88,15 @@ impl BlockStream {
         }
 
         let cancellation_token = tokio_util::sync::CancellationToken::new();
-        let cancellation_token_clone = cancellation_token.clone();
-
-        let indexer_config = self.indexer_config.clone();
-        let chain_id = self.chain_id.clone();
-        let redis_stream = self.redis_stream.clone();
-
-        let handle = tokio::spawn(async move {
-            tokio::select! {
-                _ = cancellation_token_clone.cancelled() => {
-                    tracing::info!(
-                        account_id = indexer_config.account_id.as_str(),
-                        function_name = indexer_config.function_name,
-                        "Cancelling block stream task",
-                    );
-
-                    Ok(())
-                },
-                result = start_block_stream(
+
+        let handle = tokio::spawn({
+            let cancellation_token = cancellation_token.clone();
+            let indexer_config = self.indexer_config.clone();
+            let chain_id = self.chain_id.clone();
+            let redis_stream = self.redis_stream.clone();
+
+            async move {
+                let block_stream_future = start_block_stream(
                     start_block_height,
                     &indexer_config,
                     redis,
@@ -80,17 +104,33 @@ impl BlockStream {
                     lake_s3_client,
                     &chain_id,
                     LAKE_PREFETCH_SIZE,
-                    redis_stream
-                ) => {
-                    result.map_err(|err| {
-                        tracing::error!(
+                    redis_stream,
+                );
+
+                let block_stream_future =
+                    PollCounter::new(block_stream_future, indexer_config.get_full_name());
+
+                tokio::select! {
+                    _ = cancellation_token.cancelled() => {
+                        tracing::info!(
                             account_id = indexer_config.account_id.as_str(),
                             function_name = indexer_config.function_name,
-                            "Block stream task stopped due to error: {:?}",
-                            err,
+                            "Cancelling block stream task",
                         );
-                        err
-                    })
+
+                        Ok(())
+                    },
+                    result = block_stream_future => {
+                        result.map_err(|err| {
+                            tracing::error!(
+                                account_id = indexer_config.account_id.as_str(),
+                                function_name = indexer_config.function_name,
+                                "Block stream task stopped due to error: {:?}",
+                                err,
+                            );
+                            err
+                        })
+                    }
                 }
             }
        });
@@ -108,6 +148,10 @@ impl BlockStream {
             task.cancellation_token.cancel();
             let _ = task.handle.await?;
 
+            // Fails if metric doesn't exist, i.e. task was never polled
+            let _ = metrics::BLOCK_STREAM_UP
+                .remove_label_values(&[&self.indexer_config.get_full_name()]);
+
             return Ok(());
         }
diff --git a/block-streamer/src/metrics.rs b/block-streamer/src/metrics.rs
index 31c4f84c2..1f01a071e 100644
--- a/block-streamer/src/metrics.rs
+++ b/block-streamer/src/metrics.rs
@@ -57,6 +57,12 @@ lazy_static! {
         &["level"]
     )
     .unwrap();
+    pub static ref BLOCK_STREAM_UP: IntCounterVec = register_int_counter_vec!(
+        "queryapi_block_streamer_block_stream_up",
+        "A continuously increasing counter to indicate the block stream is up",
+        &["indexer"]
+    )
+    .unwrap();
 }
 
 pub struct LogCounter;
diff --git a/block-streamer/src/server/block_streamer_service.rs b/block-streamer/src/server/block_streamer_service.rs
index 4c7d2c69d..6a9037047 100644
--- a/block-streamer/src/server/block_streamer_service.rs
+++ b/block-streamer/src/server/block_streamer_service.rs
@@ -58,6 +58,7 @@ impl BlockStreamerService {
 #[tonic::async_trait]
 impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerService {
+    #[tracing::instrument(skip(self))]
     async fn start_stream(
         &self,
         request: Request<StartStreamRequest>,
     ) -> Result<Response<StartStreamResponse>, Status> {
@@ -117,7 +118,11 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic
             self.delta_lake_client.clone(),
             self.lake_s3_client.clone(),
         )
-        .map_err(|_| Status::internal("Failed to start block stream"))?;
+        .map_err(|err| {
+            tracing::error!(?err, "Failed to start block stream");
+
+            Status::internal("Failed to start block stream")
+        })?;
 
         let mut lock = self.get_block_streams_lock()?;
         lock.insert(indexer_config.get_hash_id(), block_stream);
@@ -127,6 +132,7 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic
         }))
     }
 
+    #[tracing::instrument(skip(self))]
     async fn stop_stream(
         &self,
         request: Request<StopStreamRequest>,
     ) -> Result<Response<StopStreamResponse>, Status> {
@@ -148,10 +154,10 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic
             )))
         }
         Some(mut block_stream) => {
-            block_stream
-                .cancel()
-                .await
-                .map_err(|_| Status::internal("Failed to cancel block stream"))?;
+            block_stream.cancel().await.map_err(|err| {
+                tracing::error!(?err, "Failed to cancel block stream");
+                Status::internal("Failed to cancel block stream")
+            })?;
         }
     }
diff --git a/coordinator/Cargo.lock b/coordinator/Cargo.lock
index 5b118095c..0e4e5e24f 100644
--- a/coordinator/Cargo.lock
+++ b/coordinator/Cargo.lock
@@ -939,6 +939,7 @@ dependencies = [
  "lazy_static",
  "mockall",
  "near-lake-framework",
+ "pin-project",
  "prometheus",
  "prost 0.12.3",
  "redis 0.21.7",
@@ -3133,18 +3134,18 @@ dependencies = [
 
 [[package]]
 name = "pin-project"
-version = "1.1.3"
+version = "1.1.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fda4ed1c6c173e3fc7a83629421152e01d7b1f9b7f65fb301e490e8cfc656422"
+checksum = "b6bf43b791c5b9e34c3d182969b4abb522f9343702850a2e57f460d00d09b4b3"
 dependencies = [
  "pin-project-internal",
 ]
 
 [[package]]
 name = "pin-project-internal"
-version = "1.1.3"
+version = "1.1.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405"
+checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965"
 dependencies = [
  "proc-macro2",
  "quote",
diff --git a/runner/src/metrics.ts b/runner/src/metrics.ts
index fd99d88e9..493da4119 100644
--- a/runner/src/metrics.ts
+++ b/runner/src/metrics.ts
@@ -63,6 +63,24 @@ const LOGS_COUNT = new Counter({
   labelNames: ['level'],
 });
 
+const EXECUTOR_UP = new Counter({
+  name: 'queryapi_runner_executor_up',
+  help: 'Incremented each time the executor loop runs to indicate whether the job is functional',
+  labelNames: ['indexer'],
+});
+
+const SUCCESSFUL_EXECUTIONS = new Counter({
+  name: 'queryapi_runner_successful_executions',
+  help: 'Count of successful executions of an indexer function',
+  labelNames: ['indexer'],
+});
+
+const FAILED_EXECUTIONS = new Counter({
+  name: 'queryapi_runner_failed_executions',
+  help: 'Count of failed executions of an indexer function',
+  labelNames: ['indexer'],
+});
+
 export const METRICS = {
   HEAP_TOTAL_ALLOCATION,
   HEAP_USED,
@@ -73,7 +91,10 @@ export const METRICS = {
   UNPROCESSED_STREAM_MESSAGES,
   LAST_PROCESSED_BLOCK_HEIGHT,
   EXECUTION_DURATION,
-  LOGS_COUNT
+  LOGS_COUNT,
+  EXECUTOR_UP,
+  SUCCESSFUL_EXECUTIONS,
+  FAILED_EXECUTIONS,
 };
 
 const aggregatorRegistry = new AggregatorRegistry();
diff --git a/runner/src/stream-handler/worker.ts b/runner/src/stream-handler/worker.ts
index 6be274822..6c0759bb9 100644
--- a/runner/src/stream-handler/worker.ts
+++ b/runner/src/stream-handler/worker.ts
@@ -101,6 +101,8 @@ async function blockQueueConsumer (workerContext: WorkerContext): Promise<void> {
   let currBlockHeight = 0;
 
   while (true) {
+    METRICS.EXECUTOR_UP.labels({ indexer: indexerConfig.fullName() }).inc();
+
     if (workerContext.queue.length === 0) {
       await sleep(100);
       continue;
@@ -150,9 +152,11 @@ async function blockQueueConsumer (workerContext: WorkerContext): Promise<void> {
 
       executionDurationTimer();
 
+      METRICS.SUCCESSFUL_EXECUTIONS.labels({ indexer: indexerConfig.fullName() }).inc();
       METRICS.LAST_PROCESSED_BLOCK_HEIGHT.labels({ indexer: indexerConfig.fullName() }).set(currBlockHeight);
       postRunSpan.end();
     } catch (err) {
+      METRICS.FAILED_EXECUTIONS.labels({ indexer: indexerConfig.fullName() }).inc();
       parentSpan.setAttribute('status', 'failed');
       parentPort?.postMessage({ type: WorkerMessageType.STATUS, data: { status: IndexerStatus.FAILING } });
       const error = err as Error;