From ffc7ff267eada6975e56c23132d50360e6470416 Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Wed, 17 Jul 2024 13:29:52 +1200 Subject: [PATCH 01/12] feat: Use less confusing name for `start_block_stream` span --- block-streamer/src/block_stream.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index 04cd824a..d5ddddd2 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -167,6 +167,7 @@ impl BlockStream { #[allow(clippy::too_many_arguments)] #[tracing::instrument( + name = "block_stream" skip_all, fields( account_id = indexer.account_id.as_str(), From e28db69e2150d250ee1ffdf68f74d1108a1c5755 Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Wed, 17 Jul 2024 15:57:49 +1200 Subject: [PATCH 02/12] feat: Monitor and expose block stream processing state --- block-streamer/src/block_stream.rs | 124 ++++++++++++++++++++++++++--- block-streamer/src/redis.rs | 22 +++++ 2 files changed, 134 insertions(+), 12 deletions(-) diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index d5ddddd2..62b51d1e 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -1,6 +1,7 @@ +use std::cmp::Ordering; use std::future::Future; use std::pin::Pin; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::task::Poll; use anyhow::Context; @@ -56,12 +57,37 @@ pub struct Task { cancellation_token: tokio_util::sync::CancellationToken, } +/// Represents the processing state of a block stream +#[derive(Clone)] +pub enum ProcessingState { + /// Block Stream is not currently active but can be started. Either has not been started or was + /// stopped. + Idle, + + /// Block Stream is actively processing blocks. + Active, + + /// Block Stream has been intentionally/internally paused due to some condition, i.e. back pressure on + /// the Redis Stream. + Paused, + + /// Block Stream has been halted due to an error or other condition. Must be manually + /// restarted. + Halted, +} + +#[derive(Clone)] +pub struct BlockStreamHealth { + pub processing_state: ProcessingState, +} + pub struct BlockStream { task: Option, pub indexer_config: IndexerConfig, pub chain_id: ChainId, pub version: u64, pub redis_stream: String, + health: Arc>, } impl BlockStream { @@ -77,23 +103,73 @@ impl BlockStream { chain_id, version, redis_stream, + health: Arc::new(Mutex::new(BlockStreamHealth { + processing_state: ProcessingState::Idle, + })), } } - pub fn start( - &mut self, + pub fn health(&self) -> anyhow::Result { + match self.health.lock() { + Ok(health) => Ok(health.clone()), + Err(e) => Err(anyhow::anyhow!("Failed to acquire health lock: {:?}", e)), + } + } + + fn start_health_monitoring_task(&self, redis: Arc) { + tokio::spawn({ + let config = self.indexer_config.clone(); + let health = self.health.clone(); + let redis_stream = self.redis_stream.clone(); + + async move { + let mut last_processed_block = + redis.get_last_processed_block(&config).await.unwrap(); + + loop { + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + + let new_last_processed_block = + redis.get_last_processed_block(&config).await.unwrap(); + + let stream_size = redis.get_stream_length(redis_stream.clone()).await.unwrap(); + + match new_last_processed_block.cmp(&last_processed_block) { + Ordering::Less => { + tracing::error!( + account_id = config.account_id.as_str(), + function_name = config.function_name, + "Last processed block decreased" + ); + + health.lock().unwrap().processing_state = ProcessingState::Halted; + } + Ordering::Equal if stream_size == Some(MAX_STREAM_SIZE) => { + health.lock().unwrap().processing_state = ProcessingState::Paused; + } + Ordering::Equal => { + health.lock().unwrap().processing_state = ProcessingState::Halted; + } + Ordering::Greater => { + health.lock().unwrap().processing_state = ProcessingState::Active; + } + }; + + last_processed_block = new_last_processed_block; + } + } + }); + } + + fn start_block_stream_task( + &self, start_block_height: near_indexer_primitives::types::BlockHeight, redis: Arc, reciever_blocks_processor: Arc, lake_s3_client: SharedLakeS3Client, - ) -> anyhow::Result<()> { - if self.task.is_some() { - return Err(anyhow::anyhow!("BlockStreamer has already been started",)); - } - - let cancellation_token = tokio_util::sync::CancellationToken::new(); - - let handle = tokio::spawn({ + cancellation_token: tokio_util::sync::CancellationToken, + ) -> JoinHandle> { + tokio::spawn({ let cancellation_token = cancellation_token.clone(); let indexer_config = self.indexer_config.clone(); let chain_id = self.chain_id.clone(); @@ -137,7 +213,31 @@ impl BlockStream { } } } - }); + }) + } + + pub fn start( + &mut self, + start_block_height: near_indexer_primitives::types::BlockHeight, + redis: Arc, + reciever_blocks_processor: Arc, + lake_s3_client: SharedLakeS3Client, + ) -> anyhow::Result<()> { + if self.task.is_some() { + return Err(anyhow::anyhow!("BlockStreamer has already been started",)); + } + + let cancellation_token = tokio_util::sync::CancellationToken::new(); + + self.start_health_monitoring_task(redis.clone()); + + let handle = self.start_block_stream_task( + start_block_height, + redis, + reciever_blocks_processor, + lake_s3_client, + cancellation_token.clone(), + ); self.task = Some(Task { handle, diff --git a/block-streamer/src/redis.rs b/block-streamer/src/redis.rs index 959b5ddc..1a84d34c 100644 --- a/block-streamer/src/redis.rs +++ b/block-streamer/src/redis.rs @@ -28,6 +28,18 @@ impl RedisCommandsImpl { Ok(Self { connection }) } + pub async fn get(&self, key: T) -> Result, RedisError> + where + T: ToRedisArgs + Debug + Send + Sync + 'static, + { + tracing::debug!("GET: {:?}", key); + + redis::cmd("GET") + .arg(key) + .query_async(&mut self.connection.clone()) + .await + } + pub async fn xadd(&self, stream_key: T, fields: &[(String, U)]) -> Result<(), RedisError> where T: ToRedisArgs + Debug + Send + Sync + 'static, @@ -133,6 +145,16 @@ impl RedisClientImpl { .context("Failed to set last processed block") } + pub async fn get_last_processed_block( + &self, + indexer_config: &IndexerConfig, + ) -> anyhow::Result> { + self.commands + .get(indexer_config.last_processed_block_key()) + .await + .context("Failed to set last processed block") + } + pub async fn get_stream_length(&self, stream: String) -> anyhow::Result> { self.commands.xlen(stream).await } From ea86d617227660420e531e63b93d16c471a45f88 Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Wed, 17 Jul 2024 16:10:10 +1200 Subject: [PATCH 03/12] feat: Expose block stream health via rpc --- block-streamer/proto/block_streamer.proto | 14 ++++++ .../src/server/block_streamer_service.rs | 46 +++++++++++++++++-- 2 files changed, 55 insertions(+), 5 deletions(-) diff --git a/block-streamer/proto/block_streamer.proto b/block-streamer/proto/block_streamer.proto index 92d3a2c9..fdec9c71 100644 --- a/block-streamer/proto/block_streamer.proto +++ b/block-streamer/proto/block_streamer.proto @@ -108,4 +108,18 @@ message StreamInfo { string function_name = 4; // Block height corresponding to the created/updated height of the indexer uint64 version = 5; + // Health + Health health = 6; +} + +message Health { + ProcessingState processing_state = 1; +} + +enum ProcessingState { + UNSPECIFIED = 0; + IDLE = 1; + ACTIVE = 2; + PAUSED = 3; + HALTED = 4; } diff --git a/block-streamer/src/server/block_streamer_service.rs b/block-streamer/src/server/block_streamer_service.rs index 88d426b9..c1e79d02 100644 --- a/block-streamer/src/server/block_streamer_service.rs +++ b/block-streamer/src/server/block_streamer_service.rs @@ -21,6 +21,25 @@ pub struct BlockStreamerService { block_streams: Mutex>, } +impl From for blockstreamer::Health { + fn from(health: block_stream::BlockStreamHealth) -> Self { + blockstreamer::Health { + processing_state: match health.processing_state { + block_stream::ProcessingState::Active => { + blockstreamer::ProcessingState::Active as i32 + } + block_stream::ProcessingState::Idle => blockstreamer::ProcessingState::Idle as i32, + block_stream::ProcessingState::Halted => { + blockstreamer::ProcessingState::Halted as i32 + } + block_stream::ProcessingState::Paused => { + blockstreamer::ProcessingState::Paused as i32 + } + }, + } + } +} + impl BlockStreamerService { pub fn new( redis: std::sync::Arc, @@ -77,11 +96,17 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic }); if let Some((stream_id, stream)) = stream_entry { + let stream_health = stream.health().map_err(|err| { + tracing::error!(?err, "Failed to get health of block stream"); + Status::internal("Failed to get health of block stream") + })?; + Ok(Response::new(StreamInfo { stream_id: stream_id.to_string(), account_id: stream.indexer_config.account_id.to_string(), function_name: stream.indexer_config.function_name.to_string(), version: stream.version, + health: Some(stream_health.into()), })) } else { Err(Status::not_found(format!( @@ -210,11 +235,22 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic let block_streams: Vec = lock .values() - .map(|block_stream| StreamInfo { - stream_id: block_stream.indexer_config.get_hash_id(), - account_id: block_stream.indexer_config.account_id.to_string(), - function_name: block_stream.indexer_config.function_name.clone(), - version: block_stream.version, + .map(|block_stream| { + let stream_health = block_stream + .health() + .map_err(|err| { + tracing::error!(?err, "Failed to get health of block stream"); + Status::internal("Failed to get health of block stream") + }) + .ok(); + + StreamInfo { + stream_id: block_stream.indexer_config.get_hash_id(), + account_id: block_stream.indexer_config.account_id.to_string(), + function_name: block_stream.indexer_config.function_name.to_string(), + version: block_stream.version, + health: stream_health.map(|health| health.into()), + } }) .collect(); From 1ec7bd3f5c8ab98494d9eadc224147ccc3f64a54 Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Wed, 17 Jul 2024 16:20:20 +1200 Subject: [PATCH 04/12] fix: Handle streams larger than max size --- block-streamer/src/block_stream.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index 62b51d1e..222dbef2 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -139,12 +139,12 @@ impl BlockStream { tracing::error!( account_id = config.account_id.as_str(), function_name = config.function_name, - "Last processed block decreased" + "Last processed block should not decrease" ); health.lock().unwrap().processing_state = ProcessingState::Halted; } - Ordering::Equal if stream_size == Some(MAX_STREAM_SIZE) => { + Ordering::Equal if stream_size >= Some(MAX_STREAM_SIZE) => { health.lock().unwrap().processing_state = ProcessingState::Paused; } Ordering::Equal => { From 5c5829293e8907c97b2b41b89afd58c35a72d49f Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Wed, 17 Jul 2024 16:43:40 +1200 Subject: [PATCH 05/12] feat: Expose timestamp to ensure health is not stale --- block-streamer/proto/block_streamer.proto | 1 + block-streamer/src/block_stream.rs | 48 ++++++++++++++++--- .../src/server/block_streamer_service.rs | 6 +++ 3 files changed, 49 insertions(+), 6 deletions(-) diff --git a/block-streamer/proto/block_streamer.proto b/block-streamer/proto/block_streamer.proto index fdec9c71..4ef0fb25 100644 --- a/block-streamer/proto/block_streamer.proto +++ b/block-streamer/proto/block_streamer.proto @@ -114,6 +114,7 @@ message StreamInfo { message Health { ProcessingState processing_state = 1; + uint64 updated_at_timestamp_secs = 2; } enum ProcessingState { diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index 222dbef2..e6b5c261 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -3,6 +3,7 @@ use std::future::Future; use std::pin::Pin; use std::sync::{Arc, Mutex}; use std::task::Poll; +use std::time::SystemTime; use anyhow::Context; use futures::StreamExt; @@ -79,6 +80,7 @@ pub enum ProcessingState { #[derive(Clone)] pub struct BlockStreamHealth { pub processing_state: ProcessingState, + pub last_updated: SystemTime, } pub struct BlockStream { @@ -105,6 +107,7 @@ impl BlockStream { redis_stream, health: Arc::new(Mutex::new(BlockStreamHealth { processing_state: ProcessingState::Idle, + last_updated: SystemTime::now(), })), } } @@ -130,9 +133,40 @@ impl BlockStream { tokio::time::sleep(std::time::Duration::from_secs(5)).await; let new_last_processed_block = - redis.get_last_processed_block(&config).await.unwrap(); + if let Ok(block) = redis.get_last_processed_block(&config).await { + block + } else { + tracing::warn!( + account_id = config.account_id.as_str(), + function_name = config.function_name, + "Failed to fetch last processed block" + ); + continue; + }; + + let stream_size = if let Ok(stream_size) = + redis.get_stream_length(redis_stream.clone()).await + { + stream_size + } else { + tracing::warn!( + account_id = config.account_id.as_str(), + function_name = config.function_name, + "Failed to fetch stream size" + ); + continue; + }; - let stream_size = redis.get_stream_length(redis_stream.clone()).await.unwrap(); + let mut health_lock = if let Ok(health) = health.lock() { + health + } else { + tracing::warn!( + account_id = config.account_id.as_str(), + function_name = config.function_name, + "Failed to acquire health lock" + ); + continue; + }; match new_last_processed_block.cmp(&last_processed_block) { Ordering::Less => { @@ -142,19 +176,21 @@ impl BlockStream { "Last processed block should not decrease" ); - health.lock().unwrap().processing_state = ProcessingState::Halted; + health_lock.processing_state = ProcessingState::Halted; } Ordering::Equal if stream_size >= Some(MAX_STREAM_SIZE) => { - health.lock().unwrap().processing_state = ProcessingState::Paused; + health_lock.processing_state = ProcessingState::Paused; } Ordering::Equal => { - health.lock().unwrap().processing_state = ProcessingState::Halted; + health_lock.processing_state = ProcessingState::Halted; } Ordering::Greater => { - health.lock().unwrap().processing_state = ProcessingState::Active; + health_lock.processing_state = ProcessingState::Active; } }; + health_lock.last_updated = SystemTime::now(); + last_processed_block = new_last_processed_block; } } diff --git a/block-streamer/src/server/block_streamer_service.rs b/block-streamer/src/server/block_streamer_service.rs index c1e79d02..06d7c576 100644 --- a/block-streamer/src/server/block_streamer_service.rs +++ b/block-streamer/src/server/block_streamer_service.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; use std::sync::Mutex; +use std::time::SystemTime; use near_lake_framework::near_indexer_primitives; use tonic::{Request, Response, Status}; @@ -36,6 +37,11 @@ impl From for blockstreamer::Health { blockstreamer::ProcessingState::Paused as i32 } }, + updated_at_timestamp_secs: health + .last_updated + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs(), } } } From abdd0756a30c9356bfcfd805b4856d7c77296923 Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Wed, 17 Jul 2024 16:46:58 +1200 Subject: [PATCH 06/12] fix: Cancel monitoring task on block stream stop --- block-streamer/src/block_stream.rs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index e6b5c261..0b22b224 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -54,7 +54,8 @@ impl Future for PollCounter { } pub struct Task { - handle: JoinHandle>, + stream_handle: JoinHandle>, + monitor_handle: JoinHandle<()>, cancellation_token: tokio_util::sync::CancellationToken, } @@ -119,7 +120,7 @@ impl BlockStream { } } - fn start_health_monitoring_task(&self, redis: Arc) { + fn start_health_monitoring_task(&self, redis: Arc) -> JoinHandle<()> { tokio::spawn({ let config = self.indexer_config.clone(); let health = self.health.clone(); @@ -194,7 +195,7 @@ impl BlockStream { last_processed_block = new_last_processed_block; } } - }); + }) } fn start_block_stream_task( @@ -265,9 +266,9 @@ impl BlockStream { let cancellation_token = tokio_util::sync::CancellationToken::new(); - self.start_health_monitoring_task(redis.clone()); + let monitor_handle = self.start_health_monitoring_task(redis.clone()); - let handle = self.start_block_stream_task( + let stream_handle = self.start_block_stream_task( start_block_height, redis, reciever_blocks_processor, @@ -276,7 +277,8 @@ impl BlockStream { ); self.task = Some(Task { - handle, + stream_handle, + monitor_handle, cancellation_token, }); @@ -285,8 +287,9 @@ impl BlockStream { pub async fn cancel(&mut self) -> anyhow::Result<()> { if let Some(task) = self.task.take() { + task.monitor_handle.abort(); task.cancellation_token.cancel(); - let _ = task.handle.await?; + let _ = task.stream_handle.await?; // Fails if metric doesn't exist, i.e. task was never polled let _ = metrics::BLOCK_STREAM_UP From 89d3d343d1fafd93540b00378b8ba19d066900aa Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Wed, 17 Jul 2024 20:00:06 +1200 Subject: [PATCH 07/12] refactor: Rename `ProcessingState` variants --- block-streamer/proto/block_streamer.proto | 4 ++-- block-streamer/src/block_stream.rs | 12 ++++++------ block-streamer/src/server/block_streamer_service.rs | 8 ++++---- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/block-streamer/proto/block_streamer.proto b/block-streamer/proto/block_streamer.proto index 4ef0fb25..e8b202e9 100644 --- a/block-streamer/proto/block_streamer.proto +++ b/block-streamer/proto/block_streamer.proto @@ -120,7 +120,7 @@ message Health { enum ProcessingState { UNSPECIFIED = 0; IDLE = 1; - ACTIVE = 2; + RUNNING = 2; PAUSED = 3; - HALTED = 4; + STALLED = 4; } diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index 0b22b224..80d03307 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -67,15 +67,15 @@ pub enum ProcessingState { Idle, /// Block Stream is actively processing blocks. - Active, + Running, /// Block Stream has been intentionally/internally paused due to some condition, i.e. back pressure on /// the Redis Stream. Paused, - /// Block Stream has been halted due to an error or other condition. Must be manually + /// Block Stream has stalled due to an error or other condition. Must be manually /// restarted. - Halted, + Stalled, } #[derive(Clone)] @@ -177,16 +177,16 @@ impl BlockStream { "Last processed block should not decrease" ); - health_lock.processing_state = ProcessingState::Halted; + health_lock.processing_state = ProcessingState::Stalled; } Ordering::Equal if stream_size >= Some(MAX_STREAM_SIZE) => { health_lock.processing_state = ProcessingState::Paused; } Ordering::Equal => { - health_lock.processing_state = ProcessingState::Halted; + health_lock.processing_state = ProcessingState::Stalled; } Ordering::Greater => { - health_lock.processing_state = ProcessingState::Active; + health_lock.processing_state = ProcessingState::Running; } }; diff --git a/block-streamer/src/server/block_streamer_service.rs b/block-streamer/src/server/block_streamer_service.rs index 06d7c576..06e2729d 100644 --- a/block-streamer/src/server/block_streamer_service.rs +++ b/block-streamer/src/server/block_streamer_service.rs @@ -26,12 +26,12 @@ impl From for blockstreamer::Health { fn from(health: block_stream::BlockStreamHealth) -> Self { blockstreamer::Health { processing_state: match health.processing_state { - block_stream::ProcessingState::Active => { - blockstreamer::ProcessingState::Active as i32 + block_stream::ProcessingState::Running => { + blockstreamer::ProcessingState::Running as i32 } block_stream::ProcessingState::Idle => blockstreamer::ProcessingState::Idle as i32, - block_stream::ProcessingState::Halted => { - blockstreamer::ProcessingState::Halted as i32 + block_stream::ProcessingState::Stalled => { + blockstreamer::ProcessingState::Stalled as i32 } block_stream::ProcessingState::Paused => { blockstreamer::ProcessingState::Paused as i32 From fd4f554343a7ffb5871662b87cc36f4c3bc4247b Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Wed, 17 Jul 2024 20:22:16 +1200 Subject: [PATCH 08/12] chore: Add doc comments to block streamer proto --- block-streamer/proto/block_streamer.proto | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/block-streamer/proto/block_streamer.proto b/block-streamer/proto/block_streamer.proto index e8b202e9..1415263a 100644 --- a/block-streamer/proto/block_streamer.proto +++ b/block-streamer/proto/block_streamer.proto @@ -108,19 +108,26 @@ message StreamInfo { string function_name = 4; // Block height corresponding to the created/updated height of the indexer uint64 version = 5; - // Health + // Contains health information for the Block Stream Health health = 6; } +// Contains health information for the Block Stream message Health { + // The processing state of the block stream ProcessingState processing_state = 1; + // When the health info was last updated uint64 updated_at_timestamp_secs = 2; } enum ProcessingState { UNSPECIFIED = 0; + // Not started, or has been stopped IDLE = 1; + // Running as expected RUNNING = 2; + // Intentionally paused due to some internal condition PAUSED = 3; + // Stopped due to some unknown error STALLED = 4; } From 167a7c5dc93c76f05a261cc7617531ffdd5e7e57 Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Wed, 17 Jul 2024 21:23:26 +1200 Subject: [PATCH 09/12] feat: Expose health information from executor rpc --- runner/protos/runner.proto | 21 ++++++++++++++++- .../services/runner/runner-service.test.ts | 16 ++++++++----- .../server/services/runner/runner-service.ts | 8 +++++-- runner/src/stream-handler/stream-handler.ts | 23 +++++++++++++------ runner/src/stream-handler/worker.ts | 8 +++---- 5 files changed, 56 insertions(+), 20 deletions(-) diff --git a/runner/protos/runner.proto b/runner/protos/runner.proto index 51045f8c..39801e65 100644 --- a/runner/protos/runner.proto +++ b/runner/protos/runner.proto @@ -62,7 +62,26 @@ message ExecutorInfo { string executor_id = 1; string account_id = 2; string function_name = 3; - string status = 4; // Block height corresponding to the created/updated height of the indexer uint64 version = 5; + Health health = 6; +} + +// Contains health information for the Executor +message Health { + ExecutionState execution_state = 1; +} + +enum ExecutionState { + UNSPECIFIED = 0; + // Running as expected + RUNNING = 1; + // Executor is running, but the execution is erroring + FAILING = 2; + // Waiting for some internal condition to be met before proceeding + WAITING = 3; + // Intentionally stopped + STOPPED = 4; + // Unintentionally stopped + STALLED = 5; } diff --git a/runner/src/server/services/runner/runner-service.test.ts b/runner/src/server/services/runner/runner-service.test.ts index f7a9c6dc..57ff6de6 100644 --- a/runner/src/server/services/runner/runner-service.test.ts +++ b/runner/src/server/services/runner/runner-service.test.ts @@ -1,10 +1,10 @@ import * as grpc from '@grpc/grpc-js'; import type StreamHandler from '../../../stream-handler/stream-handler'; -import { IndexerStatus } from '../../../indexer-meta/indexer-meta'; import { LogLevel } from '../../../indexer-meta/log-entry'; import getRunnerService from './runner-service'; import IndexerConfig from '../../../indexer-config/indexer-config'; +import { ExecutionState } from '../../../generated/runner/ExecutionState'; const BASIC_REDIS_STREAM = 'test-redis-stream'; const BASIC_ACCOUNT_ID = 'test-account-id'; @@ -15,7 +15,7 @@ const BASIC_CODE = 'test-code'; const BASIC_SCHEMA = 'test-schema'; const BASIC_VERSION = 1; const BASIC_EXECUTOR_CONTEXT = { - status: IndexerStatus.RUNNING, + executionState: ExecutionState.RUNNING, }; describe('Runner gRPC Service', () => { @@ -77,8 +77,10 @@ describe('Runner gRPC Service', () => { executorId: BASIC_EXECUTOR_ID, accountId: genericIndexerConfig.accountId, functionName: genericIndexerConfig.functionName, - status: IndexerStatus.RUNNING, - version: '1' + version: '1', + health: { + executionState: 'RUNNING' + } }); resolve(null); }); @@ -288,8 +290,10 @@ describe('Runner gRPC Service', () => { executorId: BASIC_EXECUTOR_ID, accountId: genericIndexerConfig.accountId, functionName: genericIndexerConfig.functionName, - status: IndexerStatus.RUNNING, - version: '1' + version: '1', + health: { + executionState: 'RUNNING' + } }] }); resolve(null); diff --git a/runner/src/server/services/runner/runner-service.ts b/runner/src/server/services/runner/runner-service.ts index 0d135574..3b45856e 100644 --- a/runner/src/server/services/runner/runner-service.ts +++ b/runner/src/server/services/runner/runner-service.ts @@ -32,7 +32,9 @@ export function getRunnerService ( accountId: executor.indexerConfig.accountId, functionName: executor.indexerConfig.functionName, version: executor.indexerConfig.version.toString(), - status: executor.executorContext.status + health: { + executionState: executor.executorContext.executionState, + } }); } else { const notFoundError = { @@ -150,7 +152,9 @@ export function getRunnerService ( accountId: indexerConfig.accountId, functionName: indexerConfig.functionName, version: indexerConfig.version.toString(), - status: indexerContext.status + health: { + executionState: indexerContext.executionState, + } }); }); callback(null, { diff --git a/runner/src/stream-handler/stream-handler.ts b/runner/src/stream-handler/stream-handler.ts index c6a07ce6..f1b24b4f 100644 --- a/runner/src/stream-handler/stream-handler.ts +++ b/runner/src/stream-handler/stream-handler.ts @@ -3,7 +3,6 @@ import { Worker, isMainThread } from 'worker_threads'; import { registerWorkerMetrics, deregisterWorkerMetrics } from '../metrics'; import Indexer from '../indexer'; -import { IndexerStatus } from '../indexer-meta/indexer-meta'; import LogEntry from '../indexer-meta/log-entry'; import logger from '../logger'; @@ -12,7 +11,7 @@ import type IndexerConfig from '../indexer-config'; export enum WorkerMessageType { METRICS = 'METRICS', BLOCK_HEIGHT = 'BLOCK_HEIGHT', - STATUS = 'STATUS', + EXECUTION_STATE = 'STATUS', } export interface WorkerMessage { @@ -20,8 +19,16 @@ export interface WorkerMessage { data: any } +export enum ExecutionState { + RUNNING = 'RUNNING', + FAILING = 'FAILING', + WAITING = 'WAITING', + STOPPED = 'STOPPED', + STALLED = 'STALLED', +} + interface ExecutorContext { - status: IndexerStatus + executionState: ExecutionState block_height: number } @@ -42,7 +49,7 @@ export default class StreamHandler { }, }); this.executorContext = { - status: IndexerStatus.RUNNING, + executionState: ExecutionState.RUNNING, block_height: indexerConfig.version, }; @@ -56,12 +63,14 @@ export default class StreamHandler { async stop (): Promise { deregisterWorkerMetrics(this.worker.threadId); + this.executorContext.executionState = ExecutionState.STOPPED; + await this.worker.terminate(); } private handleError (error: Error): void { this.logger.error('Terminating thread', error); - this.executorContext.status = IndexerStatus.STOPPED; + this.executorContext.executionState = ExecutionState.STALLED; const indexer = new Indexer(this.indexerConfig); indexer.setStoppedStatus().catch((e) => { @@ -82,8 +91,8 @@ export default class StreamHandler { private handleMessage (message: WorkerMessage): void { switch (message.type) { - case WorkerMessageType.STATUS: - this.executorContext.status = message.data.status; + case WorkerMessageType.EXECUTION_STATE: + this.executorContext.executionState = message.data.state; break; case WorkerMessageType.BLOCK_HEIGHT: this.executorContext.block_height = message.data; diff --git a/runner/src/stream-handler/worker.ts b/runner/src/stream-handler/worker.ts index d9e77184..fb1a897c 100644 --- a/runner/src/stream-handler/worker.ts +++ b/runner/src/stream-handler/worker.ts @@ -7,9 +7,8 @@ import Indexer from '../indexer'; import RedisClient from '../redis-client'; import { METRICS } from '../metrics'; import LakeClient from '../lake-client'; -import { WorkerMessageType, type WorkerMessage } from './stream-handler'; +import { WorkerMessageType, type WorkerMessage, ExecutionState } from './stream-handler'; import setUpTracerExport from '../instrumentation'; -import { IndexerStatus } from '../indexer-meta/indexer-meta'; import IndexerConfig from '../indexer-config'; import parentLogger from '../logger'; import { wrapSpan } from '../utility'; @@ -119,6 +118,7 @@ async function blockQueueConsumer (workerContext: WorkerContext): Promise metricsSpan.end(); if (workerContext.queue.length === 0) { + parentPort?.postMessage({ type: WorkerMessageType.EXECUTION_STATE, data: { state: ExecutionState.WAITING } }); await sleep(100); continue; } @@ -162,7 +162,7 @@ async function blockQueueConsumer (workerContext: WorkerContext): Promise }); const postRunSpan = tracer.startSpan('Delete redis message and shift queue', {}, context.active()); - parentPort?.postMessage({ type: WorkerMessageType.STATUS, data: { status: IndexerStatus.RUNNING } }); + parentPort?.postMessage({ type: WorkerMessageType.EXECUTION_STATE, data: { state: ExecutionState.RUNNING } }); await workerContext.redisClient.deleteStreamMessage(indexerConfig.redisStreamKey, streamMessageId); await workerContext.queue.shift(); @@ -174,7 +174,7 @@ async function blockQueueConsumer (workerContext: WorkerContext): Promise } catch (err) { METRICS.FAILED_EXECUTIONS.labels({ indexer: indexerConfig.fullName() }).inc(); parentSpan.setAttribute('status', 'failed'); - parentPort?.postMessage({ type: WorkerMessageType.STATUS, data: { status: IndexerStatus.FAILING } }); + parentPort?.postMessage({ type: WorkerMessageType.EXECUTION_STATE, data: { state: ExecutionState.FAILING } }); const error = err as Error; if (previousError !== error.message) { previousError = error.message; From 11459b0e175a5d2e04a145c23b4095d51a6e5db5 Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Wed, 17 Jul 2024 21:23:38 +1200 Subject: [PATCH 10/12] fix: executor rpc examples --- runner/examples/list-executors.ts | 4 ++-- runner/examples/start-executor.ts | 6 +++--- runner/examples/stop-executor.ts | 6 +++--- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/runner/examples/list-executors.ts b/runner/examples/list-executors.ts index b5200b7e..39620840 100644 --- a/runner/examples/list-executors.ts +++ b/runner/examples/list-executors.ts @@ -1,13 +1,13 @@ // Run with 'npx ts-node src/test-client.ts' -import runnerClient from '../src/server/runner-client'; +import runnerClient from '../src/server/services/runner/runner-client'; void (async function main () { runnerClient.ListExecutors({}, (err, response) => { if (err) { console.error('List request error: ', err); } else { - console.log('Successful ListExecutors request: ', response); + console.log('list response: ', JSON.stringify({ response }, null, 2)); } }); })(); diff --git a/runner/examples/start-executor.ts b/runner/examples/start-executor.ts index d9466d1e..9b573dae 100644 --- a/runner/examples/start-executor.ts +++ b/runner/examples/start-executor.ts @@ -1,6 +1,6 @@ // Run with 'npx ts-node src/test-client.ts' -import runnerClient from '../src/server/runner-client'; +import runnerClient from '../src/server/services/runner/runner-client'; const schema = ` CREATE TABLE @@ -13,7 +13,7 @@ CREATE TABLE `; const code = ` -console.log("hello"); +// do nothing `; const indexer = { @@ -35,7 +35,7 @@ void (async function main () { if (err) { console.error('error: ', err); } else { - console.log('start request: ', response); + console.log('start response: ', JSON.stringify({ response }, null, 2)); } }); })(); diff --git a/runner/examples/stop-executor.ts b/runner/examples/stop-executor.ts index 03466c99..ff7eef1b 100644 --- a/runner/examples/stop-executor.ts +++ b/runner/examples/stop-executor.ts @@ -1,13 +1,13 @@ // Run with 'npx ts-node src/test-client.ts' -import runnerClient from '../src/server/runner-client'; +import runnerClient from '../src/server/services/runner/runner-client'; runnerClient.StopExecutor({ - executorId: 'SOME_EXECUTOR_ID' + executorId: '0293a6b1dcd2259a8be6b59a8cd3e7b4285e540a64a7cbe99639947f7b7e2f9a' }, (err, response) => { if (err) { console.error('error: ', err); } else { - console.log('stop request: ', response); + console.log('stop request: ', JSON.stringify({ response }, null, 2)); } }); From 7c332635d9a1528a1f73b6700fba436fb12d93ed Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Wed, 17 Jul 2024 21:35:17 +1200 Subject: [PATCH 11/12] refactor: `Paused` -> `Waiting` --- block-streamer/proto/block_streamer.proto | 4 ++-- block-streamer/src/block_stream.rs | 4 ++-- block-streamer/src/server/block_streamer_service.rs | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/block-streamer/proto/block_streamer.proto b/block-streamer/proto/block_streamer.proto index 1415263a..fd24b797 100644 --- a/block-streamer/proto/block_streamer.proto +++ b/block-streamer/proto/block_streamer.proto @@ -126,8 +126,8 @@ enum ProcessingState { IDLE = 1; // Running as expected RUNNING = 2; - // Intentionally paused due to some internal condition - PAUSED = 3; + // Waiting for some internal condition to be met before continuing + WAITING = 3; // Stopped due to some unknown error STALLED = 4; } diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index 80d03307..bf19038b 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -71,7 +71,7 @@ pub enum ProcessingState { /// Block Stream has been intentionally/internally paused due to some condition, i.e. back pressure on /// the Redis Stream. - Paused, + Waiting, /// Block Stream has stalled due to an error or other condition. Must be manually /// restarted. @@ -180,7 +180,7 @@ impl BlockStream { health_lock.processing_state = ProcessingState::Stalled; } Ordering::Equal if stream_size >= Some(MAX_STREAM_SIZE) => { - health_lock.processing_state = ProcessingState::Paused; + health_lock.processing_state = ProcessingState::Waiting; } Ordering::Equal => { health_lock.processing_state = ProcessingState::Stalled; diff --git a/block-streamer/src/server/block_streamer_service.rs b/block-streamer/src/server/block_streamer_service.rs index 06e2729d..4dd3f01a 100644 --- a/block-streamer/src/server/block_streamer_service.rs +++ b/block-streamer/src/server/block_streamer_service.rs @@ -33,8 +33,8 @@ impl From for blockstreamer::Health { block_stream::ProcessingState::Stalled => { blockstreamer::ProcessingState::Stalled as i32 } - block_stream::ProcessingState::Paused => { - blockstreamer::ProcessingState::Paused as i32 + block_stream::ProcessingState::Waiting => { + blockstreamer::ProcessingState::Waiting as i32 } }, updated_at_timestamp_secs: health From 249acc208b1d553745ea0cd813eb309e249e6a1f Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Thu, 18 Jul 2024 07:15:53 +1200 Subject: [PATCH 12/12] feat: Update runner client proto --- runner-client/proto/runner.proto | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/runner-client/proto/runner.proto b/runner-client/proto/runner.proto index 51045f8c..39801e65 100644 --- a/runner-client/proto/runner.proto +++ b/runner-client/proto/runner.proto @@ -62,7 +62,26 @@ message ExecutorInfo { string executor_id = 1; string account_id = 2; string function_name = 3; - string status = 4; // Block height corresponding to the created/updated height of the indexer uint64 version = 5; + Health health = 6; +} + +// Contains health information for the Executor +message Health { + ExecutionState execution_state = 1; +} + +enum ExecutionState { + UNSPECIFIED = 0; + // Running as expected + RUNNING = 1; + // Executor is running, but the execution is erroring + FAILING = 2; + // Waiting for some internal condition to be met before proceeding + WAITING = 3; + // Intentionally stopped + STOPPED = 4; + // Unintentionally stopped + STALLED = 5; }