From f2cdc789e8f5af5217c1980c591947ef90c0bddb Mon Sep 17 00:00:00 2001 From: Morgan McCauley Date: Thu, 18 Jul 2024 13:56:34 +1200 Subject: [PATCH 1/6] refactor: Spawn dedicated control loops per Indexer (#866) This PR introduces dedicated/self-contained control loops per Indexer, replacing the single/combined control loop. The motive for this ticket is described in #811; you can read more about it there. Overall, there is a lot of cleanup to be done, but I wanted to get this out the door as quickly as possible so as not to block the features required to build on top of this. I've discussed some of the major concerns below. ## `LifecycleManager` These dedicated control loops are managed by the `LifecycleManager` struct. This is a state machine which progresses the Indexer through different states depending on the context. The different states and their transitions are described on the `LifecycleState` enum: ```rust /// Represents the different lifecycle states of an Indexer #[derive(Default, Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)] pub enum LifecycleState { /// Pre-requisite resources, i.e. Data Layer, are being created. /// /// Transitions: /// - `Running` on success /// - `Repairing` on Data Layer provisioning failure #[default] Initializing, /// Indexer is functional, Block Stream and Executors are continuously monitored to ensure /// they are running the latest version of the Indexer. /// /// Transitions: /// - `Stopping` if suspended /// - `Running` if Block Stream or Executor fails to synchronise, essentially triggering a /// retry /// - `Running` on success Running, /// Indexer is being stopped, Block Stream and Executors are being stopped. /// /// Transitions: /// - `Stopping` on failure, triggering a retry /// - `Stopped` on success Stopping, /// Indexer is stopped, Block Stream and Executors are not running. 
/// /// Transitions: /// - `Running` if unsuspended Stopped, /// Indexer is in a bad state, currently requires manual intervention, but should eventually /// self heal. This is a dead-end state /// /// Transitions: /// - `Repairing` continuously Repairing, // TODO Add `error` to enable reparation /// Indexer is being deleted, all resources are being cleaned up /// /// Transitions: /// - `Deleting` on failure, triggering a retry /// - `Deleted` on success Deleting, /// Indexer is deleted, all resources are cleaned up, lifecycle manager will exit Deleted, } ``` The logic of this `struct` is very light, triggering high-level actions required within each state, and then returning the next desired state. Most of the "doing" logic has been encapsulated in the other related `structs` as discussed below. The lifecycle state is stored in Redis so that the Indexer can pick up where it left off. A migration has been added to accommodate this new field, which replaces the existing `provisioned_state` field. ## `Handler`s Previously, the "handlers", i.e. `BlockStreamsHandler`, were lightweight `structs` which wrapped the gRPC client/methods. In this PR, I've moved all "synchronisation" logic to these structs. So rather than calling e.g. the `data_layer_handler.start_provisioning_task()` method, we can call `ensure_provisioned()` which manages all related logic. I feel this is better encapsulation, and allows the `LifecycleManager` to be light. I've had to remove `automock`, so we don't have mocked versions for this right now. Cloning mocked versions is tricky, and requires manual mocking. Rather than bloat this PR, I've left this out. Eventually, I'll separate the "sync" logic from the "client" logic, so that the latter can be easily mocked, and the sync logic covered by unit tests. Additionally, I've added `get` methods for both Block Streamer and Executors RPC, as listing is no longer convenient given we are managing Indexers individually. 
The getters use `account_id` and `function_name` as opposed to IDs. I'm considering moving away from IDs as the only way to get them is via list, which isn't helpful. Right now it's somewhat of a transitory state. --- block-streamer/proto/block_streamer.proto | 11 + .../src/server/block_streamer_service.rs | 95 +- coordinator/src/handlers/block_streams.rs | 176 +++- coordinator/src/handlers/data_layer.rs | 92 +- coordinator/src/handlers/executors.rs | 84 +- coordinator/src/indexer_state.rs | 109 +- coordinator/src/lifecycle.rs | 343 +++++++ coordinator/src/main.rs | 76 +- coordinator/src/registry.rs | 16 +- .../src/server/indexer_manager_service.rs | 10 +- coordinator/src/synchroniser.rs | 953 ------------------ runner-client/proto/runner.proto | 9 + runner/protos/runner.proto | 9 + .../services/runner/runner-service.test.ts | 53 + .../server/services/runner/runner-service.ts | 23 + 15 files changed, 992 insertions(+), 1067 deletions(-) create mode 100644 coordinator/src/lifecycle.rs diff --git a/block-streamer/proto/block_streamer.proto b/block-streamer/proto/block_streamer.proto index 5d1e0517a..92d3a2c90 100644 --- a/block-streamer/proto/block_streamer.proto +++ b/block-streamer/proto/block_streamer.proto @@ -12,6 +12,17 @@ service BlockStreamer { // Lists all current BlockStream processes rpc ListStreams (ListStreamsRequest) returns (ListStreamsResponse); + + // Get info for an existing BlockStream process + rpc GetStream (GetStreamRequest) returns (StreamInfo); +} + +// Request message for getting a BlockStream +message GetStreamRequest { + // Account ID which the indexer is defined under + string account_id = 1; + // Name of the indexer + string function_name = 2; } // Request message for starting a BlockStream diff --git a/block-streamer/src/server/block_streamer_service.rs b/block-streamer/src/server/block_streamer_service.rs index 94ccb22aa..88d426b9c 100644 --- a/block-streamer/src/server/block_streamer_service.rs +++ 
b/block-streamer/src/server/block_streamer_service.rs @@ -59,6 +59,38 @@ impl BlockStreamerService { #[tonic::async_trait] impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerService { + #[tracing::instrument(skip(self))] + async fn get_stream( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + + let lock = self.block_streams.lock().map_err(|err| { + tracing::error!(?err, "Failed to acquire `block_streams` lock"); + tonic::Status::internal("Failed to acquire `block_streams` lock") + })?; + + let stream_entry = lock.iter().find(|(_, block_stream)| { + block_stream.indexer_config.account_id == request.account_id + && block_stream.indexer_config.function_name == request.function_name + }); + + if let Some((stream_id, stream)) = stream_entry { + Ok(Response::new(StreamInfo { + stream_id: stream_id.to_string(), + account_id: stream.indexer_config.account_id.to_string(), + function_name: stream.indexer_config.function_name.to_string(), + version: stream.version, + })) + } else { + Err(Status::not_found(format!( + "Block Stream for account {} and name {} does not exist", + request.account_id, request.function_name + ))) + } + } + #[tracing::instrument(skip(self))] async fn start_stream( &self, @@ -171,7 +203,11 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic &self, _request: Request, ) -> Result, Status> { - let lock = self.block_streams.lock().unwrap(); + let lock = self.block_streams.lock().map_err(|err| { + tracing::error!(?err, "Failed to acquire `block_streams` lock"); + tonic::Status::internal("Failed to acquire `block_streams` lock") + })?; + let block_streams: Vec = lock .values() .map(|block_stream| StreamInfo { @@ -234,6 +270,63 @@ mod tests { ) } + #[tokio::test] + async fn get_existing_block_stream() { + let block_streamer_service = create_block_streamer_service(); + + { + let lock = block_streamer_service.get_block_streams_lock().unwrap(); + 
assert_eq!(lock.len(), 0); + } + + block_streamer_service + .start_stream(Request::new(StartStreamRequest { + start_block_height: 0, + account_id: "morgs.near".to_string(), + function_name: "test".to_string(), + version: 0, + redis_stream: "stream".to_string(), + rule: Some(start_stream_request::Rule::ActionAnyRule(ActionAnyRule { + affected_account_id: "queryapi.dataplatform.near".to_string(), + status: 1, + })), + })) + .await + .unwrap(); + + let stream = block_streamer_service + .get_stream(Request::new(GetStreamRequest { + account_id: "morgs.near".to_string(), + function_name: "test".to_string(), + })) + .await + .unwrap(); + + assert_eq!( + stream.into_inner().stream_id, + "16210176318434468568".to_string() + ); + } + + #[tokio::test] + async fn get_non_existant_block_stream() { + let block_streamer_service = create_block_streamer_service(); + + { + let lock = block_streamer_service.get_block_streams_lock().unwrap(); + assert_eq!(lock.len(), 0); + } + + let stream_response = block_streamer_service + .get_stream(Request::new(GetStreamRequest { + account_id: "morgs.near".to_string(), + function_name: "test".to_string(), + })) + .await; + + assert_eq!(stream_response.err().unwrap().code(), tonic::Code::NotFound); + } + #[tokio::test] async fn starts_a_block_stream() { let block_streamer_service = create_block_streamer_service(); diff --git a/coordinator/src/handlers/block_streams.rs b/coordinator/src/handlers/block_streams.rs index f88a412d5..bc34c000b 100644 --- a/coordinator/src/handlers/block_streams.rs +++ b/coordinator/src/handlers/block_streams.rs @@ -5,34 +5,36 @@ pub use block_streamer::StreamInfo; use anyhow::Context; use block_streamer::block_streamer_client::BlockStreamerClient; use block_streamer::{ - start_stream_request::Rule, ActionAnyRule, ActionFunctionCallRule, ListStreamsRequest, - StartStreamRequest, Status, StopStreamRequest, + start_stream_request::Rule, ActionAnyRule, ActionFunctionCallRule, GetStreamRequest, + ListStreamsRequest, 
StartStreamRequest, Status, StopStreamRequest, }; +use near_primitives::types::AccountId; +use registry_types::StartBlock; use tonic::transport::channel::Channel; use tonic::Request; use crate::indexer_config::IndexerConfig; -use crate::redis::KeyProvider; +use crate::redis::{KeyProvider, RedisClient}; use crate::utils::exponential_retry; -#[cfg(not(test))] -pub use BlockStreamsHandlerImpl as BlockStreamsHandler; -#[cfg(test)] -pub use MockBlockStreamsHandlerImpl as BlockStreamsHandler; - -pub struct BlockStreamsHandlerImpl { +#[derive(Clone)] +pub struct BlockStreamsHandler { client: BlockStreamerClient, + redis_client: RedisClient, } #[cfg_attr(test, mockall::automock)] -impl BlockStreamsHandlerImpl { - pub fn connect(block_streamer_url: &str) -> anyhow::Result { +impl BlockStreamsHandler { + pub fn connect(block_streamer_url: &str, redis_client: RedisClient) -> anyhow::Result { let channel = Channel::from_shared(block_streamer_url.to_string()) .context("Block Streamer URL is invalid")? 
.connect_lazy(); let client = BlockStreamerClient::new(channel); - Ok(Self { client }) + Ok(Self { + client, + redis_client, + }) } pub async fn list(&self) -> anyhow::Result> { @@ -79,6 +81,26 @@ impl BlockStreamsHandlerImpl { .into() } + pub async fn get( + &self, + account_id: AccountId, + function_name: String, + ) -> anyhow::Result> { + let request = GetStreamRequest { + account_id: account_id.to_string(), + function_name: function_name.clone(), + }; + + match self.client.clone().get_stream(Request::new(request)).await { + Ok(response) => Ok(Some(response.into_inner())), + Err(status) if status.code() == tonic::Code::NotFound => Ok(None), + Err(err) => Err(err).context(format!( + "Failed to get stream for account {} and name {}", + account_id, function_name + )), + } + } + pub async fn start( &self, start_block_height: u64, @@ -139,4 +161,134 @@ impl BlockStreamsHandlerImpl { Ok(()) } + + async fn reconfigure_block_stream(&self, config: &IndexerConfig) -> anyhow::Result<()> { + if matches!( + config.start_block, + StartBlock::Latest | StartBlock::Height(..) 
+ ) { + self.redis_client.clear_block_stream(config).await?; + } + + let height = match config.start_block { + StartBlock::Latest => config.get_registry_version(), + StartBlock::Height(height) => height, + StartBlock::Continue => self.get_continuation_block_height(config).await?, + }; + + tracing::info!( + start_block = ?config.start_block, + height, + "Starting block stream" + ); + + self.start(height, config).await?; + + Ok(()) + } + + async fn start_new_block_stream(&self, config: &IndexerConfig) -> anyhow::Result<()> { + let height = match config.start_block { + StartBlock::Height(height) => height, + StartBlock::Latest => config.get_registry_version(), + StartBlock::Continue => { + tracing::warn!( + "Attempted to start new Block Stream with CONTINUE, using LATEST instead" + ); + config.get_registry_version() + } + }; + + tracing::info!( + start_block = ?config.start_block, + height, + "Starting block stream" + ); + + self.start(height, config).await + } + + async fn get_continuation_block_height(&self, config: &IndexerConfig) -> anyhow::Result { + let height = self + .redis_client + .get_last_published_block(config) + .await? 
+ .map(|height| height + 1) + .unwrap_or_else(|| { + tracing::warn!( + "Failed to get continuation block height, using registry version instead" + ); + + config.get_registry_version() + }); + + Ok(height) + } + + async fn resume_block_stream(&self, config: &IndexerConfig) -> anyhow::Result<()> { + let height = self.get_continuation_block_height(config).await?; + + tracing::info!(height, "Resuming block stream"); + + self.start(height, config).await?; + + Ok(()) + } + + pub async fn synchronise_block_stream( + &self, + config: &IndexerConfig, + previous_sync_version: Option, + ) -> anyhow::Result<()> { + let block_stream = self + .get(config.account_id.clone(), config.function_name.clone()) + .await?; + + if let Some(block_stream) = block_stream { + if block_stream.version == config.get_registry_version() { + return Ok(()); + } + + tracing::info!( + previous_version = block_stream.version, + "Stopping outdated block stream" + ); + + self.stop(block_stream.stream_id.clone()).await?; + + self.reconfigure_block_stream(config).await?; + + return Ok(()); + } + + if previous_sync_version.is_none() { + self.start_new_block_stream(config).await?; + + return Ok(()); + } + + if previous_sync_version.unwrap() != config.get_registry_version() { + self.reconfigure_block_stream(config).await?; + + return Ok(()); + } + + self.resume_block_stream(config).await?; + + Ok(()) + } + + pub async fn stop_if_needed( + &self, + account_id: AccountId, + function_name: String, + ) -> anyhow::Result<()> { + if let Some(block_stream) = self.get(account_id, function_name).await? 
{ + tracing::info!("Stopping block stream"); + + self.stop(block_stream.stream_id).await?; + } + + Ok(()) + } } diff --git a/coordinator/src/handlers/data_layer.rs b/coordinator/src/handlers/data_layer.rs index 68537b3c7..3e2df54dc 100644 --- a/coordinator/src/handlers/data_layer.rs +++ b/coordinator/src/handlers/data_layer.rs @@ -8,23 +8,18 @@ use anyhow::Context; use runner::data_layer::data_layer_client::DataLayerClient; use runner::data_layer::{DeprovisionRequest, GetTaskStatusRequest, ProvisionRequest}; use tonic::transport::channel::Channel; -use tonic::Request; +use tonic::{Request, Status}; use crate::indexer_config::IndexerConfig; -#[cfg(not(test))] -pub use DataLayerHandlerImpl as DataLayerHandler; -#[cfg(test)] -pub use MockDataLayerHandlerImpl as DataLayerHandler; - type TaskId = String; -pub struct DataLayerHandlerImpl { +#[derive(Clone)] +pub struct DataLayerHandler { client: DataLayerClient, } -#[cfg_attr(test, mockall::automock)] -impl DataLayerHandlerImpl { +impl DataLayerHandler { pub fn connect(runner_url: &str) -> anyhow::Result { let channel = Channel::from_shared(runner_url.to_string()) .context("Runner URL is invalid")? 
@@ -37,7 +32,7 @@ impl DataLayerHandlerImpl { pub async fn start_provisioning_task( &self, indexer_config: &IndexerConfig, - ) -> anyhow::Result { + ) -> Result { let request = ProvisionRequest { account_id: indexer_config.account_id.to_string(), function_name: indexer_config.function_name.clone(), @@ -98,4 +93,81 @@ impl DataLayerHandlerImpl { Ok(status) } + + pub async fn ensure_provisioned(&self, indexer_config: &IndexerConfig) -> anyhow::Result<()> { + tracing::info!(account_id = ?indexer_config.account_id, function_name = ?indexer_config.function_name, "Provisioning data layer"); + + let start_task_result = self.start_provisioning_task(indexer_config).await; + + if let Err(error) = start_task_result { + // Already provisioned + if error.code() == tonic::Code::FailedPrecondition { + return Ok(()); + } + + return Err(error.into()); + } + + let task_id = start_task_result.unwrap(); + + let mut iterations = 0; + let delay_seconds = 1; + + loop { + if self.get_task_status(task_id.clone()).await? == TaskStatus::Complete { + break; + } + + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + iterations += 1; + + if iterations * delay_seconds % 60 == 0 { + tracing::warn!( + ?indexer_config.account_id, + ?indexer_config.function_name, + "Still waiting for provisioning to complete after {} seconds", + iterations * delay_seconds + ); + } + } + + Ok(()) + } + + pub async fn ensure_deprovisioned( + &self, + account_id: AccountId, + function_name: String, + ) -> anyhow::Result<()> { + tracing::info!(?account_id, ?function_name, "Deprovisioning data layer"); + + let task_id = self + .start_deprovisioning_task(account_id.clone(), function_name.clone()) + .await?; + + let mut iterations = 0; + let delay_seconds = 1; + + loop { + if self.get_task_status(task_id.clone()).await? 
== TaskStatus::Complete { + break; + } + + tokio::time::sleep(std::time::Duration::from_secs(delay_seconds)).await; + + iterations += 1; + + if iterations * delay_seconds % 60 == 0 { + tracing::warn!( + ?account_id, + ?function_name, + "Still waiting for deprovisioning to complete after {} seconds", + iterations * delay_seconds + ); + } + } + + Ok(()) + } } diff --git a/coordinator/src/handlers/executors.rs b/coordinator/src/handlers/executors.rs index 686164048..01c9cc4be 100644 --- a/coordinator/src/handlers/executors.rs +++ b/coordinator/src/handlers/executors.rs @@ -1,10 +1,11 @@ #![cfg_attr(test, allow(dead_code))] +use near_primitives::types::AccountId; pub use runner::ExecutorInfo; use anyhow::Context; use runner::runner_client::RunnerClient; -use runner::{ListExecutorsRequest, StartExecutorRequest, StopExecutorRequest}; +use runner::{GetExecutorRequest, ListExecutorsRequest, StartExecutorRequest, StopExecutorRequest}; use tonic::transport::channel::Channel; use tonic::Request; @@ -12,17 +13,12 @@ use crate::indexer_config::IndexerConfig; use crate::redis::KeyProvider; use crate::utils::exponential_retry; -#[cfg(not(test))] -pub use ExecutorsHandlerImpl as ExecutorsHandler; -#[cfg(test)] -pub use MockExecutorsHandlerImpl as ExecutorsHandler; - -pub struct ExecutorsHandlerImpl { +#[derive(Clone)] +pub struct ExecutorsHandler { client: RunnerClient, } -#[cfg_attr(test, mockall::automock)] -impl ExecutorsHandlerImpl { +impl ExecutorsHandler { pub fn connect(runner_url: &str) -> anyhow::Result { let channel = Channel::from_shared(runner_url.to_string()) .context("Runner URL is invalid")? 
@@ -50,6 +46,31 @@ impl ExecutorsHandlerImpl { .await } + pub async fn get( + &self, + account_id: AccountId, + function_name: String, + ) -> anyhow::Result> { + let request = GetExecutorRequest { + account_id: account_id.to_string(), + function_name: function_name.clone(), + }; + + match self + .client + .clone() + .get_executor(Request::new(request)) + .await + { + Ok(response) => Ok(Some(response.into_inner())), + Err(status) if status.code() == tonic::Code::NotFound => Ok(None), + Err(err) => Err(err).context(format!( + "Failed to get executor for account {} and name {}", + account_id, function_name + )), + } + } + pub async fn start(&self, indexer_config: &IndexerConfig) -> anyhow::Result<()> { let request = StartExecutorRequest { code: indexer_config.code.clone(), @@ -97,4 +118,49 @@ impl ExecutorsHandlerImpl { Ok(()) } + + pub async fn synchronise_executor(&self, config: &IndexerConfig) -> anyhow::Result<()> { + let executor = self + .get(config.account_id.clone(), config.function_name.clone()) + .await?; + + if let Some(executor) = executor { + if executor.version == config.get_registry_version() { + return Ok(()); + } + + tracing::info!( + account_id = config.account_id.as_str(), + function_name = config.function_name, + version = executor.version, + "Stopping outdated executor" + ); + + self.stop(executor.executor_id).await?; + } + + tracing::info!( + account_id = config.account_id.as_str(), + function_name = config.function_name, + version = config.get_registry_version(), + "Starting executor" + ); + + self.start(config).await?; + + Ok(()) + } + + pub async fn stop_if_needed( + &self, + account_id: AccountId, + function_name: String, + ) -> anyhow::Result<()> { + if let Some(executor) = self.get(account_id, function_name).await? 
{ + tracing::info!("Stopping executor"); + self.stop(executor.executor_id).await?; + } + + Ok(()) + } } diff --git a/coordinator/src/indexer_state.rs b/coordinator/src/indexer_state.rs index 2539e06a2..1200648f0 100644 --- a/coordinator/src/indexer_state.rs +++ b/coordinator/src/indexer_state.rs @@ -4,6 +4,7 @@ use anyhow::Context; use near_primitives::types::AccountId; use crate::indexer_config::IndexerConfig; +use crate::lifecycle::LifecycleState; use crate::redis::{KeyProvider, RedisClient}; #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)] @@ -16,7 +17,7 @@ pub enum ProvisionedState { } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)] -pub struct IndexerState { +pub struct OldIndexerState { pub account_id: AccountId, pub function_name: String, pub block_stream_synced_at: Option, @@ -24,6 +25,15 @@ pub struct IndexerState { pub provisioned_state: ProvisionedState, } +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)] +pub struct IndexerState { + pub account_id: AccountId, + pub function_name: String, + pub block_stream_synced_at: Option, + pub enabled: bool, + pub lifecycle_state: LifecycleState, +} + impl KeyProvider for IndexerState { fn account_id(&self) -> String { self.account_id.to_string() @@ -49,13 +59,46 @@ impl IndexerStateManagerImpl { Self { redis_client } } + pub async fn migrate(&self) -> anyhow::Result<()> { + let raw_states = self.redis_client.list_indexer_states().await?; + + for raw_state in raw_states { + if let Ok(state) = serde_json::from_str::(&raw_state) { + tracing::info!( + "{}/{} already migrated, skipping", + state.account_id, + state.function_name + ); + continue; + } + + tracing::info!("Migrating {}", raw_state); + + let old_state: OldIndexerState = serde_json::from_str(&raw_state)?; + + let state = IndexerState { + account_id: old_state.account_id, + function_name: old_state.function_name, + block_stream_synced_at: old_state.block_stream_synced_at, + 
enabled: old_state.enabled, + lifecycle_state: LifecycleState::Running, + }; + + self.redis_client + .set(state.get_state_key(), serde_json::to_string(&state)?) + .await?; + } + + Ok(()) + } + fn get_default_state(&self, indexer_config: &IndexerConfig) -> IndexerState { IndexerState { account_id: indexer_config.account_id.clone(), function_name: indexer_config.function_name.clone(), block_stream_synced_at: None, enabled: true, - provisioned_state: ProvisionedState::Unprovisioned, + lifecycle_state: LifecycleState::default(), } } @@ -76,10 +119,12 @@ impl IndexerStateManagerImpl { } pub async fn delete_state(&self, indexer_state: &IndexerState) -> anyhow::Result<()> { + tracing::info!("Deleting state"); + self.redis_client.delete_indexer_state(indexer_state).await } - async fn set_state( + pub async fn set_state( &self, indexer_config: &IndexerConfig, state: IndexerState, @@ -101,58 +146,6 @@ impl IndexerStateManagerImpl { Ok(()) } - pub async fn set_deprovisioning( - &self, - indexer_state: &IndexerState, - task_id: String, - ) -> anyhow::Result<()> { - let mut state = indexer_state.clone(); - - state.provisioned_state = ProvisionedState::Deprovisioning { task_id }; - - self.redis_client - .set(state.get_state_key(), serde_json::to_string(&state)?) 
- .await?; - - Ok(()) - } - - pub async fn set_provisioning( - &self, - indexer_config: &IndexerConfig, - task_id: String, - ) -> anyhow::Result<()> { - let mut indexer_state = self.get_state(indexer_config).await?; - - indexer_state.provisioned_state = ProvisionedState::Provisioning { task_id }; - - self.set_state(indexer_config, indexer_state).await?; - - Ok(()) - } - - pub async fn set_provisioned(&self, indexer_config: &IndexerConfig) -> anyhow::Result<()> { - let mut indexer_state = self.get_state(indexer_config).await?; - - indexer_state.provisioned_state = ProvisionedState::Provisioned; - - self.set_state(indexer_config, indexer_state).await?; - - Ok(()) - } - pub async fn set_provisioning_failure( - &self, - indexer_config: &IndexerConfig, - ) -> anyhow::Result<()> { - let mut indexer_state = self.get_state(indexer_config).await?; - - indexer_state.provisioned_state = ProvisionedState::Failed; - - self.set_state(indexer_config, indexer_state).await?; - - Ok(()) - } - pub async fn set_enabled( &self, indexer_config: &IndexerConfig, @@ -194,7 +187,7 @@ mod tests { let mut mock_redis_client = RedisClient::default(); mock_redis_client .expect_list_indexer_states() - .returning(|| Ok(vec![serde_json::json!({ "account_id": "morgs.near", "function_name": "test", "block_stream_synced_at": 200, "enabled": true, "provisioned_state": "Provisioned" }).to_string()])) + .returning(|| Ok(vec![serde_json::json!({ "account_id": "morgs.near", "function_name": "test", "block_stream_synced_at": 200, "enabled": true, "lifecycle_state": "Initializing" }).to_string()])) .once(); mock_redis_client .expect_list_indexer_states() @@ -229,7 +222,7 @@ mod tests { .with(predicate::eq(indexer_config.clone())) .returning(|_| { Ok(Some( - serde_json::json!({ "account_id": "morgs.near", "function_name": "test", "block_stream_synced_at": 123, "enabled": true, "provisioned_state": "Provisioned" }) + serde_json::json!({ "account_id": "morgs.near", "function_name": "test", 
"block_stream_synced_at": 123, "enabled": true, "lifecycle_state": "Initializing" }) .to_string(), )) }); @@ -237,7 +230,7 @@ mod tests { .expect_set_indexer_state::() .with( predicate::always(), - predicate::eq("{\"account_id\":\"morgs.near\",\"function_name\":\"test\",\"block_stream_synced_at\":123,\"enabled\":false,\"provisioned_state\":\"Provisioned\"}".to_string()), + predicate::eq("{\"account_id\":\"morgs.near\",\"function_name\":\"test\",\"block_stream_synced_at\":123,\"enabled\":false,\"lifecycle_state\":\"Initializing\"}".to_string()), ) .returning(|_, _| Ok(())) .once(); diff --git a/coordinator/src/lifecycle.rs b/coordinator/src/lifecycle.rs new file mode 100644 index 000000000..70c9f7b38 --- /dev/null +++ b/coordinator/src/lifecycle.rs @@ -0,0 +1,343 @@ +use tracing::{info, warn}; + +use crate::handlers::block_streams::BlockStreamsHandler; +use crate::handlers::data_layer::DataLayerHandler; +use crate::handlers::executors::ExecutorsHandler; +use crate::indexer_config::IndexerConfig; +use crate::indexer_state::{IndexerState, IndexerStateManager}; +use crate::redis::{KeyProvider, RedisClient}; +use crate::registry::Registry; + +const LOOP_THROTTLE_MS: u64 = 1000; + +/// Represents the different lifecycle states of an Indexer +#[derive(Default, Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)] +pub enum LifecycleState { + /// Pre-requisite resources, i.e. Data Layer are being created. + /// + /// Transitions: + /// - `Running` on success + /// - `Repairing` on Data Layer provisioning failure + #[default] + Initializing, + /// Indexer is functional, Block Stream and Executors are continouously monitored to ensure + /// they are running the latest version of the Indexer. 
+ /// + /// Transitions: + /// - `Stopping` if suspended + /// - `Running` if Block Stream or Executor fails to synchronise, essentially triggering a + /// retry + /// - `Running` on success + Running, + /// Indexer is being stopped, Block Stream and Executors are being stopped. + /// + /// Transitions: + /// - `Stopping` on failure, triggering a retry + /// - `Stopped` on success + Stopping, + /// Indexer is stopped, Block Stream and Executors are not running. + /// + /// Transitions: + /// - `Running` if unsuspended + Stopped, + /// Indexer is in a bad state, currently requires manual intervention, but should eventually + /// self heal. This is a dead-end state + /// + /// Transitions: + /// - `Repairing` continuously + Repairing, // TODO Add `error` to enable reparation + /// Indexer is being deleted, all resources are being cleaned up + /// + /// Transitions: + /// - `Deleting` on failure, triggering a retry + /// - `Deleted` on success + Deleting, + /// Indexer is deleted, all resources are cleaned up, lifecycle manager will exit + Deleted, +} + +pub struct LifecycleManager<'a> { + initial_config: IndexerConfig, + block_streams_handler: &'a BlockStreamsHandler, + executors_handler: &'a ExecutorsHandler, + data_layer_handler: &'a DataLayerHandler, + registry: &'a Registry, + state_manager: &'a IndexerStateManager, + redis_client: &'a RedisClient, +} + +impl<'a> LifecycleManager<'a> { + pub fn new( + initial_config: IndexerConfig, + block_streams_handler: &'a BlockStreamsHandler, + executors_handler: &'a ExecutorsHandler, + data_layer_handler: &'a DataLayerHandler, + registry: &'a Registry, + state_manager: &'a IndexerStateManager, + redis_client: &'a RedisClient, + ) -> Self { + Self { + initial_config, + block_streams_handler, + executors_handler, + data_layer_handler, + registry, + state_manager, + redis_client, + } + } + + #[tracing::instrument(name = "initializing", skip_all)] + async fn handle_initializing( + &self, + config: Option<&IndexerConfig>, + 
_state: &IndexerState, + ) -> LifecycleState { + if config.is_none() { + return LifecycleState::Deleting; + } + + let config = config.unwrap(); + + if self + .data_layer_handler + .ensure_provisioned(config) + .await + .is_err() + { + return LifecycleState::Repairing; + } + + LifecycleState::Running + } + + #[tracing::instrument(name = "running", skip_all)] + async fn handle_running( + &self, + config: Option<&IndexerConfig>, + state: &mut IndexerState, + ) -> LifecycleState { + if config.is_none() { + return LifecycleState::Deleting; + } + + let config = config.unwrap(); + + if !state.enabled { + return LifecycleState::Stopping; + } + + if let Err(error) = self + .block_streams_handler + .synchronise_block_stream(config, state.block_stream_synced_at) + .await + { + warn!(?error, "Failed to synchronise block stream, retrying..."); + + return LifecycleState::Running; + } + + state.block_stream_synced_at = Some(config.get_registry_version()); + + if let Err(error) = self.executors_handler.synchronise_executor(config).await { + warn!(?error, "Failed to synchronise executor, retrying..."); + + return LifecycleState::Running; + } + + LifecycleState::Running + } + + #[tracing::instrument(name = "stopping", skip_all)] + async fn handle_stopping(&self, config: Option<&IndexerConfig>) -> LifecycleState { + if config.is_none() { + return LifecycleState::Deleting; + } + + let config = config.unwrap(); + + if let Err(error) = self + .block_streams_handler + .stop_if_needed(config.account_id.clone(), config.function_name.clone()) + .await + { + warn!(?error, "Failed to stop block stream, retrying..."); + return LifecycleState::Stopping; + } + + if let Err(error) = self + .executors_handler + .stop_if_needed(config.account_id.clone(), config.function_name.clone()) + .await + { + warn!(?error, "Failed to stop executor, retrying..."); + return LifecycleState::Stopping; + } + + LifecycleState::Stopped + } + + #[tracing::instrument(name = "stopped", skip_all)] + async fn 
handle_stopped( + &self, + config: Option<&IndexerConfig>, + state: &IndexerState, + ) -> LifecycleState { + if config.is_none() { + return LifecycleState::Deleting; + } + + // TODO Transistion to `Running` on config update + + if state.enabled { + return LifecycleState::Running; + } + + LifecycleState::Stopped + } + + #[tracing::instrument(name = "repairing", skip_all)] + async fn handle_repairing( + &self, + config: Option<&IndexerConfig>, + _state: &IndexerState, + ) -> LifecycleState { + if config.is_none() { + return LifecycleState::Deleting; + } + + // TODO Add more robust error handling, for now just stop + LifecycleState::Repairing + } + + #[tracing::instrument(name = "deleting", skip_all)] + async fn handle_deleting(&self, state: &IndexerState) -> LifecycleState { + if let Err(error) = self + .block_streams_handler + .stop_if_needed(state.account_id.clone(), state.function_name.clone()) + .await + { + warn!(?error, "Failed to stop block stream"); + } + + if let Err(error) = self + .executors_handler + .stop_if_needed(state.account_id.clone(), state.function_name.clone()) + .await + { + warn!(?error, "Failed to stop executor"); + } + + if self.state_manager.delete_state(state).await.is_err() { + // Retry + return LifecycleState::Deleting; + } + + info!("Clearing block stream"); + + if self + .redis_client + .del(state.get_redis_stream_key()) + .await + .is_err() + { + // Retry + return LifecycleState::Deleting; + } + + if self + .data_layer_handler + .ensure_deprovisioned(state.account_id.clone(), state.function_name.clone()) + .await + .is_err() + { + return LifecycleState::Deleted; + } + + LifecycleState::Deleted + } + + #[tracing::instrument( + name = "lifecycle_manager", + skip(self), + fields( + account_id = self.initial_config.account_id.as_str(), + function_name = self.initial_config.function_name.as_str() + ) + )] + pub async fn run(&self) { + let mut first_iteration = true; + + loop { + 
tokio::time::sleep(std::time::Duration::from_millis(LOOP_THROTTLE_MS)).await; + + let config = match self + .registry + .fetch_indexer( + &self.initial_config.account_id, + &self.initial_config.function_name, + ) + .await + { + Ok(config) => config, + Err(error) => { + warn!(?error, "Failed to fetch config"); + continue; + } + }; + + let mut state = match self.state_manager.get_state(&self.initial_config).await { + Ok(state) => state, + Err(error) => { + warn!(?error, "Failed to get state"); + continue; + } + }; + + if first_iteration { + info!("Initial lifecycle state: {:?}", state.lifecycle_state,); + first_iteration = false; + } + + let desired_lifecycle_state = match state.lifecycle_state { + LifecycleState::Initializing => { + self.handle_initializing(config.as_ref(), &state).await + } + LifecycleState::Running => self.handle_running(config.as_ref(), &mut state).await, + LifecycleState::Stopping => self.handle_stopping(config.as_ref()).await, + LifecycleState::Stopped => self.handle_stopped(config.as_ref(), &state).await, + LifecycleState::Repairing => self.handle_repairing(config.as_ref(), &state).await, + LifecycleState::Deleting => self.handle_deleting(&state).await, + LifecycleState::Deleted => LifecycleState::Deleted, + }; + + if desired_lifecycle_state != state.lifecycle_state { + info!( + "Transitioning lifecycle state: {:?} -> {:?}", + state.lifecycle_state, desired_lifecycle_state, + ); + } + + if desired_lifecycle_state == LifecycleState::Deleted { + break; + } + + state.lifecycle_state = desired_lifecycle_state; + + loop { + match self + .state_manager + .set_state(&self.initial_config, state.clone()) + .await + { + Ok(_) => break, + Err(e) => { + warn!("Failed to set state: {:?}. 
Retrying...", e); + + tokio::time::sleep(std::time::Duration::from_millis(1000)).await; + } + } + } + } + } +} diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 6d55aa780..fef71dd32 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -1,27 +1,29 @@ +use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use near_primitives::types::AccountId; +use tokio::task::JoinHandle; use tracing_subscriber::prelude::*; use crate::handlers::block_streams::BlockStreamsHandler; use crate::handlers::data_layer::DataLayerHandler; use crate::handlers::executors::ExecutorsHandler; use crate::indexer_state::IndexerStateManager; +use crate::lifecycle::LifecycleManager; use crate::redis::RedisClient; use crate::registry::Registry; -use crate::synchroniser::Synchroniser; mod handlers; mod indexer_config; mod indexer_state; +mod lifecycle; mod redis; mod registry; mod server; -mod synchroniser; mod utils; -const CONTROL_LOOP_THROTTLE_SECONDS: Duration = Duration::from_secs(1); +const LOOP_THROTTLE_SECONDS: Duration = Duration::from_secs(1); async fn sleep(duration: Duration) -> anyhow::Result<()> { tokio::time::sleep(duration).await; @@ -58,18 +60,11 @@ async fn main() -> anyhow::Result<()> { let registry = Arc::new(Registry::connect(registry_contract_id.clone(), &rpc_url)); let redis_client = RedisClient::connect(&redis_url).await?; - let block_streams_handler = BlockStreamsHandler::connect(&block_streamer_url)?; + let block_streams_handler = + BlockStreamsHandler::connect(&block_streamer_url, redis_client.clone())?; let executors_handler = ExecutorsHandler::connect(&runner_url)?; let data_layer_handler = DataLayerHandler::connect(&runner_url)?; let indexer_state_manager = Arc::new(IndexerStateManager::new(redis_client.clone())); - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &indexer_state_manager, - &redis_client, - ); tokio::spawn({ let 
indexer_state_manager = indexer_state_manager.clone(); @@ -77,7 +72,62 @@ async fn main() -> anyhow::Result<()> { async move { server::init(grpc_port, indexer_state_manager, registry).await } }); + indexer_state_manager.migrate().await?; + + let mut lifecycle_tasks = HashMap::>::new(); + loop { - tokio::try_join!(synchroniser.sync(), sleep(CONTROL_LOOP_THROTTLE_SECONDS))?; + let indexer_registry = registry.fetch().await?; + + for config in indexer_registry.iter() { + if lifecycle_tasks.contains_key(&config.get_full_name()) { + continue; + } + + tracing::info!( + account_id = config.account_id.as_str(), + function_name = config.function_name.as_str(), + "Starting lifecycle manager" + ); + + let handle = tokio::spawn({ + let indexer_state_manager = indexer_state_manager.clone(); + let config = config.clone(); + let registry = registry.clone(); + let redis_client = redis_client.clone(); + let block_streams_handler = block_streams_handler.clone(); + let data_layer_handler = data_layer_handler.clone(); + let executors_handler = executors_handler.clone(); + + async move { + let lifecycle_manager = LifecycleManager::new( + config, + &block_streams_handler, + &executors_handler, + &data_layer_handler, + ®istry, + &indexer_state_manager, + &redis_client, + ); + + lifecycle_manager.run().await + } + }); + + lifecycle_tasks.insert(config.get_full_name(), handle); + } + + let finished_tasks: Vec = lifecycle_tasks + .iter() + .filter_map(|(name, task)| task.is_finished().then_some(name.clone())) + .collect(); + + for indexer_name in finished_tasks { + tracing::info!(indexer_name, "Lifecycle has finished, removing..."); + + lifecycle_tasks.remove(&indexer_name); + } + + sleep(LOOP_THROTTLE_SECONDS).await?; } } diff --git a/coordinator/src/registry.rs b/coordinator/src/registry.rs index d3cb87397..de891ffa3 100644 --- a/coordinator/src/registry.rs +++ b/coordinator/src/registry.rs @@ -169,9 +169,9 @@ impl RegistryImpl { pub async fn fetch_indexer( &self, - account_id: AccountId, 
- function_name: String, - ) -> anyhow::Result { + account_id: &AccountId, + function_name: &str, + ) -> anyhow::Result> { let response = self .json_rpc_client .call(RpcQueryRequest { @@ -194,10 +194,10 @@ impl RegistryImpl { .context("Failed to fetch indexer")?; if let QueryResponseKind::CallResult(call_result) = response.kind { - let indexer: registry_types::IndexerConfig = - serde_json::from_slice(&call_result.result)?; - - return Ok(IndexerConfig { + let indexer = serde_json::from_slice::>( + &call_result.result, + )? + .map(|indexer| IndexerConfig { account_id: account_id.clone(), function_name: function_name.to_string(), code: indexer.code, @@ -207,6 +207,8 @@ impl RegistryImpl { updated_at_block_height: indexer.updated_at_block_height, created_at_block_height: indexer.created_at_block_height, }); + + return Ok(indexer); } anyhow::bail!("Invalid registry response") diff --git a/coordinator/src/server/indexer_manager_service.rs b/coordinator/src/server/indexer_manager_service.rs index 36a91ca31..809fd8970 100644 --- a/coordinator/src/server/indexer_manager_service.rs +++ b/coordinator/src/server/indexer_manager_service.rs @@ -42,9 +42,10 @@ impl indexer_manager::indexer_manager_server::IndexerManager for IndexerManagerS let indexer_config = self .registry - .fetch_indexer(account_id, request.function_name) + .fetch_indexer(&account_id, &request.function_name) .await - .map_err(|_| Status::not_found("Indexer not found"))?; + .map_err(|_| Status::internal("Failed to fetch indexer"))? + .ok_or(Status::not_found("Indexer not found"))?; self.indexer_state_manager .set_enabled(&indexer_config, true) @@ -78,9 +79,10 @@ impl indexer_manager::indexer_manager_server::IndexerManager for IndexerManagerS let indexer_config = self .registry - .fetch_indexer(account_id, request.function_name) + .fetch_indexer(&account_id, &request.function_name) .await - .map_err(|_| Status::not_found("Indexer not found"))?; + .map_err(|_| Status::internal("Failed to fetch indexer"))? 
+ .ok_or(Status::not_found("Indexer not found"))?; self.indexer_state_manager .set_enabled(&indexer_config, false) diff --git a/coordinator/src/synchroniser.rs b/coordinator/src/synchroniser.rs index bf362bc5d..64a5ce073 100644 --- a/coordinator/src/synchroniser.rs +++ b/coordinator/src/synchroniser.rs @@ -427,956 +427,3 @@ impl<'a> Synchroniser<'a> { Ok(()) } } - -#[cfg(test)] -mod test { - use super::*; - - use mockall::predicate::*; - use std::collections::HashMap; - - use crate::registry::IndexerRegistry; - - #[tokio::test] - async fn generates_sync_states() { - let existing_account_ids = vec![ - "account1.near".to_string(), - "account2.near".to_string(), - "account3.near".to_string(), - "account4.near".to_string(), - ]; - let new_account_ids = vec![ - "new_account1.near".to_string(), - "new_account2.near".to_string(), - ]; - let deleted_account_ids = vec![ - "deleted_account1.near".to_string(), - "deleted_account2.near".to_string(), - ]; - - let mut existing_indexer_configs: Vec = Vec::new(); - for (i, account_id) in existing_account_ids.iter().enumerate() { - for j in 1..=5 { - existing_indexer_configs.push(IndexerConfig { - account_id: account_id.parse().unwrap(), - function_name: format!("existing_indexer{}_{}", i + 1, j), - ..Default::default() - }); - } - } - - let mut new_indexer_configs: Vec = Vec::new(); - for (i, account_id) in new_account_ids.iter().enumerate() { - for j in 1..=3 { - new_indexer_configs.push(IndexerConfig { - account_id: account_id.parse().unwrap(), - function_name: format!("new_indexer{}_{}", i + 1, j), - ..Default::default() - }); - } - } - - let mut deleted_indexer_configs: Vec = Vec::new(); - for (i, account_id) in deleted_account_ids.iter().enumerate() { - for j in 1..=2 { - deleted_indexer_configs.push(IndexerConfig { - account_id: account_id.parse().unwrap(), - function_name: format!("deleted_indexer{}_{}", i + 1, j), - ..Default::default() - }); - } - } - - let mut indexer_registry = IndexerRegistry::new(); - for indexer in 
existing_indexer_configs - .iter() - .chain(new_indexer_configs.iter()) - { - indexer_registry - .entry(indexer.account_id.clone()) - .or_default() - .insert(indexer.function_name.clone(), indexer.clone()); - } - - let mut block_streams_handler = BlockStreamsHandler::default(); - let block_streams: Vec = existing_indexer_configs - .iter() - // generate some "randomness" - .rev() - .enumerate() - .map(|(i, indexer)| StreamInfo { - stream_id: format!("stream_id{}", i + 1), - account_id: indexer.account_id.to_string(), - function_name: indexer.function_name.clone(), - version: indexer.get_registry_version(), - }) - .collect(); - block_streams_handler - .expect_list() - .returning(move || Ok(block_streams.clone())); - - let mut executors_handler = ExecutorsHandler::default(); - let executors: Vec = existing_indexer_configs - .iter() - // generate some "randomness" - .rev() - .enumerate() - .map(|(i, indexer)| ExecutorInfo { - executor_id: format!("executor_id{}", i + 1), - account_id: indexer.account_id.to_string(), - function_name: indexer.function_name.clone(), - version: indexer.get_registry_version(), - status: "running".to_string(), - }) - .collect(); - - executors_handler - .expect_list() - .returning(move || Ok(executors.clone())); - - let mut registry = Registry::default(); - registry - .expect_fetch() - .returning(move || Ok(indexer_registry.clone())); - - let mut state_manager = IndexerStateManager::default(); - let states: Vec = existing_indexer_configs - .iter() - .map(|indexer| IndexerState { - account_id: indexer.account_id.clone(), - function_name: indexer.function_name.clone(), - block_stream_synced_at: Some(indexer.get_registry_version()), - enabled: true, - provisioned_state: ProvisionedState::Provisioned, - }) - .chain(deleted_indexer_configs.iter().map(|indexer| IndexerState { - account_id: indexer.account_id.clone(), - function_name: indexer.function_name.clone(), - block_stream_synced_at: Some(indexer.get_registry_version()), - enabled: true, - 
provisioned_state: ProvisionedState::Provisioned, - })) - .collect(); - state_manager - .expect_list() - .returning(move || Ok(states.clone())); - - let redis_client = RedisClient::default(); - let data_layer_handler = DataLayerHandler::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - let synchronisation_states = synchroniser - .generate_synchronisation_states() - .await - .unwrap(); - - let mut new_count = 0; - let mut existing_count = 0; - let mut deleted_count = 0; - - for state in &synchronisation_states { - match state { - SynchronisationState::New(_) => new_count += 1, - SynchronisationState::Existing(_, _, executor, block_stream) => { - assert!(executor.is_some(), "Executor should exist for the indexer"); - assert!( - block_stream.is_some(), - "Block stream should exist for the indexer" - ); - existing_count += 1; - } - SynchronisationState::Deleted(_, _, _) => { - deleted_count += 1; - } - } - } - - assert_eq!(new_count, 6); - assert_eq!(existing_count, 20); - assert_eq!(deleted_count, 4); - } - - mod new { - use super::*; - - #[tokio::test] - async fn triggers_data_layer_provisioning() { - let config = IndexerConfig::default(); - - let indexer_registry = IndexerRegistry::from(&[( - config.account_id.clone(), - HashMap::from([(config.function_name.clone(), config.clone())]), - )]); - - let mut block_streams_handler = BlockStreamsHandler::default(); - block_streams_handler.expect_list().returning(|| Ok(vec![])); - - let mut executors_handler = ExecutorsHandler::default(); - executors_handler.expect_list().returning(|| Ok(vec![])); - - let mut registry = Registry::default(); - registry - .expect_fetch() - .returning(move || Ok(indexer_registry.clone())); - - let mut state_manager = IndexerStateManager::default(); - state_manager.expect_list().returning(|| Ok(vec![])); - state_manager - .expect_set_provisioning() - 
.with(eq(config.clone()), eq("task_id".to_string())) - .returning(|_, _| Ok(())) - .once(); - - let mut data_layer_handler = DataLayerHandler::default(); - data_layer_handler - .expect_start_provisioning_task() - .with(eq(config)) - .returning(|_| Ok("task_id".to_string())) - .once(); - - let redis_client = RedisClient::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - synchroniser.sync().await.unwrap(); - } - } - - mod existing { - use super::*; - - #[tokio::test] - async fn waits_for_provisioning_to_complete() { - let config = IndexerConfig::default(); - - let indexer_registry = IndexerRegistry::from(&[( - config.account_id.clone(), - HashMap::from([(config.function_name.clone(), config.clone())]), - )]); - - let task_id = "task_id".to_string(); - - let state = IndexerState { - account_id: config.account_id.clone(), - function_name: config.function_name.clone(), - block_stream_synced_at: Some(config.get_registry_version()), - enabled: true, - provisioned_state: ProvisionedState::Provisioning { - task_id: task_id.clone().to_string(), - }, - }; - - let mut registry = Registry::default(); - registry - .expect_fetch() - .returning(move || Ok(indexer_registry.clone())); - - let mut state_manager = IndexerStateManager::default(); - state_manager - .expect_set_provisioned() - .with(eq(config.clone())) - .returning(|_| Ok(())) - .once(); - - let mut data_layer_handler = DataLayerHandler::default(); - data_layer_handler - .expect_get_task_status() - .with(eq(task_id)) - .returning(|_| Ok(TaskStatus::Complete)); - - let mut block_streams_handler = BlockStreamsHandler::default(); - block_streams_handler.expect_start().never(); - - let mut executors_handler = ExecutorsHandler::default(); - executors_handler.expect_start().never(); - - let redis_client = RedisClient::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - 
&executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - synchroniser - .sync_existing_indexer(&config, &state, None, None) - .await - .unwrap(); - } - - #[tokio::test] - async fn ignores_failed_provisioning() { - let config = IndexerConfig::default(); - - let indexer_registry = IndexerRegistry::from(&[( - config.account_id.clone(), - HashMap::from([(config.function_name.clone(), config.clone())]), - )]); - - let state = IndexerState { - account_id: config.account_id.clone(), - function_name: config.function_name.clone(), - block_stream_synced_at: Some(config.get_registry_version()), - enabled: true, - provisioned_state: ProvisionedState::Provisioning { - task_id: "task_id".to_string(), - }, - }; - - let mut registry = Registry::default(); - registry - .expect_fetch() - .returning(move || Ok(indexer_registry.clone())); - - let mut state_manager = IndexerStateManager::default(); - state_manager - .expect_set_provisioning_failure() - .with(eq(config.clone())) - .returning(|_| Ok(())) - .once(); - - let mut data_layer_handler = DataLayerHandler::default(); - data_layer_handler - .expect_get_task_status() - .with(eq("task_id".to_string())) - .returning(|_| Ok(TaskStatus::Failed)); - - let mut block_streams_handler = BlockStreamsHandler::default(); - block_streams_handler.expect_start().never(); - - let mut executors_handler = ExecutorsHandler::default(); - executors_handler.expect_start().never(); - - let redis_client = RedisClient::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - synchroniser - .sync_existing_indexer(&config, &state, None, None) - .await - .unwrap(); - } - - #[tokio::test] - async fn ignores_synced() { - let config = IndexerConfig::default(); - - let indexer_registry = IndexerRegistry::from(&[( - config.account_id.clone(), - HashMap::from([(config.function_name.clone(), 
config.clone())]), - )]); - - let mut block_streams_handler = BlockStreamsHandler::default(); - let config_clone = config.clone(); - block_streams_handler.expect_list().returning(move || { - Ok(vec![StreamInfo { - stream_id: config_clone.get_redis_stream_key(), - account_id: config_clone.account_id.to_string(), - function_name: config_clone.function_name.clone(), - version: config_clone.get_registry_version(), - }]) - }); - block_streams_handler.expect_stop().never(); - block_streams_handler.expect_start().never(); - - let mut executors_handler = ExecutorsHandler::default(); - let config_clone = config.clone(); - executors_handler.expect_list().returning(move || { - Ok(vec![ExecutorInfo { - executor_id: "executor_id".to_string(), - account_id: config_clone.account_id.to_string(), - function_name: config_clone.function_name.clone(), - version: config_clone.get_registry_version(), - status: "running".to_string(), - }]) - }); - executors_handler.expect_stop().never(); - executors_handler.expect_start().never(); - - let mut registry = Registry::default(); - registry - .expect_fetch() - .returning(move || Ok(indexer_registry.clone())); - - let mut state_manager = IndexerStateManager::default(); - state_manager - .expect_set_synced() - .with(eq(config.clone())) - .returning(|_| Ok(())) - .once(); - state_manager.expect_list().returning(move || { - Ok(vec![IndexerState { - account_id: config.account_id.clone(), - function_name: config.function_name.clone(), - block_stream_synced_at: Some(config.get_registry_version()), - enabled: true, - provisioned_state: ProvisionedState::Provisioned, - }]) - }); - - let redis_client = RedisClient::default(); - let data_layer_handler = DataLayerHandler::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - synchroniser.sync().await.unwrap(); - } - - #[tokio::test] - async fn restarts_outdated() { - let config = 
IndexerConfig::default(); - - let indexer_registry = IndexerRegistry::from(&[( - config.account_id.clone(), - HashMap::from([(config.function_name.clone(), config.clone())]), - )]); - - let mut block_streams_handler = BlockStreamsHandler::default(); - let config_clone = config.clone(); - block_streams_handler.expect_list().returning(move || { - Ok(vec![StreamInfo { - stream_id: "stream_id".to_string(), - account_id: config_clone.account_id.to_string(), - function_name: config_clone.function_name.clone(), - version: config_clone.get_registry_version() + 1, - }]) - }); - block_streams_handler - .expect_stop() - .with(eq("stream_id".to_string())) - .returning(|_| Ok(())) - .once(); - block_streams_handler - .expect_start() - .with(eq(100), eq(config.clone())) - .returning(|_, _| Ok(())) - .once(); - - let mut executors_handler = ExecutorsHandler::default(); - let config_clone = config.clone(); - executors_handler.expect_list().returning(move || { - Ok(vec![ExecutorInfo { - executor_id: "executor_id".to_string(), - account_id: config_clone.account_id.to_string(), - function_name: config_clone.function_name.clone(), - version: config_clone.get_registry_version() + 1, - status: "running".to_string(), - }]) - }); - executors_handler - .expect_stop() - .with(eq("executor_id".to_string())) - .returning(|_| Ok(())) - .once(); - executors_handler - .expect_start() - .with(eq(config.clone())) - .returning(|_| Ok(())) - .once(); - - let mut registry = Registry::default(); - registry - .expect_fetch() - .returning(move || Ok(indexer_registry.clone())); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_clear_block_stream() - .with(eq(config.clone())) - .returning(|_| Ok(())) - .once(); - - let mut state_manager = IndexerStateManager::default(); - state_manager - .expect_set_synced() - .with(eq(config.clone())) - .returning(|_| Ok(())) - .once(); - state_manager.expect_list().returning(move || { - Ok(vec![IndexerState { - account_id: 
config.account_id.clone(), - function_name: config.function_name.clone(), - block_stream_synced_at: Some(config.get_registry_version()), - enabled: true, - provisioned_state: ProvisionedState::Provisioned, - }]) - }); - - let data_layer_handler = DataLayerHandler::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - synchroniser.sync().await.unwrap(); - } - - #[tokio::test] - async fn treats_unsynced_blocks_streams_as_new() { - let config = IndexerConfig::default(); - let state = IndexerState { - account_id: config.account_id.clone(), - function_name: config.function_name.clone(), - block_stream_synced_at: None, - enabled: true, - provisioned_state: ProvisionedState::Provisioned, - }; - - let mut block_streams_handler = BlockStreamsHandler::default(); - block_streams_handler - .expect_start() - .with(eq(100), eq(config.clone())) - .returning(|_, _| Ok(())) - .once(); - - let redis_client = RedisClient::default(); - let state_manager = IndexerStateManager::default(); - let executors_handler = ExecutorsHandler::default(); - let registry = Registry::default(); - let data_layer_handler = DataLayerHandler::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - synchroniser - .sync_existing_block_stream(&config, &state, None) - .await - .unwrap(); - } - - #[tokio::test] - async fn restarts_stopped_and_outdated_block_stream() { - let config = IndexerConfig::default(); - let state = IndexerState { - account_id: config.account_id.clone(), - function_name: config.function_name.clone(), - block_stream_synced_at: Some(config.get_registry_version() - 1), - enabled: true, - provisioned_state: ProvisionedState::Provisioned, - }; - - let mut block_streams_handler = BlockStreamsHandler::default(); - block_streams_handler - .expect_start() 
- .with(eq(100), eq(config.clone())) - .returning(|_, _| Ok(())) - .once(); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_clear_block_stream() - .with(eq(config.clone())) - .returning(|_| Ok(())) - .once(); - - let state_manager = IndexerStateManager::default(); - let executors_handler = ExecutorsHandler::default(); - let registry = Registry::default(); - let data_layer_handler = DataLayerHandler::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - synchroniser - .sync_existing_block_stream(&config, &state, None) - .await - .unwrap(); - } - - #[tokio::test] - async fn resumes_stopped_and_synced_block_stream() { - let config = IndexerConfig::default(); - let state = IndexerState { - account_id: config.account_id.clone(), - function_name: config.function_name.clone(), - block_stream_synced_at: Some(config.get_registry_version()), - enabled: true, - provisioned_state: ProvisionedState::Provisioned, - }; - - let last_published_block = 1; - - let mut redis_client = RedisClient::default(); - redis_client - .expect_clear_block_stream::() - .never(); - redis_client - .expect_get_last_published_block() - .with(eq(config.clone())) - .returning(move |_| Ok(Some(last_published_block))); - - let mut block_streams_handler = BlockStreamsHandler::default(); - block_streams_handler - .expect_start() - .with(eq(last_published_block + 1), eq(config.clone())) - .returning(|_, _| Ok(())) - .once(); - - let state_manager = IndexerStateManager::default(); - let executors_handler = ExecutorsHandler::default(); - let registry = Registry::default(); - let data_layer_handler = DataLayerHandler::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - synchroniser - .sync_existing_block_stream(&config, &state, None) - 
.await - .unwrap(); - } - - #[tokio::test] - async fn reconfigures_block_stream() { - let config_with_latest = IndexerConfig { - start_block: StartBlock::Latest, - ..IndexerConfig::default() - }; - let height = 5; - let config_with_height = IndexerConfig { - start_block: StartBlock::Height(height), - ..IndexerConfig::default() - }; - let last_published_block = 1; - let config_with_continue = IndexerConfig { - start_block: StartBlock::Continue, - ..IndexerConfig::default() - }; - - let mut block_streams_handler = BlockStreamsHandler::default(); - block_streams_handler - .expect_start() - .with( - eq(last_published_block + 1), - eq(config_with_continue.clone()), - ) - .returning(|_, _| Ok(())) - .once(); - block_streams_handler - .expect_start() - .with( - eq(config_with_latest.get_registry_version()), - eq(config_with_latest.clone()), - ) - .returning(|_, _| Ok(())) - .once(); - block_streams_handler - .expect_start() - .with(eq(height), eq(config_with_height.clone())) - .returning(|_, _| Ok(())) - .once(); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_clear_block_stream() - .with(eq(config_with_latest.clone())) - .returning(|_| Ok(())) - .once(); - redis_client - .expect_clear_block_stream() - .with(eq(config_with_height.clone())) - .returning(|_| Ok(())) - .once(); - redis_client - .expect_get_last_published_block() - .with(eq(config_with_continue.clone())) - .returning(move |_| Ok(Some(last_published_block))); - - let state_manager = IndexerStateManager::default(); - let executors_handler = ExecutorsHandler::default(); - let registry = Registry::default(); - let data_layer_handler = DataLayerHandler::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - synchroniser - .reconfigure_block_stream(&config_with_latest) - .await - .unwrap(); - synchroniser - .reconfigure_block_stream(&config_with_height) - .await - 
.unwrap(); - synchroniser - .reconfigure_block_stream(&config_with_continue) - .await - .unwrap(); - } - - #[tokio::test] - async fn stops_disabled_indexers() { - let config = IndexerConfig::default(); - let state = IndexerState { - account_id: config.account_id.clone(), - function_name: config.function_name.clone(), - block_stream_synced_at: Some(config.get_registry_version()), - enabled: false, - provisioned_state: ProvisionedState::Provisioned, - }; - let executor = ExecutorInfo { - executor_id: "executor_id".to_string(), - account_id: config.account_id.to_string(), - function_name: config.function_name.clone(), - version: config.get_registry_version(), - status: "running".to_string(), - }; - let block_stream = StreamInfo { - stream_id: "stream_id".to_string(), - account_id: config.account_id.to_string(), - function_name: config.function_name.clone(), - version: config.get_registry_version(), - }; - - let mut block_streams_handler = BlockStreamsHandler::default(); - block_streams_handler - .expect_stop() - .with(eq("stream_id".to_string())) - .returning(|_| Ok(())) - .once(); - - let mut executors_handler = ExecutorsHandler::default(); - executors_handler - .expect_stop() - .with(eq("executor_id".to_string())) - .returning(|_| Ok(())) - .once(); - - let mut state_manager = IndexerStateManager::default(); - state_manager - .expect_set_synced() - .with(eq(config.clone())) - .returning(|_| Ok(())) - .never(); - - let registry = Registry::default(); - let redis_client = RedisClient::default(); - let data_layer_handler = DataLayerHandler::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - synchroniser - .sync_existing_indexer(&config, &state, Some(&executor), Some(&block_stream)) - .await - .unwrap(); - // Simulate second run, start/stop etc should not be called - synchroniser - .sync_existing_indexer(&config, &state, None, None) - .await - 
.unwrap(); - } - } - - mod deleted { - use super::*; - - #[tokio::test] - async fn stops_block_stream_and_executor() { - let config = IndexerConfig::default(); - let state = IndexerState { - account_id: config.account_id.clone(), - function_name: config.function_name.clone(), - block_stream_synced_at: Some(config.get_registry_version()), - enabled: false, - provisioned_state: ProvisionedState::Deprovisioning { - task_id: "task_id".to_string(), - }, - }; - let executor = ExecutorInfo { - executor_id: "executor_id".to_string(), - account_id: config.account_id.to_string(), - function_name: config.function_name.clone(), - version: config.get_registry_version(), - status: "running".to_string(), - }; - let block_stream = StreamInfo { - stream_id: "stream_id".to_string(), - account_id: config.account_id.to_string(), - function_name: config.function_name.clone(), - version: config.get_registry_version(), - }; - - let mut block_streams_handler = BlockStreamsHandler::default(); - block_streams_handler - .expect_stop() - .with(eq("stream_id".to_string())) - .returning(|_| Ok(())) - .once(); - - let mut executors_handler = ExecutorsHandler::default(); - executors_handler - .expect_stop() - .with(eq("executor_id".to_string())) - .returning(|_| Ok(())) - .once(); - - let mut state_manager = IndexerStateManager::default(); - state_manager.expect_delete_state().never(); - - let mut data_layer_handler = DataLayerHandler::default(); - data_layer_handler - .expect_get_task_status() - .with(eq("task_id".to_string())) - .returning(|_| Ok(TaskStatus::Pending)); - - let registry = Registry::default(); - let redis_client = RedisClient::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - synchroniser - .sync_deleted_indexer(&state, Some(&executor), Some(&block_stream)) - .await - .unwrap(); - } - - #[tokio::test] - async fn cleans_indexer_resources() { - let config 
= IndexerConfig::default(); - let provisioned_state = IndexerState { - account_id: config.account_id.clone(), - function_name: config.function_name.clone(), - block_stream_synced_at: Some(config.get_registry_version()), - enabled: false, - provisioned_state: ProvisionedState::Provisioned, - }; - let deprovisioning_state = IndexerState { - account_id: config.account_id.clone(), - function_name: config.function_name.clone(), - block_stream_synced_at: Some(config.get_registry_version()), - enabled: false, - provisioned_state: ProvisionedState::Deprovisioning { - task_id: "task_id".to_string(), - }, - }; - - let mut state_manager = IndexerStateManager::default(); - state_manager - .expect_set_deprovisioning() - .with(eq(provisioned_state.clone()), eq("task_id".to_string())) - .returning(|_, _| Ok(())); - state_manager - .expect_delete_state() - .with(eq(deprovisioning_state.clone())) - .returning(|_| Ok(())) - .once(); - - let mut data_layer_handler = DataLayerHandler::default(); - data_layer_handler - .expect_start_deprovisioning_task() - .with( - eq(config.clone().account_id), - eq(config.clone().function_name), - ) - .returning(|_, _| Ok("task_id".to_string())); - data_layer_handler - .expect_get_task_status() - .with(eq("task_id".to_string())) - .returning(|_| Ok(TaskStatus::Complete)); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_del::() - .with(eq(config.get_redis_stream_key())) - .returning(|_| Ok(())) - .once(); - - let registry = Registry::default(); - let block_streams_handler = BlockStreamsHandler::default(); - let executors_handler = ExecutorsHandler::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - synchroniser - .sync_deleted_indexer(&provisioned_state, None, None) - .await - .unwrap(); - synchroniser - .sync_deleted_indexer(&deprovisioning_state, None, None) - .await - .unwrap(); - } - } -} diff 
--git a/runner-client/proto/runner.proto b/runner-client/proto/runner.proto index 82c457f7d..51045f8c5 100644 --- a/runner-client/proto/runner.proto +++ b/runner-client/proto/runner.proto @@ -10,6 +10,15 @@ service Runner { // Lists all Runner executor rpc ListExecutors (ListExecutorsRequest) returns (ListExecutorsResponse); + + // Get Executor info + rpc GetExecutor (GetExecutorRequest) returns (ExecutorInfo); +} + +// Get Executor request +message GetExecutorRequest { + string account_id = 1; + string function_name = 2; } // Start Executor Request diff --git a/runner/protos/runner.proto b/runner/protos/runner.proto index 82c457f7d..51045f8c5 100644 --- a/runner/protos/runner.proto +++ b/runner/protos/runner.proto @@ -10,6 +10,15 @@ service Runner { // Lists all Runner executor rpc ListExecutors (ListExecutorsRequest) returns (ListExecutorsResponse); + + // Get Executor info + rpc GetExecutor (GetExecutorRequest) returns (ExecutorInfo); +} + +// Get Executor request +message GetExecutorRequest { + string account_id = 1; + string function_name = 2; } // Start Executor Request diff --git a/runner/src/server/services/runner/runner-service.test.ts b/runner/src/server/services/runner/runner-service.test.ts index bfbf9ca8d..f7a9c6dc1 100644 --- a/runner/src/server/services/runner/runner-service.test.ts +++ b/runner/src/server/services/runner/runner-service.test.ts @@ -32,6 +32,59 @@ describe('Runner gRPC Service', () => { genericIndexerConfig = new IndexerConfig(BASIC_REDIS_STREAM, BASIC_ACCOUNT_ID, BASIC_FUNCTION_NAME, BASIC_VERSION, BASIC_CODE, BASIC_SCHEMA, LogLevel.INFO); }); + it('get non existant executor', async () => { + const streamHandlerType = jest.fn().mockImplementation((indexerConfig) => { + return { + indexerConfig, + executorContext: BASIC_EXECUTOR_CONTEXT + }; + }); + const service = getRunnerService(new Map(), streamHandlerType); + + await new Promise((resolve) => { + service.GetExecutor({ request: { accountId: BASIC_ACCOUNT_ID, functionName: 
BASIC_FUNCTION_NAME } } as any, (err) => { + expect(err).toEqual({ + code: grpc.status.NOT_FOUND, + message: `Executor for account ${BASIC_ACCOUNT_ID} and name ${BASIC_FUNCTION_NAME} does not exist` + }); + resolve(null); + }); + }); + }); + + it('gets an existing executor', async () => { + const streamHandlerType = jest.fn().mockImplementation((indexerConfig) => { + return { + indexerConfig, + executorContext: BASIC_EXECUTOR_CONTEXT + }; + }); + const service = getRunnerService(new Map(), streamHandlerType); + const request = generateRequest(BASIC_REDIS_STREAM + '-A', BASIC_ACCOUNT_ID, BASIC_FUNCTION_NAME, BASIC_CODE, BASIC_SCHEMA, BASIC_VERSION); + + await new Promise((resolve, reject) => { + service.StartExecutor(request, (err) => { + if (err) reject(err); + resolve(null); + }); + }); + + await new Promise((resolve, reject) => { + service.GetExecutor({ request: { accountId: BASIC_ACCOUNT_ID, functionName: BASIC_FUNCTION_NAME } } as any, (err, response) => { + if (err) reject(err); + + expect(response).toEqual({ + executorId: BASIC_EXECUTOR_ID, + accountId: genericIndexerConfig.accountId, + functionName: genericIndexerConfig.functionName, + status: IndexerStatus.RUNNING, + version: '1' + }); + resolve(null); + }); + }); + }); + it('starts a executor with correct settings', () => { const service = getRunnerService(new Map(), genericStreamHandlerType); const mockCallback = jest.fn() as unknown as any; diff --git a/runner/src/server/services/runner/runner-service.ts b/runner/src/server/services/runner/runner-service.ts index 8584d185b..0d135574f 100644 --- a/runner/src/server/services/runner/runner-service.ts +++ b/runner/src/server/services/runner/runner-service.ts @@ -8,6 +8,7 @@ import parentLogger from '../../../logger'; import { type RunnerHandlers } from '../../../generated/runner/Runner'; import { type StartExecutorResponse__Output, type StartExecutorResponse } from '../../../generated/runner/StartExecutorResponse'; import { type StartExecutorRequest__Output 
} from '../../../generated/runner/StartExecutorRequest'; +import { type GetExecutorRequest__Output } from '../../../generated/runner/GetExecutorRequest'; import { type StopExecutorRequest__Output } from '../../../generated/runner/StopExecutorRequest'; import { type StopExecutorResponse__Output, type StopExecutorResponse } from '../../../generated/runner/StopExecutorResponse'; import { type ListExecutorsRequest__Output } from '../../../generated/runner/ListExecutorsRequest'; @@ -19,6 +20,28 @@ export function getRunnerService ( StreamHandlerType: typeof StreamHandler = StreamHandler ): RunnerHandlers { const RunnerService: RunnerHandlers = { + GetExecutor (call: ServerUnaryCall, callback: sendUnaryData): void { + const { accountId, functionName } = call.request; + + const executorEntry = Array.from(executors.entries()).find(([_id, executor]) => executor.indexerConfig.accountId === accountId && executor.indexerConfig.functionName === functionName); + + if (executorEntry) { + const [executorId, executor] = executorEntry; + callback(null, { + executorId, + accountId: executor.indexerConfig.accountId, + functionName: executor.indexerConfig.functionName, + version: executor.indexerConfig.version.toString(), + status: executor.executorContext.status + }); + } else { + const notFoundError = { + code: grpc.status.NOT_FOUND, + message: `Executor for account ${accountId} and name ${functionName} does not exist` + }; + callback(notFoundError, null); + } + }, StartExecutor (call: ServerUnaryCall, callback: sendUnaryData): void { // Validate request const validationResult = validateStartExecutorRequest(call.request); From 29bde3c68769f864f109fedab6878e05db2287a7 Mon Sep 17 00:00:00 2001 From: Morgan McCauley Date: Thu, 18 Jul 2024 15:11:46 +1200 Subject: [PATCH 2/6] feat: Expose health info from Block Stream/Executor RPC (#889) This PR exposes a `health` field on the Block Stream/Executor info, which can be accessed via RPC. 
The intent of this field is for Coordinator to monitor it, and then act accordingly. I wanted to raise this work first, so that the overall result is not too large. Essentially, `health` contains only a single `enum` describing the "state" of the process, but this can be expanded over time as needed. --- block-streamer/proto/block_streamer.proto | 22 +++ block-streamer/src/block_stream.rs | 170 ++++++++++++++++-- block-streamer/src/redis.rs | 22 +++ .../src/server/block_streamer_service.rs | 52 +++++- runner-client/proto/runner.proto | 21 ++- runner/examples/list-executors.ts | 4 +- runner/examples/start-executor.ts | 6 +- runner/examples/stop-executor.ts | 6 +- runner/protos/runner.proto | 21 ++- .../services/runner/runner-service.test.ts | 16 +- .../server/services/runner/runner-service.ts | 8 +- runner/src/stream-handler/stream-handler.ts | 23 ++- runner/src/stream-handler/worker.ts | 8 +- 13 files changed, 330 insertions(+), 49 deletions(-) diff --git a/block-streamer/proto/block_streamer.proto b/block-streamer/proto/block_streamer.proto index 92d3a2c90..fd24b7972 100644 --- a/block-streamer/proto/block_streamer.proto +++ b/block-streamer/proto/block_streamer.proto @@ -108,4 +108,26 @@ message StreamInfo { string function_name = 4; // Block height corresponding to the created/updated height of the indexer uint64 version = 5; + // Contains health information for the Block Stream + Health health = 6; +} + +// Contains health information for the Block Stream +message Health { + // The processing state of the block stream + ProcessingState processing_state = 1; + // When the health info was last updated + uint64 updated_at_timestamp_secs = 2; +} + +enum ProcessingState { + UNSPECIFIED = 0; + // Not started, or has been stopped + IDLE = 1; + // Running as expected + RUNNING = 2; + // Waiting for some internal condition to be met before continuing + WAITING = 3; + // Stopped due to some unknown error + STALLED = 4; } diff --git a/block-streamer/src/block_stream.rs 
b/block-streamer/src/block_stream.rs index 04cd824a6..bf19038b5 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -1,7 +1,9 @@ +use std::cmp::Ordering; use std::future::Future; use std::pin::Pin; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::task::Poll; +use std::time::SystemTime; use anyhow::Context; use futures::StreamExt; @@ -52,16 +54,43 @@ impl Future for PollCounter { } pub struct Task { - handle: JoinHandle>, + stream_handle: JoinHandle>, + monitor_handle: JoinHandle<()>, cancellation_token: tokio_util::sync::CancellationToken, } +/// Represents the processing state of a block stream +#[derive(Clone)] +pub enum ProcessingState { + /// Block Stream is not currently active but can be started. Either has not been started or was + /// stopped. + Idle, + + /// Block Stream is actively processing blocks. + Running, + + /// Block Stream has been intentionally/internally paused due to some condition, i.e. back pressure on + /// the Redis Stream. + Waiting, + + /// Block Stream has stalled due to an error or other condition. Must be manually + /// restarted. 
+ Stalled, +} + +#[derive(Clone)] +pub struct BlockStreamHealth { + pub processing_state: ProcessingState, + pub last_updated: SystemTime, +} + pub struct BlockStream { task: Option, pub indexer_config: IndexerConfig, pub chain_id: ChainId, pub version: u64, pub redis_stream: String, + health: Arc>, } impl BlockStream { @@ -77,23 +106,107 @@ impl BlockStream { chain_id, version, redis_stream, + health: Arc::new(Mutex::new(BlockStreamHealth { + processing_state: ProcessingState::Idle, + last_updated: SystemTime::now(), + })), } } - pub fn start( - &mut self, + pub fn health(&self) -> anyhow::Result { + match self.health.lock() { + Ok(health) => Ok(health.clone()), + Err(e) => Err(anyhow::anyhow!("Failed to acquire health lock: {:?}", e)), + } + } + + fn start_health_monitoring_task(&self, redis: Arc) -> JoinHandle<()> { + tokio::spawn({ + let config = self.indexer_config.clone(); + let health = self.health.clone(); + let redis_stream = self.redis_stream.clone(); + + async move { + let mut last_processed_block = + redis.get_last_processed_block(&config).await.unwrap(); + + loop { + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + + let new_last_processed_block = + if let Ok(block) = redis.get_last_processed_block(&config).await { + block + } else { + tracing::warn!( + account_id = config.account_id.as_str(), + function_name = config.function_name, + "Failed to fetch last processed block" + ); + continue; + }; + + let stream_size = if let Ok(stream_size) = + redis.get_stream_length(redis_stream.clone()).await + { + stream_size + } else { + tracing::warn!( + account_id = config.account_id.as_str(), + function_name = config.function_name, + "Failed to fetch stream size" + ); + continue; + }; + + let mut health_lock = if let Ok(health) = health.lock() { + health + } else { + tracing::warn!( + account_id = config.account_id.as_str(), + function_name = config.function_name, + "Failed to acquire health lock" + ); + continue; + }; + + match 
new_last_processed_block.cmp(&last_processed_block) { + Ordering::Less => { + tracing::error!( + account_id = config.account_id.as_str(), + function_name = config.function_name, + "Last processed block should not decrease" + ); + + health_lock.processing_state = ProcessingState::Stalled; + } + Ordering::Equal if stream_size >= Some(MAX_STREAM_SIZE) => { + health_lock.processing_state = ProcessingState::Waiting; + } + Ordering::Equal => { + health_lock.processing_state = ProcessingState::Stalled; + } + Ordering::Greater => { + health_lock.processing_state = ProcessingState::Running; + } + }; + + health_lock.last_updated = SystemTime::now(); + + last_processed_block = new_last_processed_block; + } + } + }) + } + + fn start_block_stream_task( + &self, start_block_height: near_indexer_primitives::types::BlockHeight, redis: Arc, reciever_blocks_processor: Arc, lake_s3_client: SharedLakeS3Client, - ) -> anyhow::Result<()> { - if self.task.is_some() { - return Err(anyhow::anyhow!("BlockStreamer has already been started",)); - } - - let cancellation_token = tokio_util::sync::CancellationToken::new(); - - let handle = tokio::spawn({ + cancellation_token: tokio_util::sync::CancellationToken, + ) -> JoinHandle> { + tokio::spawn({ let cancellation_token = cancellation_token.clone(); let indexer_config = self.indexer_config.clone(); let chain_id = self.chain_id.clone(); @@ -137,10 +250,35 @@ impl BlockStream { } } } - }); + }) + } + + pub fn start( + &mut self, + start_block_height: near_indexer_primitives::types::BlockHeight, + redis: Arc, + reciever_blocks_processor: Arc, + lake_s3_client: SharedLakeS3Client, + ) -> anyhow::Result<()> { + if self.task.is_some() { + return Err(anyhow::anyhow!("BlockStreamer has already been started",)); + } + + let cancellation_token = tokio_util::sync::CancellationToken::new(); + + let monitor_handle = self.start_health_monitoring_task(redis.clone()); + + let stream_handle = self.start_block_stream_task( + start_block_height, + redis, + 
reciever_blocks_processor, + lake_s3_client, + cancellation_token.clone(), + ); self.task = Some(Task { - handle, + stream_handle, + monitor_handle, cancellation_token, }); @@ -149,8 +287,9 @@ impl BlockStream { pub async fn cancel(&mut self) -> anyhow::Result<()> { if let Some(task) = self.task.take() { + task.monitor_handle.abort(); task.cancellation_token.cancel(); - let _ = task.handle.await?; + let _ = task.stream_handle.await?; // Fails if metric doesn't exist, i.e. task was never polled let _ = metrics::BLOCK_STREAM_UP @@ -167,6 +306,7 @@ impl BlockStream { #[allow(clippy::too_many_arguments)] #[tracing::instrument( + name = "block_stream" skip_all, fields( account_id = indexer.account_id.as_str(), diff --git a/block-streamer/src/redis.rs b/block-streamer/src/redis.rs index 959b5ddc9..1a84d34c6 100644 --- a/block-streamer/src/redis.rs +++ b/block-streamer/src/redis.rs @@ -28,6 +28,18 @@ impl RedisCommandsImpl { Ok(Self { connection }) } + pub async fn get(&self, key: T) -> Result, RedisError> + where + T: ToRedisArgs + Debug + Send + Sync + 'static, + { + tracing::debug!("GET: {:?}", key); + + redis::cmd("GET") + .arg(key) + .query_async(&mut self.connection.clone()) + .await + } + pub async fn xadd(&self, stream_key: T, fields: &[(String, U)]) -> Result<(), RedisError> where T: ToRedisArgs + Debug + Send + Sync + 'static, @@ -133,6 +145,16 @@ impl RedisClientImpl { .context("Failed to set last processed block") } + pub async fn get_last_processed_block( + &self, + indexer_config: &IndexerConfig, + ) -> anyhow::Result> { + self.commands + .get(indexer_config.last_processed_block_key()) + .await + .context("Failed to set last processed block") + } + pub async fn get_stream_length(&self, stream: String) -> anyhow::Result> { self.commands.xlen(stream).await } diff --git a/block-streamer/src/server/block_streamer_service.rs b/block-streamer/src/server/block_streamer_service.rs index 88d426b9c..4dd3f01ae 100644 --- 
a/block-streamer/src/server/block_streamer_service.rs +++ b/block-streamer/src/server/block_streamer_service.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; use std::sync::Mutex; +use std::time::SystemTime; use near_lake_framework::near_indexer_primitives; use tonic::{Request, Response, Status}; @@ -21,6 +22,30 @@ pub struct BlockStreamerService { block_streams: Mutex>, } +impl From for blockstreamer::Health { + fn from(health: block_stream::BlockStreamHealth) -> Self { + blockstreamer::Health { + processing_state: match health.processing_state { + block_stream::ProcessingState::Running => { + blockstreamer::ProcessingState::Running as i32 + } + block_stream::ProcessingState::Idle => blockstreamer::ProcessingState::Idle as i32, + block_stream::ProcessingState::Stalled => { + blockstreamer::ProcessingState::Stalled as i32 + } + block_stream::ProcessingState::Waiting => { + blockstreamer::ProcessingState::Waiting as i32 + } + }, + updated_at_timestamp_secs: health + .last_updated + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs(), + } + } +} + impl BlockStreamerService { pub fn new( redis: std::sync::Arc, @@ -77,11 +102,17 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic }); if let Some((stream_id, stream)) = stream_entry { + let stream_health = stream.health().map_err(|err| { + tracing::error!(?err, "Failed to get health of block stream"); + Status::internal("Failed to get health of block stream") + })?; + Ok(Response::new(StreamInfo { stream_id: stream_id.to_string(), account_id: stream.indexer_config.account_id.to_string(), function_name: stream.indexer_config.function_name.to_string(), version: stream.version, + health: Some(stream_health.into()), })) } else { Err(Status::not_found(format!( @@ -210,11 +241,22 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic let block_streams: Vec = lock .values() - .map(|block_stream| StreamInfo { - stream_id: 
block_stream.indexer_config.get_hash_id(), - account_id: block_stream.indexer_config.account_id.to_string(), - function_name: block_stream.indexer_config.function_name.clone(), - version: block_stream.version, + .map(|block_stream| { + let stream_health = block_stream + .health() + .map_err(|err| { + tracing::error!(?err, "Failed to get health of block stream"); + Status::internal("Failed to get health of block stream") + }) + .ok(); + + StreamInfo { + stream_id: block_stream.indexer_config.get_hash_id(), + account_id: block_stream.indexer_config.account_id.to_string(), + function_name: block_stream.indexer_config.function_name.to_string(), + version: block_stream.version, + health: stream_health.map(|health| health.into()), + } }) .collect(); diff --git a/runner-client/proto/runner.proto b/runner-client/proto/runner.proto index 51045f8c5..39801e657 100644 --- a/runner-client/proto/runner.proto +++ b/runner-client/proto/runner.proto @@ -62,7 +62,26 @@ message ExecutorInfo { string executor_id = 1; string account_id = 2; string function_name = 3; - string status = 4; // Block height corresponding to the created/updated height of the indexer uint64 version = 5; + Health health = 6; +} + +// Contains health information for the Executor +message Health { + ExecutionState execution_state = 1; +} + +enum ExecutionState { + UNSPECIFIED = 0; + // Running as expected + RUNNING = 1; + // Executor is running, but the execution is erroring + FAILING = 2; + // Waiting for some internal condition to be met before proceeding + WAITING = 3; + // Intentionally stopped + STOPPED = 4; + // Unintentionally stopped + STALLED = 5; } diff --git a/runner/examples/list-executors.ts b/runner/examples/list-executors.ts index b5200b7e2..396208405 100644 --- a/runner/examples/list-executors.ts +++ b/runner/examples/list-executors.ts @@ -1,13 +1,13 @@ // Run with 'npx ts-node src/test-client.ts' -import runnerClient from '../src/server/runner-client'; +import runnerClient from 
'../src/server/services/runner/runner-client'; void (async function main () { runnerClient.ListExecutors({}, (err, response) => { if (err) { console.error('List request error: ', err); } else { - console.log('Successful ListExecutors request: ', response); + console.log('list response: ', JSON.stringify({ response }, null, 2)); } }); })(); diff --git a/runner/examples/start-executor.ts b/runner/examples/start-executor.ts index d9466d1e9..9b573daef 100644 --- a/runner/examples/start-executor.ts +++ b/runner/examples/start-executor.ts @@ -1,6 +1,6 @@ // Run with 'npx ts-node src/test-client.ts' -import runnerClient from '../src/server/runner-client'; +import runnerClient from '../src/server/services/runner/runner-client'; const schema = ` CREATE TABLE @@ -13,7 +13,7 @@ CREATE TABLE `; const code = ` -console.log("hello"); +// do nothing `; const indexer = { @@ -35,7 +35,7 @@ void (async function main () { if (err) { console.error('error: ', err); } else { - console.log('start request: ', response); + console.log('start response: ', JSON.stringify({ response }, null, 2)); } }); })(); diff --git a/runner/examples/stop-executor.ts b/runner/examples/stop-executor.ts index 03466c997..ff7eef1bf 100644 --- a/runner/examples/stop-executor.ts +++ b/runner/examples/stop-executor.ts @@ -1,13 +1,13 @@ // Run with 'npx ts-node src/test-client.ts' -import runnerClient from '../src/server/runner-client'; +import runnerClient from '../src/server/services/runner/runner-client'; runnerClient.StopExecutor({ - executorId: 'SOME_EXECUTOR_ID' + executorId: '0293a6b1dcd2259a8be6b59a8cd3e7b4285e540a64a7cbe99639947f7b7e2f9a' }, (err, response) => { if (err) { console.error('error: ', err); } else { - console.log('stop request: ', response); + console.log('stop request: ', JSON.stringify({ response }, null, 2)); } }); diff --git a/runner/protos/runner.proto b/runner/protos/runner.proto index 51045f8c5..39801e657 100644 --- a/runner/protos/runner.proto +++ b/runner/protos/runner.proto @@ -62,7 
+62,26 @@ message ExecutorInfo { string executor_id = 1; string account_id = 2; string function_name = 3; - string status = 4; // Block height corresponding to the created/updated height of the indexer uint64 version = 5; + Health health = 6; +} + +// Contains health information for the Executor +message Health { + ExecutionState execution_state = 1; +} + +enum ExecutionState { + UNSPECIFIED = 0; + // Running as expected + RUNNING = 1; + // Executor is running, but the execution is erroring + FAILING = 2; + // Waiting for some internal condition to be met before proceeding + WAITING = 3; + // Intentionally stopped + STOPPED = 4; + // Unintentionally stopped + STALLED = 5; } diff --git a/runner/src/server/services/runner/runner-service.test.ts b/runner/src/server/services/runner/runner-service.test.ts index f7a9c6dc1..57ff6de65 100644 --- a/runner/src/server/services/runner/runner-service.test.ts +++ b/runner/src/server/services/runner/runner-service.test.ts @@ -1,10 +1,10 @@ import * as grpc from '@grpc/grpc-js'; import type StreamHandler from '../../../stream-handler/stream-handler'; -import { IndexerStatus } from '../../../indexer-meta/indexer-meta'; import { LogLevel } from '../../../indexer-meta/log-entry'; import getRunnerService from './runner-service'; import IndexerConfig from '../../../indexer-config/indexer-config'; +import { ExecutionState } from '../../../generated/runner/ExecutionState'; const BASIC_REDIS_STREAM = 'test-redis-stream'; const BASIC_ACCOUNT_ID = 'test-account-id'; @@ -15,7 +15,7 @@ const BASIC_CODE = 'test-code'; const BASIC_SCHEMA = 'test-schema'; const BASIC_VERSION = 1; const BASIC_EXECUTOR_CONTEXT = { - status: IndexerStatus.RUNNING, + executionState: ExecutionState.RUNNING, }; describe('Runner gRPC Service', () => { @@ -77,8 +77,10 @@ describe('Runner gRPC Service', () => { executorId: BASIC_EXECUTOR_ID, accountId: genericIndexerConfig.accountId, functionName: genericIndexerConfig.functionName, - status: IndexerStatus.RUNNING, - 
version: '1' + version: '1', + health: { + executionState: 'RUNNING' + } }); resolve(null); }); @@ -288,8 +290,10 @@ describe('Runner gRPC Service', () => { executorId: BASIC_EXECUTOR_ID, accountId: genericIndexerConfig.accountId, functionName: genericIndexerConfig.functionName, - status: IndexerStatus.RUNNING, - version: '1' + version: '1', + health: { + executionState: 'RUNNING' + } }] }); resolve(null); diff --git a/runner/src/server/services/runner/runner-service.ts b/runner/src/server/services/runner/runner-service.ts index 0d135574f..3b45856e9 100644 --- a/runner/src/server/services/runner/runner-service.ts +++ b/runner/src/server/services/runner/runner-service.ts @@ -32,7 +32,9 @@ export function getRunnerService ( accountId: executor.indexerConfig.accountId, functionName: executor.indexerConfig.functionName, version: executor.indexerConfig.version.toString(), - status: executor.executorContext.status + health: { + executionState: executor.executorContext.executionState, + } }); } else { const notFoundError = { @@ -150,7 +152,9 @@ export function getRunnerService ( accountId: indexerConfig.accountId, functionName: indexerConfig.functionName, version: indexerConfig.version.toString(), - status: indexerContext.status + health: { + executionState: indexerContext.executionState, + } }); }); callback(null, { diff --git a/runner/src/stream-handler/stream-handler.ts b/runner/src/stream-handler/stream-handler.ts index c6a07ce6c..f1b24b4fb 100644 --- a/runner/src/stream-handler/stream-handler.ts +++ b/runner/src/stream-handler/stream-handler.ts @@ -3,7 +3,6 @@ import { Worker, isMainThread } from 'worker_threads'; import { registerWorkerMetrics, deregisterWorkerMetrics } from '../metrics'; import Indexer from '../indexer'; -import { IndexerStatus } from '../indexer-meta/indexer-meta'; import LogEntry from '../indexer-meta/log-entry'; import logger from '../logger'; @@ -12,7 +11,7 @@ import type IndexerConfig from '../indexer-config'; export enum WorkerMessageType { 
METRICS = 'METRICS', BLOCK_HEIGHT = 'BLOCK_HEIGHT', - STATUS = 'STATUS', + EXECUTION_STATE = 'STATUS', } export interface WorkerMessage { @@ -20,8 +19,16 @@ export interface WorkerMessage { data: any } +export enum ExecutionState { + RUNNING = 'RUNNING', + FAILING = 'FAILING', + WAITING = 'WAITING', + STOPPED = 'STOPPED', + STALLED = 'STALLED', +} + interface ExecutorContext { - status: IndexerStatus + executionState: ExecutionState block_height: number } @@ -42,7 +49,7 @@ export default class StreamHandler { }, }); this.executorContext = { - status: IndexerStatus.RUNNING, + executionState: ExecutionState.RUNNING, block_height: indexerConfig.version, }; @@ -56,12 +63,14 @@ export default class StreamHandler { async stop (): Promise { deregisterWorkerMetrics(this.worker.threadId); + this.executorContext.executionState = ExecutionState.STOPPED; + await this.worker.terminate(); } private handleError (error: Error): void { this.logger.error('Terminating thread', error); - this.executorContext.status = IndexerStatus.STOPPED; + this.executorContext.executionState = ExecutionState.STALLED; const indexer = new Indexer(this.indexerConfig); indexer.setStoppedStatus().catch((e) => { @@ -82,8 +91,8 @@ export default class StreamHandler { private handleMessage (message: WorkerMessage): void { switch (message.type) { - case WorkerMessageType.STATUS: - this.executorContext.status = message.data.status; + case WorkerMessageType.EXECUTION_STATE: + this.executorContext.executionState = message.data.state; break; case WorkerMessageType.BLOCK_HEIGHT: this.executorContext.block_height = message.data; diff --git a/runner/src/stream-handler/worker.ts b/runner/src/stream-handler/worker.ts index d9e77184d..fb1a897ce 100644 --- a/runner/src/stream-handler/worker.ts +++ b/runner/src/stream-handler/worker.ts @@ -7,9 +7,8 @@ import Indexer from '../indexer'; import RedisClient from '../redis-client'; import { METRICS } from '../metrics'; import LakeClient from '../lake-client'; -import { 
WorkerMessageType, type WorkerMessage } from './stream-handler'; +import { WorkerMessageType, type WorkerMessage, ExecutionState } from './stream-handler'; import setUpTracerExport from '../instrumentation'; -import { IndexerStatus } from '../indexer-meta/indexer-meta'; import IndexerConfig from '../indexer-config'; import parentLogger from '../logger'; import { wrapSpan } from '../utility'; @@ -119,6 +118,7 @@ async function blockQueueConsumer (workerContext: WorkerContext): Promise metricsSpan.end(); if (workerContext.queue.length === 0) { + parentPort?.postMessage({ type: WorkerMessageType.EXECUTION_STATE, data: { state: ExecutionState.WAITING } }); await sleep(100); continue; } @@ -162,7 +162,7 @@ async function blockQueueConsumer (workerContext: WorkerContext): Promise }); const postRunSpan = tracer.startSpan('Delete redis message and shift queue', {}, context.active()); - parentPort?.postMessage({ type: WorkerMessageType.STATUS, data: { status: IndexerStatus.RUNNING } }); + parentPort?.postMessage({ type: WorkerMessageType.EXECUTION_STATE, data: { state: ExecutionState.RUNNING } }); await workerContext.redisClient.deleteStreamMessage(indexerConfig.redisStreamKey, streamMessageId); await workerContext.queue.shift(); @@ -174,7 +174,7 @@ async function blockQueueConsumer (workerContext: WorkerContext): Promise } catch (err) { METRICS.FAILED_EXECUTIONS.labels({ indexer: indexerConfig.fullName() }).inc(); parentSpan.setAttribute('status', 'failed'); - parentPort?.postMessage({ type: WorkerMessageType.STATUS, data: { status: IndexerStatus.FAILING } }); + parentPort?.postMessage({ type: WorkerMessageType.EXECUTION_STATE, data: { state: ExecutionState.FAILING } }); const error = err as Error; if (previousError !== error.message) { previousError = error.message; From ee5845b0da6a8f22f36f3ee6c8606d7352b7fdb5 Mon Sep 17 00:00:00 2001 From: Morgan McCauley Date: Thu, 18 Jul 2024 15:17:05 +1200 Subject: [PATCH 3/6] feat: Restart stalled Block Streams & Executors (#891) 
Coordinator will now continuously monitor Block Stream and Executor health, and restart them if they are "stalled". The goal here is to avoid the need for manually intervening on stopped processes. Stalled means slightly different things for each: - Block Stream - When it is able to, is not actively processing blocks - Executors - An uncaught error was encountered, causing the thread to exit --- coordinator/src/handlers/block_streams.rs | 33 ++++++++++++++++++++++- coordinator/src/handlers/executors.rs | 28 ++++++++++++++++++- 2 files changed, 59 insertions(+), 2 deletions(-) diff --git a/coordinator/src/handlers/block_streams.rs b/coordinator/src/handlers/block_streams.rs index bc34c000b..93878d61f 100644 --- a/coordinator/src/handlers/block_streams.rs +++ b/coordinator/src/handlers/block_streams.rs @@ -1,12 +1,14 @@ #![cfg_attr(test, allow(dead_code))] +use std::time::{Duration, SystemTime}; + pub use block_streamer::StreamInfo; use anyhow::Context; use block_streamer::block_streamer_client::BlockStreamerClient; use block_streamer::{ start_stream_request::Rule, ActionAnyRule, ActionFunctionCallRule, GetStreamRequest, - ListStreamsRequest, StartStreamRequest, Status, StopStreamRequest, + ListStreamsRequest, ProcessingState, StartStreamRequest, Status, StopStreamRequest, }; use near_primitives::types::AccountId; use registry_types::StartBlock; @@ -235,6 +237,34 @@ impl BlockStreamsHandler { Ok(()) } + async fn ensure_healthy( + &self, + config: &IndexerConfig, + block_stream: &StreamInfo, + ) -> anyhow::Result<()> { + if let Some(health) = block_stream.health.as_ref() { + let updated_at = + SystemTime::UNIX_EPOCH + Duration::from_secs(health.updated_at_timestamp_secs); + + let stale = updated_at.elapsed().unwrap_or_default() > Duration::from_secs(30); + let stalled = matches!( + health.processing_state.try_into(), + Ok(ProcessingState::Stalled) + ); + + if !stale && !stalled { + return Ok(()); + } + } + + tracing::info!("Restarting stalled block stream"); + + 
self.stop(block_stream.stream_id.clone()).await?; + self.resume_block_stream(config).await?; + + Ok(()) + } + pub async fn synchronise_block_stream( &self, config: &IndexerConfig, @@ -246,6 +276,7 @@ impl BlockStreamsHandler { if let Some(block_stream) = block_stream { if block_stream.version == config.get_registry_version() { + self.ensure_healthy(config, &block_stream).await?; return Ok(()); } diff --git a/coordinator/src/handlers/executors.rs b/coordinator/src/handlers/executors.rs index 01c9cc4be..4e12ef26f 100644 --- a/coordinator/src/handlers/executors.rs +++ b/coordinator/src/handlers/executors.rs @@ -5,7 +5,10 @@ pub use runner::ExecutorInfo; use anyhow::Context; use runner::runner_client::RunnerClient; -use runner::{GetExecutorRequest, ListExecutorsRequest, StartExecutorRequest, StopExecutorRequest}; +use runner::{ + ExecutionState, GetExecutorRequest, ListExecutorsRequest, StartExecutorRequest, + StopExecutorRequest, +}; use tonic::transport::channel::Channel; use tonic::Request; @@ -119,6 +122,28 @@ impl ExecutorsHandler { Ok(()) } + async fn ensure_healthy( + &self, + config: &IndexerConfig, + executor: ExecutorInfo, + ) -> anyhow::Result<()> { + if let Some(health) = executor.health { + if !matches!( + health.execution_state.try_into(), + Ok(ExecutionState::Stalled) + ) { + return Ok(()); + } + } + + tracing::info!("Restarting stalled executor"); + + self.stop(executor.executor_id).await?; + self.start(config).await?; + + Ok(()) + } + pub async fn synchronise_executor(&self, config: &IndexerConfig) -> anyhow::Result<()> { let executor = self .get(config.account_id.clone(), config.function_name.clone()) @@ -126,6 +151,7 @@ impl ExecutorsHandler { if let Some(executor) = executor { if executor.version == config.get_registry_version() { + self.ensure_healthy(config, executor).await?; return Ok(()); } From 7dfb2aa968691eefa73d0bf2f00885ef1249fd89 Mon Sep 17 00:00:00 2001 From: Kevin Zhang <42101107+Kevin101Zhang@users.noreply.github.com> Date: Thu, 18 Jul 
2024 14:12:09 -0400 Subject: [PATCH 4/6] feat: added loading spinners during RPC call (#894) --- .../widgets/src/QueryApi.IndexerExplorer.jsx | 137 +++++++++++++----- 1 file changed, 99 insertions(+), 38 deletions(-) diff --git a/frontend/widgets/src/QueryApi.IndexerExplorer.jsx b/frontend/widgets/src/QueryApi.IndexerExplorer.jsx index 1af68c70d..550604178 100644 --- a/frontend/widgets/src/QueryApi.IndexerExplorer.jsx +++ b/frontend/widgets/src/QueryApi.IndexerExplorer.jsx @@ -5,8 +5,10 @@ const [myIndexers, setMyIndexers] = useState([]); const [allIndexers, setAllIndexers] = useState([]); const [error, setError] = useState(null); const [indexerMetadata, setIndexerMetaData] = useState(new Map()); +const [loading, setLoading] = useState(false); const fetchIndexerData = () => { + setLoading(true); Near.asyncView(`${REPL_REGISTRY_CONTRACT_ID}`, "list_all").then((data) => { const allIndexers = []; const myIndexers = []; @@ -22,6 +24,7 @@ const fetchIndexerData = () => { }); setMyIndexers(myIndexers); setAllIndexers(allIndexers); + setLoading(false); }); } @@ -61,6 +64,14 @@ useEffect(() => { storeIndexerMetaData(); }, []); +const Container = styled.div` + display: flex; + justify-content: center; + align-items: center; + height: 100%; + width: 100%; +`; + const Wrapper = styled.div` display: flex; flex-direction: column; @@ -232,6 +243,39 @@ const TextLink = styled.a` } `; +const LoadingSpinner = () => { + const spinnerStyle = { + width: '40px', + height: '40px', + border: '4px solid rgba(0, 0, 0, 0.1)', + borderLeftColor: 'black', + borderRadius: '50%', + animation: 'spin 1s linear infinite', + textAlign: 'center', + display: 'flex', + justifyContent: 'center', + alignCenter: 'center', + }; + + const LoadingContainer = styled.div` + text-align: center; + width: 100%; + height: 100%; + `; + + const LoadingSpinnerContainer = styled.div` + display: flex; + justify-content: center; + font-size: 14px; + ` + return + +
+ + <>{selectedTab === "my-indexers" ? "Loading Your Indexers" : "Loading All Indexers"} + ; +}; + return ( @@ -249,47 +293,64 @@ return ( {error && {error}} + {selectedTab === "all" && ( <> - - {allIndexers.map((indexer, i) => ( - - - - ))} - + {loading ? ( + + + + ) : ( + + <> + {allIndexers.map((indexer, i) => ( + + + + ))} + + + )} )} - {selectedTab === "my-indexers" && myIndexers.length === 0 && ( -
-

You don't have any indexers yet.

-

- QueryAPI streamlines the process of querying specific data from the Near Blockchain. Explore new Indexers and fork them to try it out! -

-

- To learn more about QueryAPI, visit - - QueryAPI Docs - -

-
+ + {selectedTab === "my-indexers" && ( + <> + {loading ? ( + + + + ) : myIndexers.length === 0 ? ( +
+

You don't have any indexers yet.

+

+ QueryAPI streamlines the process of querying specific data from the Near Blockchain. Explore new Indexers and fork them to try it out! +

+

+ To learn more about QueryAPI, visit + + QueryAPI Docs + +

+
+ ) : ( + + <> + {myIndexers.map((indexer, i) => ( + + + + ))} + + + )} + )} - - {selectedTab === "my-indexers" && ( - <> - {myIndexers.map((indexer, i) => ( - - - - ))} - - )} - -
+ ); From f1c1757997e7a78c85dddf2a18307901d3269334 Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Thu, 18 Jul 2024 23:48:32 +0530 Subject: [PATCH 5/6] feat: In-Memory DmlHandler Test Fixture (#809) In order to enable quick testing of indexer code for the purposes of local indexer development, there needs to be a way to functionally mock dependencies that are used by the Indexer code. These dependencies are roughly encapsulated under the context object. In particular, context.db is of high importance as it is the method through which Indexers interact with their persistent data. This PR focuses on creating an MVP of an in-memory DML Handler Test Fixture which can be used interchangeably with an actual DmlHandler. This allows context.db calls to pretend to be real calls to an actual Postgres DB without actually having an instance running (which would be hard to integrate into unit testing). This PR is mainly focused on getting a basic DmlHandler test fixture out the door. It has the same functions, which behave roughly as they would if they were called on an actual DB. There is the added benefit of being extremely quick to tear down as all data is represented in memory, making it suitable for fast iteration through unit testing. However, since I am essentially mocking Postgres DB responses, the correctness standard is much lower than using an actual PG DB, but since DmlHandler simplifies user interactions with the DB anyway, we can mock a great deal of Indexer context.db use cases well enough to be useful for end users.
--- frontend/src/utils/pgSchemaTypeGen.js | 1 - runner/src/dml-handler/dml-handler.test.ts | 1 + runner/src/dml-handler/dml-handler.ts | 27 +- .../dml-handler/in-memory-dml-handler.test.ts | 205 ++++++++++ .../src/dml-handler/in-memory-dml-handler.ts | 358 ++++++++++++++++++ runner/src/indexer/indexer.ts | 4 +- 6 files changed, 585 insertions(+), 11 deletions(-) create mode 100644 runner/src/dml-handler/in-memory-dml-handler.test.ts create mode 100644 runner/src/dml-handler/in-memory-dml-handler.ts diff --git a/frontend/src/utils/pgSchemaTypeGen.js b/frontend/src/utils/pgSchemaTypeGen.js index 767a85be8..a53df0005 100644 --- a/frontend/src/utils/pgSchemaTypeGen.js +++ b/frontend/src/utils/pgSchemaTypeGen.js @@ -104,7 +104,6 @@ export class PgSchemaTypeGen { ) { this.addColumn(columnSpec, columns); } else if ( - Object.prototype.hasOwnProperty.call(columnSpec, 'constraint') && columnSpec.constraint_type === 'primary key' ) { for (const foreignKeyDef of columnSpec.definition) { diff --git a/runner/src/dml-handler/dml-handler.test.ts b/runner/src/dml-handler/dml-handler.test.ts index 2cc52842a..4b7c2ef19 100644 --- a/runner/src/dml-handler/dml-handler.test.ts +++ b/runner/src/dml-handler/dml-handler.test.ts @@ -27,6 +27,7 @@ describe('DML Handler tests', () => { format: pgFormat } as unknown as PgClient; TABLE_DEFINITION_NAMES = { + tableName: 'test_table', originalTableName: '"test_table"', originalColumnNames: new Map([ ['account_id', 'account_id'], diff --git a/runner/src/dml-handler/dml-handler.ts b/runner/src/dml-handler/dml-handler.ts index a76163a27..7a481f3e9 100644 --- a/runner/src/dml-handler/dml-handler.ts +++ b/runner/src/dml-handler/dml-handler.ts @@ -6,10 +6,19 @@ import type IndexerConfig from '../indexer-config/indexer-config'; import { type Tracer, trace, type Span } from '@opentelemetry/api'; import { type QueryResult } from 'pg'; -type WhereClauseMulti = Record)>; -type WhereClauseSingle = Record; - -export default class DmlHandler { +export type 
PostgresRowValue = string | number | any; +export type PostgresRow = Record; +export type WhereClauseMulti = Record; +export type WhereClauseSingle = Record; + +export interface IDmlHandler { + insert: (tableDefinitionNames: TableDefinitionNames, rowsToInsert: PostgresRow[]) => Promise + select: (tableDefinitionNames: TableDefinitionNames, whereObject: WhereClauseMulti, limit: number | null) => Promise + update: (tableDefinitionNames: TableDefinitionNames, whereObject: WhereClauseSingle, updateObject: any) => Promise + upsert: (tableDefinitionNames: TableDefinitionNames, rowsToUpsert: PostgresRow[], conflictColumns: string[], updateColumns: string[]) => Promise + delete: (tableDefinitionNames: TableDefinitionNames, whereObject: WhereClauseMulti) => Promise +} +export default class DmlHandler implements IDmlHandler { validTableNameRegex = /^[a-zA-Z_][a-zA-Z0-9_]*$/; pgClient: PgClient; tracer: Tracer; @@ -53,7 +62,7 @@ export default class DmlHandler { return { queryVars, whereClause }; } - async insert (tableDefinitionNames: TableDefinitionNames, rowsToInsert: any[]): Promise { + async insert (tableDefinitionNames: TableDefinitionNames, rowsToInsert: PostgresRow[]): Promise { if (!rowsToInsert?.length) { return []; } @@ -67,7 +76,7 @@ export default class DmlHandler { return result.rows; } - async select (tableDefinitionNames: TableDefinitionNames, whereObject: WhereClauseMulti, limit: number | null = null): Promise { + async select (tableDefinitionNames: TableDefinitionNames, whereObject: WhereClauseMulti, limit: number | null = null): Promise { const { queryVars, whereClause } = this.getWhereClause(whereObject, tableDefinitionNames.originalColumnNames); let query = `SELECT * FROM ${this.indexerConfig.schemaName()}.${tableDefinitionNames.originalTableName} WHERE ${whereClause}`; if (limit !== null) { @@ -78,7 +87,7 @@ export default class DmlHandler { return result.rows; } - async update (tableDefinitionNames: TableDefinitionNames, whereObject: WhereClauseSingle, 
updateObject: any): Promise { + async update (tableDefinitionNames: TableDefinitionNames, whereObject: WhereClauseSingle, updateObject: any): Promise { const updateKeys = Object.keys(updateObject).map((col) => tableDefinitionNames.originalColumnNames.get(col) ?? col); const updateParam = Array.from({ length: updateKeys.length }, (_, index) => `${updateKeys[index]}=$${index + 1}`).join(', '); const whereKeys = Object.keys(whereObject).map((col) => tableDefinitionNames.originalColumnNames.get(col) ?? col); @@ -91,7 +100,7 @@ export default class DmlHandler { return result.rows; } - async upsert (tableDefinitionNames: TableDefinitionNames, rowsToUpsert: any[], conflictColumns: string[], updateColumns: string[]): Promise { + async upsert (tableDefinitionNames: TableDefinitionNames, rowsToUpsert: PostgresRow[], conflictColumns: string[], updateColumns: string[]): Promise { if (!rowsToUpsert?.length) { return []; } @@ -108,7 +117,7 @@ export default class DmlHandler { return result.rows; } - async delete (tableDefinitionNames: TableDefinitionNames, whereObject: WhereClauseMulti): Promise { + async delete (tableDefinitionNames: TableDefinitionNames, whereObject: WhereClauseMulti): Promise { const { queryVars, whereClause } = this.getWhereClause(whereObject, tableDefinitionNames.originalColumnNames); const query = `DELETE FROM ${this.indexerConfig.schemaName()}.${tableDefinitionNames.originalTableName} WHERE ${whereClause} RETURNING *`; diff --git a/runner/src/dml-handler/in-memory-dml-handler.test.ts b/runner/src/dml-handler/in-memory-dml-handler.test.ts new file mode 100644 index 000000000..6fbc20c79 --- /dev/null +++ b/runner/src/dml-handler/in-memory-dml-handler.test.ts @@ -0,0 +1,205 @@ +import { type TableDefinitionNames } from '../indexer'; +import InMemoryDmlHandler from './in-memory-dml-handler'; + +const DEFAULT_ITEM_1_WITHOUT_ID = { + account_id: 'TEST_NEAR', + block_height: 1, + content: 'CONTENT', + accounts_liked: [], +}; + +const DEFAULT_ITEM_1_WITH_ID = { + 
id: 1, + account_id: 'TEST_NEAR', + block_height: 1, + content: 'CONTENT', + accounts_liked: [], +}; + +const DEFAULT_ITEM_2_WITHOUT_ID = { + account_id: 'TEST_NEAR', + block_height: 2, + content: 'CONTENT', + accounts_liked: [], +}; + +const DEFAULT_ITEM_2_WITH_ID = { + id: 2, + account_id: 'TEST_NEAR', + block_height: 2, + content: 'CONTENT', + accounts_liked: [], +}; + +describe('DML Handler Fixture Tests', () => { + const SIMPLE_SCHEMA = `CREATE TABLE + "posts" ( + "id" SERIAL NOT NULL, + "account_id" VARCHAR NOT NULL, + "block_height" DECIMAL(58, 0) NOT NULL, + "content" TEXT NOT NULL, + "accounts_liked" JSONB NOT NULL DEFAULT '[]', + CONSTRAINT "posts_pkey" PRIMARY KEY ("id", "account_id") + );`; + const TABLE_DEFINITION_NAMES: TableDefinitionNames = { + tableName: 'posts', + originalTableName: '"posts"', + originalColumnNames: new Map([]) + }; + + let dmlHandler: InMemoryDmlHandler; + + beforeEach(() => { + dmlHandler = new InMemoryDmlHandler(SIMPLE_SCHEMA); + }); + + test('select rows', async () => { + const inputObj = [DEFAULT_ITEM_1_WITHOUT_ID, DEFAULT_ITEM_2_WITHOUT_ID]; + + await dmlHandler.insert(TABLE_DEFINITION_NAMES, inputObj); + + const selectSingleValue = await dmlHandler.select(TABLE_DEFINITION_NAMES, { id: 1 }); + expect(selectSingleValue[0].id).toEqual(1); + + const selectMultipleValues = await dmlHandler.select(TABLE_DEFINITION_NAMES, { account_id: 'TEST_NEAR', block_height: [1, 2] }); + expect(selectMultipleValues[0].account_id).toEqual('TEST_NEAR'); + expect(selectMultipleValues[1].account_id).toEqual('TEST_NEAR'); + expect(selectMultipleValues[0].block_height).toEqual(1); + expect(selectMultipleValues[1].block_height).toEqual(2); + + expect(await dmlHandler.select(TABLE_DEFINITION_NAMES, { account_id: 'unknown_near' })).toEqual([]); + }); + + test('insert two rows with serial column', async () => { + const inputObj = [DEFAULT_ITEM_1_WITHOUT_ID, DEFAULT_ITEM_2_WITHOUT_ID]; + + const correctResult = [DEFAULT_ITEM_1_WITH_ID, 
DEFAULT_ITEM_2_WITH_ID]; + + const result = await dmlHandler.insert(TABLE_DEFINITION_NAMES, inputObj); + expect(result).toEqual(correctResult); + }); + + test('reject insert after specifying serial column value', async () => { + const inputObjWithSerial = [DEFAULT_ITEM_1_WITH_ID]; + const inputObj = [DEFAULT_ITEM_2_WITHOUT_ID]; + + await dmlHandler.insert(TABLE_DEFINITION_NAMES, inputObjWithSerial); + await expect(dmlHandler.insert(TABLE_DEFINITION_NAMES, inputObj)).rejects.toThrow('Cannot insert row twice into the same table'); + }); + + test('reject insert after not specifying primary key value', async () => { + const inputObj = [{ + block_height: 1, + content: 'CONTENT', + accounts_liked: [], + }]; + + await expect(dmlHandler.insert(TABLE_DEFINITION_NAMES, inputObj)).rejects.toThrow('Inserted row must specify value for primary key columns'); + }); + + test('update rows', async () => { + const inputObj = [DEFAULT_ITEM_1_WITHOUT_ID, DEFAULT_ITEM_2_WITHOUT_ID]; + + await dmlHandler.insert(TABLE_DEFINITION_NAMES, inputObj); + + const updateOne = await dmlHandler.update(TABLE_DEFINITION_NAMES, { account_id: 'TEST_NEAR', block_height: 2 }, { content: 'UPDATED_CONTENT' }); + const selectOneUpdate = await dmlHandler.select(TABLE_DEFINITION_NAMES, { account_id: 'TEST_NEAR', block_height: 2 }); + expect(updateOne).toEqual(selectOneUpdate); + + const updateAll = await dmlHandler.update(TABLE_DEFINITION_NAMES, { account_id: 'TEST_NEAR' }, { content: 'final content' }); + const selectAllUpdated = await dmlHandler.select(TABLE_DEFINITION_NAMES, { account_id: 'TEST_NEAR' }); + expect(updateAll).toEqual(selectAllUpdated); + }); + + test('update criteria matches nothing', async () => { + const inputObj = [DEFAULT_ITEM_1_WITHOUT_ID, DEFAULT_ITEM_2_WITHOUT_ID]; + + await dmlHandler.insert(TABLE_DEFINITION_NAMES, inputObj); + + const updateNone = await dmlHandler.update(TABLE_DEFINITION_NAMES, { account_id: 'none_near' }, { content: 'UPDATED_CONTENT' }); + const selectUpdated = 
await dmlHandler.select(TABLE_DEFINITION_NAMES, { content: 'UPDATED_CONTENT' }); + expect(updateNone).toEqual([]); + expect(selectUpdated).toEqual([]); + }); + + test('upsert rows', async () => { + const inputObj = [DEFAULT_ITEM_1_WITHOUT_ID]; + + await dmlHandler.insert(TABLE_DEFINITION_NAMES, inputObj); + + const upsertObj = [{ + account_id: 'TEST_NEAR', + block_height: 1, + content: 'UPSERT', + accounts_liked: [], + }, + { + account_id: 'TEST_NEAR', + block_height: 2, + content: 'UPSERT', + accounts_liked: [], + }]; + + const upserts = await dmlHandler.upsert(TABLE_DEFINITION_NAMES, upsertObj, ['account_id', 'block_height'], ['content']); + + const selectAll = await dmlHandler.select(TABLE_DEFINITION_NAMES, { account_id: 'TEST_NEAR' }); + expect(upserts).toEqual(selectAll); + }); + + test('upsert rows with non unique conflcit columns', async () => { + const inputObj = [DEFAULT_ITEM_1_WITHOUT_ID, DEFAULT_ITEM_2_WITHOUT_ID]; + + await dmlHandler.insert(TABLE_DEFINITION_NAMES, inputObj); + + const upsertObj = [{ + account_id: 'TEST_NEAR', + block_height: 1, + content: 'UPSERT', + accounts_liked: [], + }, + { + account_id: 'TEST_NEAR', + block_height: 2, + content: 'UPSERT', + accounts_liked: [], + }]; + + await expect(dmlHandler.upsert(TABLE_DEFINITION_NAMES, upsertObj, ['account_id'], ['content'])).rejects.toThrow('Conflict update criteria cannot affect row twice'); + }); + + test('reject upsert due to duplicate row', async () => { + const inputObj = [DEFAULT_ITEM_1_WITH_ID, DEFAULT_ITEM_1_WITH_ID]; + + await expect(dmlHandler.upsert(TABLE_DEFINITION_NAMES, inputObj, ['id', 'account_id'], ['content'])).rejects.toThrow('Conflict update criteria cannot affect row twice'); + }); + + test('reject upsert after specifying serial column value', async () => { + const inputObjWithSerial = [DEFAULT_ITEM_1_WITH_ID]; + const inputObj = [DEFAULT_ITEM_1_WITHOUT_ID]; + + await dmlHandler.upsert(TABLE_DEFINITION_NAMES, inputObjWithSerial, ['id', 'account_id'], ['content']); + 
await expect(dmlHandler.upsert(TABLE_DEFINITION_NAMES, inputObj, ['id', 'account_id'], ['content'])).rejects.toThrow('Cannot insert row twice into the same table'); + }); + + test('reject insert after not specifying primary key value', async () => { + const inputObj = [{ + block_height: 1, + content: 'CONTENT', + accounts_liked: [], + }]; + + await expect(dmlHandler.upsert(TABLE_DEFINITION_NAMES, inputObj, ['id', 'account_id'], ['content'])).rejects.toThrow('Inserted row must specify value for primary key columns'); + }); + + test('delete rows', async () => { + const inputObj = [DEFAULT_ITEM_1_WITHOUT_ID, DEFAULT_ITEM_2_WITHOUT_ID]; + + const correctResponse = [DEFAULT_ITEM_1_WITH_ID, DEFAULT_ITEM_2_WITH_ID]; + + await dmlHandler.insert(TABLE_DEFINITION_NAMES, inputObj); + + const deletedRows = await dmlHandler.delete(TABLE_DEFINITION_NAMES, { account_id: 'TEST_NEAR' }); + + expect(deletedRows).toEqual(correctResponse); + }); +}); diff --git a/runner/src/dml-handler/in-memory-dml-handler.ts b/runner/src/dml-handler/in-memory-dml-handler.ts new file mode 100644 index 000000000..7a8783ef4 --- /dev/null +++ b/runner/src/dml-handler/in-memory-dml-handler.ts @@ -0,0 +1,358 @@ +import { type AST, Parser } from 'node-sql-parser'; +import { type TableDefinitionNames } from '../indexer'; +import { type PostgresRow, type WhereClauseMulti, type WhereClauseSingle, type IDmlHandler } from './dml-handler'; + +// TODO: Define class to represent specification +interface TableSpecification { + tableName: string + columnNames: string[] + primaryKeyColumns: string[] + serialColumns: string[] +} + +class PostgresRowEntity { + data: PostgresRow; + private readonly primaryKeys: string[]; + + constructor (data: PostgresRow, primaryKeys: string[]) { + this.data = data; + this.primaryKeys = primaryKeys.sort(); + + // TODO: Verify value of primary key as well (if primary key is NOT NULL) + if (!primaryKeys.every(primaryKey => { + return primaryKey in data; + })) { + throw new 
Error('Inserted row must specify value for primary key columns'); + } + } + + public primaryKey (): string { + return JSON.stringify( + this.primaryKeys.reduce>((acc, key) => { + acc[key] = this.data[key]; + return acc; + }, {}) + ); + } + + public isEqualRow (row: PostgresRow): boolean { + return this.primaryKeys.every(primaryKey => { + return row[primaryKey] === this.data[primaryKey]; + }); + } + + public isEqualEntity (entity: PostgresRowEntity): boolean { + return this.primaryKey() === entity.primaryKey(); + } + + public isEqualCriteria (criteria: WhereClauseMulti): boolean { + return Object.keys(criteria).every(attribute => { + const toMatchValue = criteria[attribute]; + if (Array.isArray(toMatchValue)) { + return toMatchValue.includes(this.data[attribute]); + } + return toMatchValue === this.data[attribute]; + }); + } + + public update (updateObject: PostgresRow): void { + Object.keys(updateObject).forEach(updateKey => { + this.data[updateKey] = updateObject[updateKey]; + }); + } +} + +class TableData { + specification: TableSpecification; + data: PostgresRowEntity[]; + serialCounter: Map; + + constructor (tableSpec: TableSpecification) { + this.specification = tableSpec; + this.data = []; + this.serialCounter = new Map(); + } + + public getEntitiesByCriteria (criteria: WhereClauseMulti, limit: number | null): PostgresRowEntity[] { + const matchedRows: PostgresRowEntity[] = []; + this.data.forEach(row => { + if (row.isEqualCriteria(criteria)) { + if (!limit || (limit && matchedRows.length < limit)) { + matchedRows.push(row); + } + } + }); + return matchedRows; + } + + private getSerialValue (columnName: string): number { + const serialCounterKey = `${this.specification.tableName}-${columnName}`; + const counterValue: number = this.serialCounter.get(serialCounterKey) ?? 
1; + this.serialCounter.set(serialCounterKey, counterValue + 1); + return counterValue; + } + + private fillSerialValues (row: PostgresRow): void { + for (const serialColumnName of this.specification.serialColumns) { + if (row[serialColumnName] === undefined) { + row[serialColumnName] = this.getSerialValue(serialColumnName); + } + } + } + + private createEntityFromRow (row: PostgresRow): PostgresRowEntity { + // TODO: Fill default values + // TODO: Assert non null values + this.fillSerialValues(row); + return new PostgresRowEntity(row, this.specification.primaryKeyColumns); + } + + public rowIsUnique (otherRow: PostgresRow): boolean { + return this.data.every(entity => { + return !entity.isEqualRow(otherRow); + }); + } + + private entityIsUnique (otherEntity: PostgresRowEntity): boolean { + return this.data.every(entity => { + return !entity.isEqualEntity(otherEntity); + }); + } + + public insertRow (row: PostgresRow): PostgresRowEntity { + const entity: PostgresRowEntity = this.createEntityFromRow(row); + if (!this.entityIsUnique(entity)) { + throw new Error(`Cannot insert row twice into the same table: ${JSON.stringify(entity.data)}`); + } + + this.data.push(entity); + return entity; + } + + public insertEntity (entity: PostgresRowEntity): PostgresRowEntity { + if (!this.entityIsUnique(entity)) { + throw new Error(`Cannot insert row twice into the same table: ${JSON.stringify(entity.data)}`); + } + + this.data.push(entity); + return entity; + } + + public removeEntitiesByCriteria (criteria: WhereClauseMulti): PostgresRowEntity[] { + const remainingRows: PostgresRowEntity[] = []; + const matchedRows: PostgresRowEntity[] = []; + this.data.forEach(entity => { + if (entity.isEqualCriteria(criteria)) { + matchedRows.push(entity); + } else { + remainingRows.push(entity); + } + }); + this.data = remainingRows; + return matchedRows; + } +} + +class IndexerData { + private readonly tables: Map; + + constructor (schema: AST[]) { + this.tables = 
this.initializeTables(schema); + } + + private initializeTables (schemaAST: AST[]): Map { + const tables = new Map(); + for (const statement of schemaAST) { + if (statement.type === 'create' && statement.keyword === 'table') { + const tableSpec = this.createTableSpecification(statement); + tables.set(tableSpec.tableName, new TableData(tableSpec)); + } + } + + return tables; + } + + private createTableSpecification (createTableStatement: any): TableSpecification { + // TODO: Track foreign key columns and manage them during inserts/updates + const tableName = createTableStatement.table[0].table; + const columnNames: string[] = []; + const primaryKeyColumns: string[] = []; + const serialColumns: string[] = []; + + for (const columnDefinition of createTableStatement.create_definitions ?? []) { + if (columnDefinition.column) { + const columnName = this.getColumnName(columnDefinition); + columnNames.push(columnName); + + const dataType = columnDefinition.definition.dataType as string; + if (dataType.toUpperCase().includes('SERIAL')) { + serialColumns + .push(columnName); + } + + if (columnDefinition.primary_key) { + primaryKeyColumns.push(columnName); + } + } else if (columnDefinition.constraint_type === 'primary key') { + for (const primaryKey of columnDefinition.definition) { + primaryKeyColumns.push(primaryKey.column.expr.value); + } + } + } + const tableSpec: TableSpecification = { + tableName, + columnNames, + primaryKeyColumns, + serialColumns, + }; + + return tableSpec; + } + + private getColumnName (columnDefinition: any): string { + if (columnDefinition.column?.type === 'column_ref') { + return columnDefinition.column.column.expr.value; + } + return ''; + } + + private selectColumnsFromRow (row: PostgresRow, columnsToSelect: string[]): PostgresRow { + return columnsToSelect.reduce((newRow, columnName) => { + newRow[columnName] = columnName in row ? 
row[columnName] : undefined; + return newRow; + }, {}); + } + + private getTableData (tableName: string): TableData { + const tableData = this.tables.get(tableName); + if (!tableData) { + throw new Error(`Invalid table name provided: ${tableName}`); + } + + return tableData; + } + + private copyRow (row: PostgresRow): PostgresRow { + return JSON.parse(JSON.stringify(row)); + } + + private copyDataFromEntities (entities: PostgresRowEntity[]): PostgresRow[] { + const copiedRowData: PostgresRow[] = []; + for (const entity of entities) { + const copiedRow = this.copyRow(entity.data); + copiedRowData.push(copiedRow); + } + + return copiedRowData; + } + + public select (tableName: string, criteria: WhereClauseMulti, limit: number | null): PostgresRow[] { + const tableData = this.getTableData(tableName); + const matchedRows = tableData.getEntitiesByCriteria(criteria, limit); + + return this.copyDataFromEntities(matchedRows); + } + + public insert (tableName: string, rowsToInsert: PostgresRow[]): PostgresRow[] { + // TODO: Check types of columns + // TODO: Verify columns are correctly named, and have any required values + // TODO: Verify inserts are unique before actual insertion + const tableData = this.getTableData(tableName); + const insertedRows: PostgresRowEntity[] = []; + + for (const row of rowsToInsert) { + const rowCopy = this.copyRow(row); + if (!tableData.rowIsUnique(rowCopy)) { + throw new Error(`Cannot insert row twice into the same table: ${JSON.stringify(rowCopy)}`); + } + insertedRows.push(tableData.insertRow(rowCopy)); + } + + return this.copyDataFromEntities(insertedRows); + } + + public update (tableName: string, criteria: WhereClauseSingle, updateObject: PostgresRow): PostgresRow[] { + // TODO: Validate criteria passed in has valid column names + const tableData = this.getTableData(tableName); + const updatedRows: PostgresRowEntity[] = []; + + const matchedRows = tableData.removeEntitiesByCriteria(criteria); + for (const rowEntity of matchedRows) { + 
rowEntity.update(updateObject); + updatedRows.push(tableData.insertEntity(rowEntity)); + } + + return this.copyDataFromEntities(updatedRows); + } + + public upsert (tableName: string, rowsToUpsert: PostgresRow[], conflictColumns: string[], updateColumns: string[]): PostgresRow[] { + // TODO: Verify conflictColumns is a superset of primary key set (For uniqueness constraint) + const tableData = this.getTableData(tableName); + const upsertedRows: PostgresRowEntity[] = []; + + for (const row of rowsToUpsert) { + const rowCopy = this.copyRow(row); + const updateCriteriaObject = this.selectColumnsFromRow(rowCopy, conflictColumns); + const rowsMatchingUpdate = tableData.removeEntitiesByCriteria(updateCriteriaObject); + + if (rowsMatchingUpdate.length > 1) { + throw new Error('Conflict update criteria cannot affect row twice'); + } else if (rowsMatchingUpdate.length === 1) { + const matchedEntity = rowsMatchingUpdate[0]; + if (upsertedRows.some(upsertedEntity => upsertedEntity.isEqualEntity(matchedEntity))) { + throw new Error('Conflict update criteria cannot affect row twice'); + } + + const updateObject = this.selectColumnsFromRow(rowCopy, updateColumns); + matchedEntity.update(updateObject); + upsertedRows.push(tableData.insertEntity(matchedEntity)); + } else { + upsertedRows.push(tableData.insertRow(rowCopy)); + } + } + + return this.copyDataFromEntities(upsertedRows); + } + + public delete (tableName: string, deleteCriteria: WhereClauseMulti): PostgresRow[] { + const tableData = this.getTableData(tableName); + const deletedRows = tableData.removeEntitiesByCriteria(deleteCriteria); + + return this.copyDataFromEntities(deletedRows); + } +} + +export default class InMemoryDmlHandler implements IDmlHandler { + private readonly indexerData: IndexerData; + + constructor (schema: string) { + const parser = new Parser(); + let schemaAST = parser.astify(schema, { database: 'Postgresql' }); + schemaAST = Array.isArray(schemaAST) ? 
schemaAST : [schemaAST]; // Ensure iterable + this.indexerData = new IndexerData(schemaAST); + } + + public async insert (tableDefinitionNames: TableDefinitionNames, rowsToInsert: PostgresRow[]): Promise { + if (!rowsToInsert?.length) { + return []; + } + + return this.indexerData.insert(tableDefinitionNames.tableName, rowsToInsert); + } + + public async select (tableDefinitionNames: TableDefinitionNames, whereObject: WhereClauseMulti, limit: number | null = null): Promise { + return this.indexerData.select(tableDefinitionNames.tableName, whereObject, limit); + } + + public async update (tableDefinitionNames: TableDefinitionNames, whereObject: WhereClauseSingle, updateObject: any): Promise { + return this.indexerData.update(tableDefinitionNames.tableName, whereObject, updateObject); + } + + public async upsert (tableDefinitionNames: TableDefinitionNames, rowsToUpsert: PostgresRow[], conflictColumns: string[], updateColumns: string[]): Promise { + return this.indexerData.upsert(tableDefinitionNames.tableName, rowsToUpsert, conflictColumns, updateColumns); + } + + public async delete (tableDefinitionNames: TableDefinitionNames, whereObject: WhereClauseMulti): Promise { + return this.indexerData.delete(tableDefinitionNames.tableName, whereObject); + } +} diff --git a/runner/src/indexer/indexer.ts b/runner/src/indexer/indexer.ts index 3d6ad3a9a..1fd154ef5 100644 --- a/runner/src/indexer/indexer.ts +++ b/runner/src/indexer/indexer.ts @@ -34,6 +34,7 @@ interface Context { } export interface TableDefinitionNames { + tableName: string originalTableName: string originalColumnNames: Map } @@ -234,6 +235,7 @@ export default class Indexer { for (const columnDef of createDefs) { if (columnDef.column?.type === 'column_ref') { const tableDefinitionNames: TableDefinitionNames = { + tableName, originalTableName: this.retainOriginalQuoting(schema, tableName), originalColumnNames: this.getColumnDefinitionNames(createDefs) }; @@ -401,7 +403,7 @@ export default class Indexer { if 
(logError) { const message: string = errors ? errors.map((e: any) => e.message).join(', ') : `HTTP ${response.status} error writing with graphql to indexer storage`; const mutation: string = - `mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){ + `mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){ insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) { id } From 0177c261506991f03014011e2236818c6b40e91a Mon Sep 17 00:00:00 2001 From: Morgan McCauley Date: Fri, 19 Jul 2024 07:10:41 +1200 Subject: [PATCH 6/6] fix: Avoid restarting block streams which have just started (#895) On Block Streamer startup, all Block Streams are started in one big herd. Some of which can take a while to start processing. These end up getting marked as "Stalled", and are therefore restarted. This PR increases the monitoring scrape interval, so that Block Streams have a longer window to prove they are "Processing", and therefore do not get restarted. Also bumped the "stale" check in Coordinator, so they are less likely to get marked as Stale. 
--- block-streamer/src/block_stream.rs | 2 +- coordinator/src/handlers/block_streams.rs | 12 ++++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index bf19038b5..eff2a0d1e 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -131,7 +131,7 @@ impl BlockStream { redis.get_last_processed_block(&config).await.unwrap(); loop { - tokio::time::sleep(std::time::Duration::from_secs(5)).await; + tokio::time::sleep(std::time::Duration::from_secs(15)).await; let new_last_processed_block = if let Ok(block) = redis.get_last_processed_block(&config).await { diff --git a/coordinator/src/handlers/block_streams.rs b/coordinator/src/handlers/block_streams.rs index 93878d61f..9f71149b1 100644 --- a/coordinator/src/handlers/block_streams.rs +++ b/coordinator/src/handlers/block_streams.rs @@ -246,7 +246,7 @@ impl BlockStreamsHandler { let updated_at = SystemTime::UNIX_EPOCH + Duration::from_secs(health.updated_at_timestamp_secs); - let stale = updated_at.elapsed().unwrap_or_default() > Duration::from_secs(30); + let stale = updated_at.elapsed().unwrap_or_default() > Duration::from_secs(60); let stalled = matches!( health.processing_state.try_into(), Ok(ProcessingState::Stalled) @@ -254,13 +254,17 @@ impl BlockStreamsHandler { if !stale && !stalled { return Ok(()); + } else { + tracing::info!(stale, stalled, "Restarting stalled block stream"); } + } else { + tracing::info!("Restarting stalled block stream"); } - tracing::info!("Restarting stalled block stream"); - self.stop(block_stream.stream_id.clone()).await?; - self.resume_block_stream(config).await?; + + let height = self.get_continuation_block_height(config).await?; + self.start(height, config).await?; Ok(()) }