diff --git a/block-streamer/proto/block_streamer.proto b/block-streamer/proto/block_streamer.proto
index 92d3a2c90..fd24b7972 100644
--- a/block-streamer/proto/block_streamer.proto
+++ b/block-streamer/proto/block_streamer.proto
@@ -108,4 +108,26 @@ message StreamInfo {
   string function_name = 4;
   // Block height corresponding to the created/updated height of the indexer
   uint64 version = 5;
+  // Contains health information for the Block Stream
+  Health health = 6;
+}
+
+// Contains health information for the Block Stream
+message Health {
+  // The processing state of the block stream
+  ProcessingState processing_state = 1;
+  // When the health info was last updated
+  uint64 updated_at_timestamp_secs = 2;
+}
+
+enum ProcessingState {
+  UNSPECIFIED = 0;
+  // Not started, or has been stopped
+  IDLE = 1;
+  // Running as expected
+  RUNNING = 2;
+  // Waiting for some internal condition to be met before continuing
+  WAITING = 3;
+  // Stopped due to some unknown error
+  STALLED = 4;
 }
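Before the implementation below, a quick illustration of what the new field enables: a gRPC client can now distinguish a healthy stream from a stalled one. This is a hypothetical sketch; the import path, request shape, and camelCase field names are assumptions, written in the style of the `runner/examples` scripts further down.

```ts
// Hypothetical client sketch -- the import path and GetStream request shape
// are assumptions mirroring the runner/examples scripts below.
import blockStreamerClient from '../src/server/block-streamer-client';

blockStreamerClient.GetStream({
  accountId: 'example.near',
  functionName: 'example_indexer',
}, (err, response) => {
  if (err) {
    console.error('GetStream error: ', err);
  } else {
    // `health` is optional on the wire; an older server simply won't set it.
    console.log('processing state: ', response?.health?.processingState ?? 'not reported');
    console.log('updated at (secs): ', response?.health?.updatedAtTimestampSecs);
  }
});
```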
diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs
index 04cd824a6..bf19038b5 100644
--- a/block-streamer/src/block_stream.rs
+++ b/block-streamer/src/block_stream.rs
@@ -1,7 +1,9 @@
+use std::cmp::Ordering;
 use std::future::Future;
 use std::pin::Pin;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 use std::task::Poll;
+use std::time::SystemTime;
 
 use anyhow::Context;
 use futures::StreamExt;
@@ -52,16 +54,43 @@ impl Future for PollCounter {
 }
 
 pub struct Task {
-    handle: JoinHandle<anyhow::Result<()>>,
+    stream_handle: JoinHandle<anyhow::Result<()>>,
+    monitor_handle: JoinHandle<()>,
     cancellation_token: tokio_util::sync::CancellationToken,
 }
 
+/// Represents the processing state of a block stream
+#[derive(Clone)]
+pub enum ProcessingState {
+    /// Block Stream is not currently active but can be started. Either has not been started or was
+    /// stopped.
+    Idle,
+
+    /// Block Stream is actively processing blocks.
+    Running,
+
+    /// Block Stream has been intentionally/internally paused due to some condition, i.e. back pressure on
+    /// the Redis Stream.
+    Waiting,
+
+    /// Block Stream has stalled due to an error or other condition. Must be manually
+    /// restarted.
+    Stalled,
+}
+
+#[derive(Clone)]
+pub struct BlockStreamHealth {
+    pub processing_state: ProcessingState,
+    pub last_updated: SystemTime,
+}
+
 pub struct BlockStream {
     task: Option<Task>,
     pub indexer_config: IndexerConfig,
     pub chain_id: ChainId,
     pub version: u64,
     pub redis_stream: String,
+    health: Arc<Mutex<BlockStreamHealth>>,
 }
 
 impl BlockStream {
@@ -77,23 +106,107 @@ impl BlockStream {
             chain_id,
             version,
             redis_stream,
+            health: Arc::new(Mutex::new(BlockStreamHealth {
+                processing_state: ProcessingState::Idle,
+                last_updated: SystemTime::now(),
+            })),
         }
     }
 
-    pub fn start(
-        &mut self,
+    pub fn health(&self) -> anyhow::Result<BlockStreamHealth> {
+        match self.health.lock() {
+            Ok(health) => Ok(health.clone()),
+            Err(e) => Err(anyhow::anyhow!("Failed to acquire health lock: {:?}", e)),
+        }
+    }
+
+    fn start_health_monitoring_task(&self, redis: Arc<crate::redis::RedisClient>) -> JoinHandle<()> {
+        tokio::spawn({
+            let config = self.indexer_config.clone();
+            let health = self.health.clone();
+            let redis_stream = self.redis_stream.clone();
+
+            async move {
+                let mut last_processed_block =
+                    redis.get_last_processed_block(&config).await.unwrap();
+
+                loop {
+                    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
+
+                    let new_last_processed_block =
+                        if let Ok(block) = redis.get_last_processed_block(&config).await {
+                            block
+                        } else {
+                            tracing::warn!(
+                                account_id = config.account_id.as_str(),
+                                function_name = config.function_name,
+                                "Failed to fetch last processed block"
+                            );
+                            continue;
+                        };
+
+                    let stream_size = if let Ok(stream_size) =
+                        redis.get_stream_length(redis_stream.clone()).await
+                    {
+                        stream_size
+                    } else {
+                        tracing::warn!(
+                            account_id = config.account_id.as_str(),
+                            function_name = config.function_name,
+                            "Failed to fetch stream size"
+                        );
+                        continue;
+                    };
+
+                    let mut health_lock = if let Ok(health) = health.lock() {
+                        health
+                    } else {
+                        tracing::warn!(
+                            account_id = config.account_id.as_str(),
+                            function_name = config.function_name,
+                            "Failed to acquire health lock"
+                        );
+                        continue;
+                    };
+
+                    match new_last_processed_block.cmp(&last_processed_block) {
+                        Ordering::Less => {
+                            tracing::error!(
+                                account_id = config.account_id.as_str(),
+                                function_name = config.function_name,
+                                "Last processed block should not decrease"
+                            );
+
+                            health_lock.processing_state = ProcessingState::Stalled;
+                        }
+                        Ordering::Equal if stream_size >= Some(MAX_STREAM_SIZE) => {
+                            health_lock.processing_state = ProcessingState::Waiting;
+                        }
+                        Ordering::Equal => {
+                            health_lock.processing_state = ProcessingState::Stalled;
+                        }
+                        Ordering::Greater => {
+                            health_lock.processing_state = ProcessingState::Running;
+                        }
+                    };
+
+                    health_lock.last_updated = SystemTime::now();
+
+                    last_processed_block = new_last_processed_block;
+                }
+            }
+        })
+    }
+
+    fn start_block_stream_task(
+        &self,
         start_block_height: near_indexer_primitives::types::BlockHeight,
         redis: Arc<crate::redis::RedisClient>,
         reciever_blocks_processor: Arc<crate::receiver_blocks::ReceiverBlocksProcessor>,
         lake_s3_client: SharedLakeS3Client,
-    ) -> anyhow::Result<()> {
-        if self.task.is_some() {
-            return Err(anyhow::anyhow!("BlockStreamer has already been started",));
-        }
-
-        let cancellation_token = tokio_util::sync::CancellationToken::new();
-
-        let handle = tokio::spawn({
+        cancellation_token: tokio_util::sync::CancellationToken,
+    ) -> JoinHandle<anyhow::Result<()>> {
+        tokio::spawn({
             let cancellation_token = cancellation_token.clone();
             let indexer_config = self.indexer_config.clone();
             let chain_id = self.chain_id.clone();
@@ -137,10 +250,35 @@
                     }
                 }
             }
-        });
+        })
+    }
+
+    pub fn start(
+        &mut self,
+        start_block_height: near_indexer_primitives::types::BlockHeight,
+        redis: Arc<crate::redis::RedisClient>,
+        reciever_blocks_processor: Arc<crate::receiver_blocks::ReceiverBlocksProcessor>,
+        lake_s3_client: SharedLakeS3Client,
+    ) -> anyhow::Result<()> {
+        if self.task.is_some() {
+            return Err(anyhow::anyhow!("BlockStreamer has already been started",));
+        }
+
+        let cancellation_token = tokio_util::sync::CancellationToken::new();
+
+        let monitor_handle = self.start_health_monitoring_task(redis.clone());
+
+        let stream_handle = self.start_block_stream_task(
+            start_block_height,
+            redis,
+            reciever_blocks_processor,
+            lake_s3_client,
+            cancellation_token.clone(),
+        );
 
         self.task = Some(Task {
-            handle,
+            stream_handle,
+            monitor_handle,
             cancellation_token,
         });
@@ -149,8 +287,9 @@
 
     pub async fn cancel(&mut self) -> anyhow::Result<()> {
         if let Some(task) = self.task.take() {
+            task.monitor_handle.abort();
             task.cancellation_token.cancel();
-            let _ = task.handle.await?;
+            let _ = task.stream_handle.await?;
 
             // Fails if metric doesn't exist, i.e. task was never polled
             let _ = metrics::BLOCK_STREAM_UP
@@ -167,6 +306,7 @@
 
 #[allow(clippy::too_many_arguments)]
 #[tracing::instrument(
+    name = "block_stream",
     skip_all,
     fields(
         account_id = indexer.account_id.as_str(),
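In prose, the monitor added above samples the last processed block every 5 seconds and derives a state from the delta: a decreasing height is treated as a bug (Stalled), no progress while the Redis stream is saturated is back pressure (Waiting), no progress with a non-full stream is Stalled, and any forward progress is Running. A pure-function model of that decision table, in TypeScript for illustration (the real `MAX_STREAM_SIZE` constant lives in the Rust crate; its value here is an assumption):

```ts
type ProcessingState = 'RUNNING' | 'WAITING' | 'STALLED';

// Assumed stand-in for the Rust MAX_STREAM_SIZE constant.
const MAX_STREAM_SIZE = 100;

function nextProcessingState (
  previousHeight: number,
  currentHeight: number,
  streamSize: number
): ProcessingState {
  if (currentHeight < previousHeight) return 'STALLED'; // heights must never regress
  if (currentHeight === previousHeight) {
    // No progress: a full stream means intentional back pressure, otherwise stuck.
    return streamSize >= MAX_STREAM_SIZE ? 'WAITING' : 'STALLED';
  }
  return 'RUNNING'; // made progress
}

console.log(nextProcessingState(100, 101, 10)); // RUNNING
console.log(nextProcessingState(100, 100, 100)); // WAITING: back pressure
console.log(nextProcessingState(100, 100, 10)); // STALLED: no progress, stream not full
```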
diff --git a/block-streamer/src/redis.rs b/block-streamer/src/redis.rs
index 959b5ddc9..1a84d34c6 100644
--- a/block-streamer/src/redis.rs
+++ b/block-streamer/src/redis.rs
@@ -28,6 +28,18 @@ impl RedisCommandsImpl {
         Ok(Self { connection })
     }
 
+    pub async fn get<T>(&self, key: T) -> Result<Option<u64>, RedisError>
+    where
+        T: ToRedisArgs + Debug + Send + Sync + 'static,
+    {
+        tracing::debug!("GET: {:?}", key);
+
+        redis::cmd("GET")
+            .arg(key)
+            .query_async(&mut self.connection.clone())
+            .await
+    }
+
     pub async fn xadd<T, U>(&self, stream_key: T, fields: &[(String, U)]) -> Result<(), RedisError>
     where
         T: ToRedisArgs + Debug + Send + Sync + 'static,
@@ -133,6 +145,16 @@ impl RedisClientImpl {
             .context("Failed to set last processed block")
     }
 
+    pub async fn get_last_processed_block(
+        &self,
+        indexer_config: &IndexerConfig,
+    ) -> anyhow::Result<Option<u64>> {
+        self.commands
+            .get(indexer_config.last_processed_block_key())
+            .await
+            .context("Failed to get last processed block")
+    }
+
     pub async fn get_stream_length(&self, stream: String) -> anyhow::Result<Option<u64>> {
         self.commands.xlen(stream).await
     }
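For debugging, the value the new `get_last_processed_block` wrapper reads can also be inspected directly. A minimal sketch using the Node `redis` package; the key format produced by `last_processed_block_key()` is not shown in this diff, so the key below is a placeholder:

```ts
import { createClient } from 'redis';

// Placeholder -- substitute whatever IndexerConfig::last_processed_block_key()
// actually produces for your indexer.
const LAST_PROCESSED_BLOCK_KEY = '<account_id>/<function_name>:last_processed_block';

void (async function main () {
  const client = createClient({ url: 'redis://127.0.0.1:6379' });
  await client.connect();

  const height = await client.get(LAST_PROCESSED_BLOCK_KEY);
  console.log('last processed block: ', height);

  await client.disconnect();
})();
```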
diff --git a/block-streamer/src/server/block_streamer_service.rs b/block-streamer/src/server/block_streamer_service.rs
index 88d426b9c..4dd3f01ae 100644
--- a/block-streamer/src/server/block_streamer_service.rs
+++ b/block-streamer/src/server/block_streamer_service.rs
@@ -1,5 +1,6 @@
 use std::collections::HashMap;
 use std::sync::Mutex;
+use std::time::SystemTime;
 
 use near_lake_framework::near_indexer_primitives;
 use tonic::{Request, Response, Status};
@@ -21,6 +22,30 @@ pub struct BlockStreamerService {
     block_streams: Mutex<HashMap<String, BlockStream>>,
 }
 
+impl From<block_stream::BlockStreamHealth> for blockstreamer::Health {
+    fn from(health: block_stream::BlockStreamHealth) -> Self {
+        blockstreamer::Health {
+            processing_state: match health.processing_state {
+                block_stream::ProcessingState::Running => {
+                    blockstreamer::ProcessingState::Running as i32
+                }
+                block_stream::ProcessingState::Idle => blockstreamer::ProcessingState::Idle as i32,
+                block_stream::ProcessingState::Stalled => {
+                    blockstreamer::ProcessingState::Stalled as i32
+                }
+                block_stream::ProcessingState::Waiting => {
+                    blockstreamer::ProcessingState::Waiting as i32
+                }
+            },
+            updated_at_timestamp_secs: health
+                .last_updated
+                .duration_since(SystemTime::UNIX_EPOCH)
+                .unwrap()
+                .as_secs(),
+        }
+    }
+}
+
 impl BlockStreamerService {
     pub fn new(
         redis: std::sync::Arc<crate::redis::RedisClient>,
@@ -77,11 +102,17 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic
             });
 
         if let Some((stream_id, stream)) = stream_entry {
+            let stream_health = stream.health().map_err(|err| {
+                tracing::error!(?err, "Failed to get health of block stream");
+                Status::internal("Failed to get health of block stream")
+            })?;
+
             Ok(Response::new(StreamInfo {
                 stream_id: stream_id.to_string(),
                 account_id: stream.indexer_config.account_id.to_string(),
                 function_name: stream.indexer_config.function_name.to_string(),
                 version: stream.version,
+                health: Some(stream_health.into()),
             }))
         } else {
             Err(Status::not_found(format!(
@@ -210,11 +241,22 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic
 
         let block_streams: Vec<StreamInfo> = lock
             .values()
-            .map(|block_stream| StreamInfo {
-                stream_id: block_stream.indexer_config.get_hash_id(),
-                account_id: block_stream.indexer_config.account_id.to_string(),
-                function_name: block_stream.indexer_config.function_name.clone(),
-                version: block_stream.version,
+            .map(|block_stream| {
+                let stream_health = block_stream
+                    .health()
+                    .map_err(|err| {
+                        tracing::error!(?err, "Failed to get health of block stream");
+                        Status::internal("Failed to get health of block stream")
+                    })
+                    .ok();
+
+                StreamInfo {
+                    stream_id: block_stream.indexer_config.get_hash_id(),
+                    account_id: block_stream.indexer_config.account_id.to_string(),
+                    function_name: block_stream.indexer_config.function_name.to_string(),
+                    version: block_stream.version,
+                    health: stream_health.map(|health| health.into()),
+                }
             })
             .collect();
diff --git a/runner-client/proto/runner.proto b/runner-client/proto/runner.proto
index 51045f8c5..39801e657 100644
--- a/runner-client/proto/runner.proto
+++ b/runner-client/proto/runner.proto
@@ -62,7 +62,26 @@ message ExecutorInfo {
   string executor_id = 1;
   string account_id = 2;
   string function_name = 3;
-  string status = 4;
   // Block height corresponding to the created/updated height of the indexer
   uint64 version = 5;
+  Health health = 6;
+}
+
+// Contains health information for the Executor
+message Health {
+  ExecutionState execution_state = 1;
+}
+
+enum ExecutionState {
+  UNSPECIFIED = 0;
+  // Running as expected
+  RUNNING = 1;
+  // Executor is running, but the execution is erroring
+  FAILING = 2;
+  // Waiting for some internal condition to be met before proceeding
+  WAITING = 3;
+  // Intentionally stopped
+  STOPPED = 4;
+  // Unintentionally stopped
+  STALLED = 5;
 }
diff --git a/runner/examples/list-executors.ts b/runner/examples/list-executors.ts
index b5200b7e2..396208405 100644
--- a/runner/examples/list-executors.ts
+++ b/runner/examples/list-executors.ts
@@ -1,13 +1,13 @@
 // Run with 'npx ts-node src/test-client.ts'
 
-import runnerClient from '../src/server/runner-client';
+import runnerClient from '../src/server/services/runner/runner-client';
 
 void (async function main () {
   runnerClient.ListExecutors({}, (err, response) => {
     if (err) {
       console.error('List request error: ', err);
     } else {
-      console.log('Successful ListExecutors request: ', response);
+      console.log('list response: ', JSON.stringify({ response }, null, 2));
     }
   });
 })();
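With the proto changes above, listings now nest the executor state under `health` rather than the removed flat `status` string. For orientation, an illustrative response shape as the updated example would print it (all values are invented placeholders, not captured output):

```ts
// Illustrative shape only -- IDs and names are invented placeholders.
const exampleResponse = {
  executors: [
    {
      executorId: '<hash of the indexer>',
      accountId: 'example.near',
      functionName: 'example_indexer',
      version: '1',
      health: {
        executionState: 'RUNNING',
      },
    },
  ],
};

console.log('list response: ', JSON.stringify({ response: exampleResponse }, null, 2));
```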
diff --git a/runner/examples/start-executor.ts b/runner/examples/start-executor.ts
index d9466d1e9..9b573daef 100644
--- a/runner/examples/start-executor.ts
+++ b/runner/examples/start-executor.ts
@@ -1,6 +1,6 @@
 // Run with 'npx ts-node src/test-client.ts'
 
-import runnerClient from '../src/server/runner-client';
+import runnerClient from '../src/server/services/runner/runner-client';
 
 const schema = `
 CREATE TABLE
@@ -13,7 +13,7 @@ CREATE TABLE
 `;
 
 const code = `
-console.log("hello");
+// do nothing
 `;
 
 const indexer = {
@@ -35,7 +35,7 @@ void (async function main () {
     if (err) {
       console.error('error: ', err);
     } else {
-      console.log('start request: ', response);
+      console.log('start response: ', JSON.stringify({ response }, null, 2));
     }
   });
 })();
diff --git a/runner/examples/stop-executor.ts b/runner/examples/stop-executor.ts
index 03466c997..ff7eef1bf 100644
--- a/runner/examples/stop-executor.ts
+++ b/runner/examples/stop-executor.ts
@@ -1,13 +1,13 @@
 // Run with 'npx ts-node src/test-client.ts'
 
-import runnerClient from '../src/server/runner-client';
+import runnerClient from '../src/server/services/runner/runner-client';
 
 runnerClient.StopExecutor({
-  executorId: 'SOME_EXECUTOR_ID'
+  executorId: '0293a6b1dcd2259a8be6b59a8cd3e7b4285e540a64a7cbe99639947f7b7e2f9a'
 }, (err, response) => {
   if (err) {
     console.error('error: ', err);
   } else {
-    console.log('stop request: ', response);
+    console.log('stop response: ', JSON.stringify({ response }, null, 2));
   }
 });
diff --git a/runner/protos/runner.proto b/runner/protos/runner.proto
index 51045f8c5..39801e657 100644
--- a/runner/protos/runner.proto
+++ b/runner/protos/runner.proto
@@ -62,7 +62,26 @@ message ExecutorInfo {
   string executor_id = 1;
   string account_id = 2;
   string function_name = 3;
-  string status = 4;
   // Block height corresponding to the created/updated height of the indexer
   uint64 version = 5;
+  Health health = 6;
+}
+
+// Contains health information for the Executor
+message Health {
+  ExecutionState execution_state = 1;
+}
+
+enum ExecutionState {
+  UNSPECIFIED = 0;
+  // Running as expected
+  RUNNING = 1;
+  // Executor is running, but the execution is erroring
+  FAILING = 2;
+  // Waiting for some internal condition to be met before proceeding
+  WAITING = 3;
+  // Intentionally stopped
+  STOPPED = 4;
+  // Unintentionally stopped
+  STALLED = 5;
 }
diff --git a/runner/src/server/services/runner/runner-service.test.ts b/runner/src/server/services/runner/runner-service.test.ts
index f7a9c6dc1..57ff6de65 100644
--- a/runner/src/server/services/runner/runner-service.test.ts
+++ b/runner/src/server/services/runner/runner-service.test.ts
@@ -1,10 +1,10 @@
 import * as grpc from '@grpc/grpc-js';
 
 import type StreamHandler from '../../../stream-handler/stream-handler';
-import { IndexerStatus } from '../../../indexer-meta/indexer-meta';
 import { LogLevel } from '../../../indexer-meta/log-entry';
 import getRunnerService from './runner-service';
 import IndexerConfig from '../../../indexer-config/indexer-config';
+import { ExecutionState } from '../../../generated/runner/ExecutionState';
 
 const BASIC_REDIS_STREAM = 'test-redis-stream';
 const BASIC_ACCOUNT_ID = 'test-account-id';
@@ -15,7 +15,7 @@ const BASIC_CODE = 'test-code';
 const BASIC_SCHEMA = 'test-schema';
 const BASIC_VERSION = 1;
 const BASIC_EXECUTOR_CONTEXT = {
-  status: IndexerStatus.RUNNING,
+  executionState: ExecutionState.RUNNING,
 };
 
 describe('Runner gRPC Service', () => {
@@ -77,8 +77,10 @@ describe('Runner gRPC Service', () => {
         executorId: BASIC_EXECUTOR_ID,
         accountId: genericIndexerConfig.accountId,
         functionName: genericIndexerConfig.functionName,
-        status: IndexerStatus.RUNNING,
-        version: '1'
+        version: '1',
+        health: {
+          executionState: 'RUNNING'
+        }
       });
       resolve(null);
     });
@@ -288,8 +290,10 @@ describe('Runner gRPC Service', () => {
         executorId: BASIC_EXECUTOR_ID,
         accountId: genericIndexerConfig.accountId,
         functionName: genericIndexerConfig.functionName,
-        status: IndexerStatus.RUNNING,
-        version: '1'
+        version: '1',
+        health: {
+          executionState: 'RUNNING'
+        }
       }]
     });
     resolve(null);
diff --git a/runner/src/server/services/runner/runner-service.ts b/runner/src/server/services/runner/runner-service.ts
index 0d135574f..3b45856e9 100644
--- a/runner/src/server/services/runner/runner-service.ts
+++ b/runner/src/server/services/runner/runner-service.ts
@@ -32,7 +32,9 @@ export function getRunnerService (
         accountId: executor.indexerConfig.accountId,
         functionName: executor.indexerConfig.functionName,
         version: executor.indexerConfig.version.toString(),
-        status: executor.executorContext.status
+        health: {
+          executionState: executor.executorContext.executionState,
+        }
       });
     } else {
       const notFoundError = {
@@ -150,7 +152,9 @@ export function getRunnerService (
         accountId: indexerConfig.accountId,
         functionName: indexerConfig.functionName,
         version: indexerConfig.version.toString(),
-        status: indexerContext.status
+        health: {
+          executionState: indexerContext.executionState,
+        }
       });
     });
     callback(null, {
diff --git a/runner/src/stream-handler/stream-handler.ts b/runner/src/stream-handler/stream-handler.ts
index c6a07ce6c..f1b24b4fb 100644
--- a/runner/src/stream-handler/stream-handler.ts
+++ b/runner/src/stream-handler/stream-handler.ts
@@ -3,7 +3,6 @@ import { Worker, isMainThread } from 'worker_threads';
 
 import { registerWorkerMetrics, deregisterWorkerMetrics } from '../metrics';
 import Indexer from '../indexer';
-import { IndexerStatus } from '../indexer-meta/indexer-meta';
 import LogEntry from '../indexer-meta/log-entry';
 import logger from '../logger';
 
@@ -12,7 +11,7 @@ import type IndexerConfig from '../indexer-config';
 export enum WorkerMessageType {
   METRICS = 'METRICS',
   BLOCK_HEIGHT = 'BLOCK_HEIGHT',
-  STATUS = 'STATUS',
+  EXECUTION_STATE = 'STATUS',
 }
 
 export interface WorkerMessage {
@@ -20,8 +19,16 @@
   type: WorkerMessageType
   data: any
 }
 
+export enum ExecutionState {
+  RUNNING = 'RUNNING',
+  FAILING = 'FAILING',
+  WAITING = 'WAITING',
+  STOPPED = 'STOPPED',
+  STALLED = 'STALLED',
+}
+
 interface ExecutorContext {
-  status: IndexerStatus
+  executionState: ExecutionState
   block_height: number
 }
@@ -42,7 +49,7 @@ export default class StreamHandler {
       },
     });
     this.executorContext = {
-      status: IndexerStatus.RUNNING,
+      executionState: ExecutionState.RUNNING,
       block_height: indexerConfig.version,
     };
 
@@ -56,12 +63,14 @@ export default class StreamHandler {
   async stop (): Promise<void> {
     deregisterWorkerMetrics(this.worker.threadId);
 
+    this.executorContext.executionState = ExecutionState.STOPPED;
+
     await this.worker.terminate();
   }
 
   private handleError (error: Error): void {
     this.logger.error('Terminating thread', error);
-    this.executorContext.status = IndexerStatus.STOPPED;
+    this.executorContext.executionState = ExecutionState.STALLED;
 
     const indexer = new Indexer(this.indexerConfig);
     indexer.setStoppedStatus().catch((e) => {
@@ -82,8 +91,8 @@ export default class StreamHandler {
 
   private handleMessage (message: WorkerMessage): void {
     switch (message.type) {
-      case WorkerMessageType.STATUS:
-        this.executorContext.status = message.data.status;
+      case WorkerMessageType.EXECUTION_STATE:
+        this.executorContext.executionState = message.data.state;
         break;
       case WorkerMessageType.BLOCK_HEIGHT:
        this.executorContext.block_height = message.data;
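The stream-handler changes above make the worker the source of truth for RUNNING, FAILING, and WAITING, while the parent thread alone sets STOPPED (graceful `stop()`) and STALLED (worker crash routed through `handleError()`). A minimal sketch of the message contract from the worker side, assuming the same import path; the `{ state }` payload shape matches what `handleMessage()` destructures:

```ts
import { parentPort } from 'worker_threads';
import { WorkerMessageType, ExecutionState, type WorkerMessage } from './stream-handler';

// Helper a worker might use to report its execution state to the parent thread.
function reportExecutionState (state: ExecutionState): void {
  const message: WorkerMessage = {
    type: WorkerMessageType.EXECUTION_STATE,
    data: { state },
  };
  parentPort?.postMessage(message);
}

// e.g. before idling on an empty queue:
// reportExecutionState(ExecutionState.WAITING);
```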
diff --git a/runner/src/stream-handler/worker.ts b/runner/src/stream-handler/worker.ts
index d9e77184d..fb1a897ce 100644
--- a/runner/src/stream-handler/worker.ts
+++ b/runner/src/stream-handler/worker.ts
@@ -7,9 +7,8 @@ import Indexer from '../indexer';
 import RedisClient from '../redis-client';
 import { METRICS } from '../metrics';
 import LakeClient from '../lake-client';
-import { WorkerMessageType, type WorkerMessage } from './stream-handler';
+import { WorkerMessageType, type WorkerMessage, ExecutionState } from './stream-handler';
 import setUpTracerExport from '../instrumentation';
-import { IndexerStatus } from '../indexer-meta/indexer-meta';
 import IndexerConfig from '../indexer-config';
 import parentLogger from '../logger';
 import { wrapSpan } from '../utility';
@@ -119,6 +118,7 @@ async function blockQueueConsumer (workerContext: WorkerContext): Promise<void>
       metricsSpan.end();
 
       if (workerContext.queue.length === 0) {
+        parentPort?.postMessage({ type: WorkerMessageType.EXECUTION_STATE, data: { state: ExecutionState.WAITING } });
         await sleep(100);
         continue;
       }
@@ -162,7 +162,7 @@
       });
 
       const postRunSpan = tracer.startSpan('Delete redis message and shift queue', {}, context.active());
-      parentPort?.postMessage({ type: WorkerMessageType.STATUS, data: { status: IndexerStatus.RUNNING } });
+      parentPort?.postMessage({ type: WorkerMessageType.EXECUTION_STATE, data: { state: ExecutionState.RUNNING } });
       await workerContext.redisClient.deleteStreamMessage(indexerConfig.redisStreamKey, streamMessageId);
       await workerContext.queue.shift();
 
@@ -174,7 +174,7 @@
     } catch (err) {
       METRICS.FAILED_EXECUTIONS.labels({ indexer: indexerConfig.fullName() }).inc();
       parentSpan.setAttribute('status', 'failed');
-      parentPort?.postMessage({ type: WorkerMessageType.STATUS, data: { status: IndexerStatus.FAILING } });
+      parentPort?.postMessage({ type: WorkerMessageType.EXECUTION_STATE, data: { state: ExecutionState.FAILING } });
       const error = err as Error;
       if (previousError !== error.message) {
         previousError = error.message;
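Finally, a usage sketch in the style of the examples above: poll `ListExecutors` until a given executor reports RUNNING. The executor ID, polling interval, and the `executors` response field name are assumptions based on the proto shown earlier:

```ts
import runnerClient from '../src/server/services/runner/runner-client';

const EXECUTOR_ID = 'SOME_EXECUTOR_ID'; // placeholder

function getExecutionState (executorId: string): Promise<string | undefined> {
  return new Promise((resolve, reject) => {
    runnerClient.ListExecutors({}, (err, response) => {
      if (err) { reject(err); return; }
      const executor = (response?.executors ?? []).find((e) => e.executorId === executorId);
      resolve(executor?.health?.executionState?.toString());
    });
  });
}

void (async function main () {
  for (let i = 0; i < 30; i++) {
    const state = await getExecutionState(EXECUTOR_ID);
    if (state === 'RUNNING') {
      console.log('executor is healthy');
      return;
    }
    console.log('current state: ', state ?? 'unknown');
    await new Promise((resolve) => setTimeout(resolve, 1000));
  }
  console.error('executor did not reach RUNNING');
})();
```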