diff --git a/block-streamer/Cargo.lock b/block-streamer/Cargo.lock
index ce9cb859d..26a1e34ff 100644
--- a/block-streamer/Cargo.lock
+++ b/block-streamer/Cargo.lock
@@ -980,6 +980,7 @@ dependencies = [
  "lazy_static",
  "mockall",
  "near-lake-framework",
+ "pin-project",
  "prometheus",
  "prost 0.12.4",
  "redis",
diff --git a/block-streamer/Cargo.toml b/block-streamer/Cargo.toml
index f4becc111..d7d4a42f5 100644
--- a/block-streamer/Cargo.toml
+++ b/block-streamer/Cargo.toml
@@ -19,6 +19,7 @@ graphql_client = { version = "0.14.0", features = ["reqwest"] }
 lazy_static = "1.4.0"
 mockall = "0.11.4"
 near-lake-framework = "0.7.8"
+pin-project = "1.1.5"
 prometheus = "0.13.3"
 prost = "0.12.3"
 redis = { version = "0.21.5", features = ["tokio-comp", "connection-manager"] }
diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs
index 4dbb6b0e7..6698881a9 100644
--- a/block-streamer/src/block_stream.rs
+++ b/block-streamer/src/block_stream.rs
@@ -1,3 +1,7 @@
+use std::future::Future;
+use std::pin::Pin;
+use std::task::Poll;
+
 use anyhow::Context;
 use near_lake_framework::near_indexer_primitives;
 use tokio::task::JoinHandle;
@@ -14,6 +18,35 @@ const MAX_STREAM_SIZE_WITH_CACHE: u64 = 100;
 const DELTA_LAKE_SKIP_ACCOUNTS: [&str; 4] = ["*", "*.near", "*.kaiching", "*.tg"];
 const MAX_STREAM_SIZE: u64 = 100;
 
+#[pin_project::pin_project]
+pub struct PollCounter<F> {
+    #[pin]
+    inner: F,
+    indexer_name: String,
+}
+
+impl<F> PollCounter<F> {
+    pub fn new(inner: F, indexer_name: String) -> Self {
+        Self {
+            inner,
+            indexer_name,
+        }
+    }
+}
+
+impl<F: Future> Future for PollCounter<F> {
+    type Output = F::Output;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
+        metrics::BLOCK_STREAM_UP
+            .with_label_values(&[&self.indexer_name])
+            .inc();
+
+        let this = self.project();
+        this.inner.poll(cx)
+    }
+}
+
 pub struct Task {
     handle: JoinHandle<anyhow::Result<()>>,
     cancellation_token: tokio_util::sync::CancellationToken,
@@ -55,24 +88,15 @@ impl BlockStream {
         }
 
         let cancellation_token = tokio_util::sync::CancellationToken::new();
-        let cancellation_token_clone = cancellation_token.clone();
-
-        let indexer_config = self.indexer_config.clone();
-        let chain_id = self.chain_id.clone();
-        let redis_stream = self.redis_stream.clone();
-
-        let handle = tokio::spawn(async move {
-            tokio::select! {
-                _ = cancellation_token_clone.cancelled() => {
-                    tracing::info!(
-                        account_id = indexer_config.account_id.as_str(),
-                        function_name = indexer_config.function_name,
-                        "Cancelling block stream task",
-                    );
-
-                    Ok(())
-                },
-                result = start_block_stream(
+
+        let handle = tokio::spawn({
+            let cancellation_token = cancellation_token.clone();
+            let indexer_config = self.indexer_config.clone();
+            let chain_id = self.chain_id.clone();
+            let redis_stream = self.redis_stream.clone();
+
+            async move {
+                let block_stream_future = start_block_stream(
                     start_block_height,
                     &indexer_config,
                     redis,
@@ -80,17 +104,33 @@ impl BlockStream {
                     lake_s3_client,
                     &chain_id,
                     LAKE_PREFETCH_SIZE,
-                    redis_stream
-                ) => {
-                    result.map_err(|err| {
-                        tracing::error!(
+                    redis_stream,
+                );
+
+                let block_stream_future =
+                    PollCounter::new(block_stream_future, indexer_config.get_full_name());
+
+                tokio::select! {
+                    _ = cancellation_token.cancelled() => {
+                        tracing::info!(
                             account_id = indexer_config.account_id.as_str(),
                             function_name = indexer_config.function_name,
-                            "Block stream task stopped due to error: {:?}",
-                            err,
+                            "Cancelling block stream task",
                         );
-                        err
-                    })
+
+                        Ok(())
+                    },
+                    result = block_stream_future => {
+                        result.map_err(|err| {
+                            tracing::error!(
+                                account_id = indexer_config.account_id.as_str(),
+                                function_name = indexer_config.function_name,
+                                "Block stream task stopped due to error: {:?}",
+                                err,
+                            );
+                            err
+                        })
+                    }
                 }
             }
         });
@@ -108,6 +148,10 @@ impl BlockStream {
         task.cancellation_token.cancel();
         let _ = task.handle.await?;
 
+        // Fails if metric doesn't exist, i.e. task was never polled
+        let _ = metrics::BLOCK_STREAM_UP
+            .remove_label_values(&[&self.indexer_config.get_full_name()]);
+
         return Ok(());
     }
diff --git a/block-streamer/src/metrics.rs b/block-streamer/src/metrics.rs
index 31c4f84c2..1f01a071e 100644
--- a/block-streamer/src/metrics.rs
+++ b/block-streamer/src/metrics.rs
@@ -57,6 +57,12 @@ lazy_static! {
         &["level"]
     )
     .unwrap();
+    pub static ref BLOCK_STREAM_UP: IntCounterVec = register_int_counter_vec!(
+        "queryapi_block_streamer_block_stream_up",
+        "A continuously increasing counter to indicate the block stream is up",
+        &["indexer"]
+    )
+    .unwrap();
 }
 
 pub struct LogCounter;
diff --git a/block-streamer/src/server/block_streamer_service.rs b/block-streamer/src/server/block_streamer_service.rs
index 4c7d2c69d..6a9037047 100644
--- a/block-streamer/src/server/block_streamer_service.rs
+++ b/block-streamer/src/server/block_streamer_service.rs
@@ -58,6 +58,7 @@ impl BlockStreamerService {
 
 #[tonic::async_trait]
 impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerService {
+    #[tracing::instrument(skip(self))]
     async fn start_stream(
         &self,
         request: Request<StartStreamRequest>,
@@ -117,7 +118,11 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic
             self.delta_lake_client.clone(),
             self.lake_s3_client.clone(),
         )
-        .map_err(|_| Status::internal("Failed to start block stream"))?;
+        .map_err(|err| {
+            tracing::error!(?err, "Failed to start block stream");
+
+            Status::internal("Failed to start block stream")
+        })?;
 
         let mut lock = self.get_block_streams_lock()?;
         lock.insert(indexer_config.get_hash_id(), block_stream);
@@ -127,6 +132,7 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic
         }))
     }
 
+    #[tracing::instrument(skip(self))]
     async fn stop_stream(
         &self,
         request: Request<StopStreamRequest>,
@@ -148,10 +154,10 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic
                 )))
             }
             Some(mut block_stream) => {
-                block_stream
-                    .cancel()
-                    .await
-                    .map_err(|_| Status::internal("Failed to cancel block stream"))?;
+                block_stream.cancel().await.map_err(|err| {
+                    tracing::error!(?err, "Failed to cancel block stream");
+                    Status::internal("Failed to cancel block stream")
+                })?;
             }
         }
 
diff --git a/coordinator/Cargo.lock b/coordinator/Cargo.lock
index 5b118095c..0e4e5e24f 100644
--- a/coordinator/Cargo.lock
+++ b/coordinator/Cargo.lock
@@ -939,6 +939,7 @@ dependencies = [
  "lazy_static",
  "mockall",
  "near-lake-framework",
+ "pin-project",
  "prometheus",
  "prost 0.12.3",
  "redis 0.21.7",
@@ -3133,18 +3134,18 @@ dependencies = [
 
 [[package]]
 name = "pin-project"
-version = "1.1.3"
+version = "1.1.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fda4ed1c6c173e3fc7a83629421152e01d7b1f9b7f65fb301e490e8cfc656422"
+checksum = "b6bf43b791c5b9e34c3d182969b4abb522f9343702850a2e57f460d00d09b4b3"
 dependencies = [
  "pin-project-internal",
 ]
 
 [[package]]
 name = "pin-project-internal"
-version = "1.1.3"
+version = "1.1.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405"
+checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965"
 dependencies = [
  "proc-macro2",
  "quote",
diff --git a/runner/src/metrics.ts b/runner/src/metrics.ts
index fd99d88e9..493da4119 100644
--- a/runner/src/metrics.ts
+++ b/runner/src/metrics.ts
@@ -63,6 +63,24 @@ const LOGS_COUNT = new Counter({
   labelNames: ['level'],
 });
 
+const EXECUTOR_UP = new Counter({
+  name: 'queryapi_runner_executor_up',
+  help: 'Incremented each time the executor loop runs to indicate whether the job is functional',
+  labelNames: ['indexer'],
+});
+
+const SUCCESSFUL_EXECUTIONS = new Counter({
+  name: 'queryapi_runner_successful_executions',
+  help: 'Count of successful executions of an indexer function',
+  labelNames: ['indexer'],
+});
+
+const FAILED_EXECUTIONS = new Counter({
+  name: 'queryapi_runner_failed_executions',
+  help: 'Count of failed executions of an indexer function',
+  labelNames: ['indexer'],
+});
+
 export const METRICS = {
   HEAP_TOTAL_ALLOCATION,
   HEAP_USED,
@@ -73,7 +91,10 @@ export const METRICS = {
   UNPROCESSED_STREAM_MESSAGES,
   LAST_PROCESSED_BLOCK_HEIGHT,
   EXECUTION_DURATION,
-  LOGS_COUNT
+  LOGS_COUNT,
+  EXECUTOR_UP,
+  SUCCESSFUL_EXECUTIONS,
+  FAILED_EXECUTIONS,
 };
 
 const aggregatorRegistry = new AggregatorRegistry();
diff --git a/runner/src/stream-handler/worker.ts b/runner/src/stream-handler/worker.ts
index 6be274822..6c0759bb9 100644
--- a/runner/src/stream-handler/worker.ts
+++ b/runner/src/stream-handler/worker.ts
@@ -101,6 +101,8 @@ async function blockQueueConsumer (workerContext: WorkerContext): Promise<void>
   let currBlockHeight = 0;
 
   while (true) {
+    METRICS.EXECUTOR_UP.labels({ indexer: indexerConfig.fullName() }).inc();
+
     if (workerContext.queue.length === 0) {
       await sleep(100);
       continue;
@@ -150,9 +152,11 @@ async function blockQueueConsumer (workerContext: WorkerContext): Promise<void>
 
       executionDurationTimer();
 
+      METRICS.SUCCESSFUL_EXECUTIONS.labels({ indexer: indexerConfig.fullName() }).inc();
       METRICS.LAST_PROCESSED_BLOCK_HEIGHT.labels({ indexer: indexerConfig.fullName() }).set(currBlockHeight);
       postRunSpan.end();
     } catch (err) {
+      METRICS.FAILED_EXECUTIONS.labels({ indexer: indexerConfig.fullName() }).inc();
       parentSpan.setAttribute('status', 'failed');
       parentPort?.postMessage({ type: WorkerMessageType.STATUS, data: { status: IndexerStatus.FAILING } });
       const error = err as Error;