diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index a605f30c4..c5114bdb3 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -4,7 +4,7 @@ use tokio::task::JoinHandle; use crate::indexer_config::IndexerConfig; use crate::rules::types::ChainId; -use registry_types::MatchingRule; +use registry_types::Rule; /// The number of blocks to prefetch within `near-lake-framework`. The internal default is 100, but /// we need this configurable for testing purposes. @@ -135,8 +135,8 @@ pub(crate) async fn start_block_stream( .last_indexed_block .parse::()?; - let blocks_from_index = match &indexer.indexer_rule.matching_rule { - MatchingRule::ActionAny { + let blocks_from_index = match &indexer.rule { + Rule::ActionAny { affected_account_id, .. } => { @@ -151,11 +151,11 @@ pub(crate) async fn start_block_stream( .list_matching_block_heights(start_block_height, affected_account_id) .await } - MatchingRule::ActionFunctionCall { .. } => { + Rule::ActionFunctionCall { .. } => { tracing::error!("ActionFunctionCall matching rule not yet supported for delta lake processing, function: {:?} {:?}", indexer.account_id, indexer.function_name); Ok(vec![]) } - MatchingRule::Event { .. } => { + Rule::Event { .. } => { tracing::error!("Event matching rule not yet supported for delta lake processing, function {:?} {:?}", indexer.account_id, indexer.function_name); Ok(vec![]) } @@ -222,7 +222,7 @@ pub(crate) async fn start_block_stream( .context("Failed to set last_published_block")?; let matches = crate::rules::reduce_indexer_rule_matches( - &indexer.indexer_rule, + &indexer.rule, &streamer_message, chain_id.clone(), ); @@ -294,14 +294,9 @@ mod tests { ) .unwrap(), function_name: "test".to_string(), - indexer_rule: registry_types::OldIndexerRule { - indexer_rule_kind: registry_types::IndexerRuleKind::Action, - matching_rule: registry_types::MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: registry_types::Status::Success, - }, - name: None, - id: None, + rule: registry_types::Rule::ActionAny { + affected_account_id: "queryapi.dataplatform.near".to_string(), + status: registry_types::Status::Success, }, }; diff --git a/block-streamer/src/indexer_config.rs b/block-streamer/src/indexer_config.rs index 435c4aa3b..f56eb5c54 100644 --- a/block-streamer/src/indexer_config.rs +++ b/block-streamer/src/indexer_config.rs @@ -2,17 +2,13 @@ use near_lake_framework::near_indexer_primitives::types::AccountId; use std::collections::hash_map::DefaultHasher; use std::hash::{Hash, Hasher}; -use registry_types::OldIndexerRule as IndexerRule; +use registry_types::Rule; #[derive(serde::Serialize, serde::Deserialize, Clone, Debug)] pub struct IndexerConfig { pub account_id: AccountId, pub function_name: String, - // pub code: String, - // pub start_block_height: Option, - // pub schema: Option, - // pub provisioned: bool, - pub indexer_rule: IndexerRule, + pub rule: Rule, } impl IndexerConfig { diff --git a/block-streamer/src/redis.rs b/block-streamer/src/redis.rs index dfaee7d78..aee9fe667 100644 --- a/block-streamer/src/redis.rs +++ b/block-streamer/src/redis.rs @@ -1,3 +1,5 @@ +#![cfg_attr(test, allow(dead_code))] + use std::fmt::Debug; use redis::{aio::ConnectionManager, RedisError, ToRedisArgs}; diff --git a/block-streamer/src/rules/matcher.rs b/block-streamer/src/rules/matcher.rs index 8c1684caa..17fdc420b 100644 --- a/block-streamer/src/rules/matcher.rs +++ b/block-streamer/src/rules/matcher.rs @@ -2,20 +2,20 @@ use near_lake_framework::near_indexer_primitives::{ views::{ActionView, ExecutionStatusView, ReceiptEnumView}, IndexerExecutionOutcomeWithReceipt, }; -use registry_types::{MatchingRule, Status}; +use registry_types::{Rule, Status}; use crate::rules::types::Event; pub fn matches( - matching_rule: &MatchingRule, + indexer_rule: &Rule, receipt_execution_outcome: &IndexerExecutionOutcomeWithReceipt, ) -> bool { - match matching_rule { - MatchingRule::ActionAny { + match indexer_rule { + Rule::ActionAny { affected_account_id, status, } => match_action_any(affected_account_id, status, receipt_execution_outcome), - MatchingRule::ActionFunctionCall { + Rule::ActionFunctionCall { affected_account_id, status, function, @@ -25,7 +25,7 @@ pub fn matches( function, receipt_execution_outcome, ), - MatchingRule::Event { + Rule::Event { contract_account_id, event, standard, diff --git a/block-streamer/src/rules/mod.rs b/block-streamer/src/rules/mod.rs index f87b906fd..68146ba23 100644 --- a/block-streamer/src/rules/mod.rs +++ b/block-streamer/src/rules/mod.rs @@ -3,19 +3,17 @@ pub mod outcomes_reducer; pub mod types; use near_lake_framework::near_indexer_primitives::StreamerMessage; -use registry_types::{MatchingRule, OldIndexerRule as IndexerRule}; +use registry_types::Rule; use types::{ChainId, IndexerRuleMatch}; pub fn reduce_indexer_rule_matches( - indexer_rule: &IndexerRule, + indexer_rule: &Rule, streamer_message: &StreamerMessage, chain_id: ChainId, ) -> Vec { - match &indexer_rule.matching_rule { - MatchingRule::ActionAny { .. } - | MatchingRule::ActionFunctionCall { .. } - | MatchingRule::Event { .. } => { + match &indexer_rule { + Rule::ActionAny { .. } | Rule::ActionFunctionCall { .. } | Rule::Event { .. } => { outcomes_reducer::reduce_indexer_rule_matches_from_outcomes( indexer_rule, streamer_message, diff --git a/block-streamer/src/rules/outcomes_reducer.rs b/block-streamer/src/rules/outcomes_reducer.rs index e66fab593..54b357c72 100644 --- a/block-streamer/src/rules/outcomes_reducer.rs +++ b/block-streamer/src/rules/outcomes_reducer.rs @@ -1,13 +1,13 @@ use crate::rules::matcher; use crate::rules::types::Event; use crate::rules::types::{ChainId, IndexerRuleMatch, IndexerRuleMatchPayload}; -use crate::rules::{IndexerRule, MatchingRule}; +use crate::rules::Rule; use near_lake_framework::near_indexer_primitives::{ IndexerExecutionOutcomeWithReceipt, StreamerMessage, }; pub fn reduce_indexer_rule_matches_from_outcomes( - indexer_rule: &IndexerRule, + indexer_rule: &Rule, streamer_message: &StreamerMessage, chain_id: ChainId, ) -> Vec { @@ -20,7 +20,7 @@ pub fn reduce_indexer_rule_matches_from_outcomes( .iter() // future: when extracting Actions, Events, etc this will be a filter operation .find(|receipt_execution_outcome| { - matcher::matches(&indexer_rule.matching_rule, receipt_execution_outcome) + matcher::matches(indexer_rule, receipt_execution_outcome) }) }) .map(|receipt_execution_outcome| { @@ -36,7 +36,7 @@ pub fn reduce_indexer_rule_matches_from_outcomes( } fn build_indexer_rule_match( - indexer_rule: &IndexerRule, + indexer_rule: &Rule, receipt_execution_outcome: &IndexerExecutionOutcomeWithReceipt, block_header_hash: String, block_height: u64, @@ -44,8 +44,6 @@ fn build_indexer_rule_match( ) -> IndexerRuleMatch { IndexerRuleMatch { chain_id, - indexer_rule_id: indexer_rule.id, - indexer_rule_name: indexer_rule.name.clone(), payload: build_indexer_rule_match_payload( indexer_rule, receipt_execution_outcome, @@ -56,7 +54,7 @@ fn build_indexer_rule_match( } fn build_indexer_rule_match_payload( - indexer_rule: &IndexerRule, + indexer_rule: &Rule, receipt_execution_outcome: &IndexerExecutionOutcomeWithReceipt, block_header_hash: String, ) -> IndexerRuleMatchPayload { @@ -64,15 +62,15 @@ fn build_indexer_rule_match_payload( // specified in the indexer function config. let transaction_hash = None; - match &indexer_rule.matching_rule { - MatchingRule::ActionAny { .. } | MatchingRule::ActionFunctionCall { .. } => { + match &indexer_rule { + Rule::ActionAny { .. } | Rule::ActionFunctionCall { .. } => { IndexerRuleMatchPayload::Actions { block_hash: block_header_hash, receipt_id: receipt_execution_outcome.receipt.receipt_id.to_string(), transaction_hash, } } - MatchingRule::Event { + Rule::Event { event, standard, version, @@ -115,21 +113,16 @@ fn build_indexer_rule_match_payload( #[cfg(test)] mod tests { - use registry_types::{IndexerRuleKind, MatchingRule, OldIndexerRule as IndexerRule, Status}; + use registry_types::{Rule, Status}; use crate::rules::outcomes_reducer::reduce_indexer_rule_matches_from_outcomes; use crate::rules::types::{ChainId, IndexerRuleMatch}; #[tokio::test] async fn match_wildcard_no_match() { - let wildcard_rule = IndexerRule { - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule: MatchingRule::ActionAny { - affected_account_id: "*.nearcrow.near".to_string(), - status: Status::Success, - }, - id: None, - name: None, + let wildcard_rule = Rule::ActionAny { + affected_account_id: "*.nearcrow.near".to_string(), + status: Status::Success, }; let streamer_message = crate::test_utils::get_streamer_message(93085141); @@ -144,14 +137,9 @@ mod tests { #[tokio::test] async fn match_wildcard_contract_subaccount_name() { - let wildcard_rule = IndexerRule { - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule: MatchingRule::ActionAny { - affected_account_id: "*.nearcrowd.near".to_string(), - status: Status::Success, - }, - id: None, - name: None, + let wildcard_rule = Rule::ActionAny { + affected_account_id: "*.nearcrowd.near".to_string(), + status: Status::Success, }; let streamer_message = crate::test_utils::get_streamer_message(93085141); @@ -166,14 +154,9 @@ mod tests { #[tokio::test] async fn match_wildcard_mid_contract_name() { - let wildcard_rule = IndexerRule { - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule: MatchingRule::ActionAny { - affected_account_id: "*crowd.near".to_string(), - status: Status::Success, - }, - id: None, - name: None, + let wildcard_rule = Rule::ActionAny { + affected_account_id: "*crowd.near".to_string(), + status: Status::Success, }; let streamer_message = crate::test_utils::get_streamer_message(93085141); @@ -185,14 +168,9 @@ mod tests { assert_eq!(result.len(), 1); // see Extraction note in previous test - let wildcard_rule = IndexerRule { - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule: MatchingRule::ActionAny { - affected_account_id: "app.nea*owd.near".to_string(), - status: Status::Success, - }, - id: None, - name: None, + let wildcard_rule = Rule::ActionAny { + affected_account_id: "app.nea*owd.near".to_string(), + status: Status::Success, }; let result: Vec = reduce_indexer_rule_matches_from_outcomes( @@ -206,14 +184,9 @@ mod tests { #[tokio::test] async fn match_csv_account() { - let wildcard_rule = IndexerRule { - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule: MatchingRule::ActionAny { - affected_account_id: "notintheblockaccount.near, app.nearcrowd.near".to_string(), - status: Status::Success, - }, - id: None, - name: None, + let wildcard_rule = Rule::ActionAny { + affected_account_id: "notintheblockaccount.near, app.nearcrowd.near".to_string(), + status: Status::Success, }; let streamer_message = crate::test_utils::get_streamer_message(93085141); @@ -228,14 +201,9 @@ mod tests { #[tokio::test] async fn match_csv_wildcard_account() { - let wildcard_rule = IndexerRule { - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule: MatchingRule::ActionAny { - affected_account_id: "notintheblockaccount.near, *.nearcrowd.near".to_string(), - status: Status::Success, - }, - id: None, - name: None, + let wildcard_rule = Rule::ActionAny { + affected_account_id: "notintheblockaccount.near, *.nearcrowd.near".to_string(), + status: Status::Success, }; let streamer_message = crate::test_utils::get_streamer_message(93085141); diff --git a/block-streamer/src/rules/types.rs b/block-streamer/src/rules/types.rs index b61eff208..d3139b28e 100644 --- a/block-streamer/src/rules/types.rs +++ b/block-streamer/src/rules/types.rs @@ -14,8 +14,6 @@ pub type BlockHashString = String; )] pub struct IndexerRuleMatch { pub chain_id: ChainId, - pub indexer_rule_id: Option, - pub indexer_rule_name: Option, pub payload: IndexerRuleMatchPayload, pub block_height: u64, } diff --git a/block-streamer/src/s3_client.rs b/block-streamer/src/s3_client.rs index 6fd3d909a..412fd4ee0 100644 --- a/block-streamer/src/s3_client.rs +++ b/block-streamer/src/s3_client.rs @@ -1,3 +1,5 @@ +#![cfg_attr(test, allow(dead_code))] + const MAX_S3_LIST_REQUESTS: usize = 1000; #[cfg(test)] diff --git a/block-streamer/src/server/block_streamer_service.rs b/block-streamer/src/server/block_streamer_service.rs index 2fbb6aee5..cb8e7beac 100644 --- a/block-streamer/src/server/block_streamer_service.rs +++ b/block-streamer/src/server/block_streamer_service.rs @@ -6,7 +6,6 @@ use tonic::{Request, Response, Status}; use crate::indexer_config::IndexerConfig; use crate::rules::types::ChainId; -use registry_types::{IndexerRuleKind, MatchingRule, OldIndexerRule as IndexerRule}; use crate::block_stream; use crate::server::blockstreamer; @@ -69,26 +68,21 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic .rule .ok_or(Status::invalid_argument("Rule must be provided"))?; - let matching_rule = match rule { - start_stream_request::Rule::ActionAnyRule(action_any) => MatchingRule::ActionAny { - affected_account_id: action_any.affected_account_id, - status: Self::match_status(action_any.status)?, - }, + let rule = match rule { + start_stream_request::Rule::ActionAnyRule(action_any) => { + registry_types::Rule::ActionAny { + affected_account_id: action_any.affected_account_id, + status: Self::match_status(action_any.status)?, + } + } start_stream_request::Rule::ActionFunctionCallRule(action_function_call) => { - MatchingRule::ActionFunctionCall { + registry_types::Rule::ActionFunctionCall { affected_account_id: action_function_call.affected_account_id, status: Self::match_status(action_function_call.status)?, function: action_function_call.function_name, } } }; - let filter_rule = IndexerRule { - // TODO: Remove kind as it is unused - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule, - id: None, - name: None, - }; let account_id = near_indexer_primitives::types::AccountId::try_from(request.account_id) .map_err(|err| { @@ -99,8 +93,8 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic })?; let indexer_config = IndexerConfig { account_id, + rule, function_name: request.function_name, - indexer_rule: filter_rule, }; let lock = self.get_block_streams_lock()?; diff --git a/coordinator/src/block_streams_handler.rs b/coordinator/src/block_streams/handler.rs similarity index 81% rename from coordinator/src/block_streams_handler.rs rename to coordinator/src/block_streams/handler.rs index 241afa569..f17a974ce 100644 --- a/coordinator/src/block_streams_handler.rs +++ b/coordinator/src/block_streams/handler.rs @@ -1,14 +1,17 @@ #![cfg_attr(test, allow(dead_code))] +pub use block_streamer::StreamInfo; + use anyhow::Context; use block_streamer::block_streamer_client::BlockStreamerClient; use block_streamer::{ start_stream_request::Rule, ActionAnyRule, ActionFunctionCallRule, ListStreamsRequest, - StartStreamRequest, Status, StopStreamRequest, StreamInfo, + StartStreamRequest, Status, StopStreamRequest, }; use tonic::transport::channel::Channel; use tonic::Request; +use crate::indexer_config::IndexerConfig; use crate::utils::exponential_retry; #[cfg(not(test))] @@ -80,21 +83,17 @@ impl BlockStreamsHandlerImpl { pub async fn start( &self, start_block_height: u64, - account_id: String, - function_name: String, - version: u64, - redis_stream: String, - rule: registry_types::MatchingRule, + indexer_config: &IndexerConfig, ) -> anyhow::Result<()> { - let rule = match &rule { - registry_types::MatchingRule::ActionAny { + let rule = match &indexer_config.rule { + registry_types::Rule::ActionAny { affected_account_id, status, } => Rule::ActionAnyRule(ActionAnyRule { affected_account_id: affected_account_id.to_owned(), status: Self::match_status(status), }), - registry_types::MatchingRule::ActionFunctionCall { + registry_types::Rule::ActionFunctionCall { affected_account_id, status, function, @@ -114,10 +113,10 @@ impl BlockStreamsHandlerImpl { let request = StartStreamRequest { start_block_height, - version, - redis_stream, - account_id: account_id.clone(), - function_name: function_name.clone(), + version: indexer_config.get_registry_version(), + redis_stream: indexer_config.get_redis_stream_key(), + account_id: indexer_config.account_id.to_string(), + function_name: indexer_config.function_name.clone(), rule: Some(rule), }; @@ -128,16 +127,16 @@ impl BlockStreamsHandlerImpl { .await .map_err(|error| { tracing::error!( - account_id, - function_name, + account_id = indexer_config.account_id.as_str(), + function_name = indexer_config.function_name, "Failed to start stream\n{error:?}" ); }); tracing::debug!( - account_id, - function_name, - version, + account_id = indexer_config.account_id.as_str(), + function_name = indexer_config.function_name, + version = indexer_config.get_registry_version(), "Start stream response: {:#?}", response ); diff --git a/coordinator/src/block_streams/mod.rs b/coordinator/src/block_streams/mod.rs new file mode 100644 index 000000000..cd8b6fd96 --- /dev/null +++ b/coordinator/src/block_streams/mod.rs @@ -0,0 +1,5 @@ +mod handler; +mod synchronise; + +pub use handler::BlockStreamsHandler; +pub use synchronise::synchronise_block_streams; diff --git a/coordinator/src/block_streams/synchronise.rs b/coordinator/src/block_streams/synchronise.rs new file mode 100644 index 000000000..ed42ec1a3 --- /dev/null +++ b/coordinator/src/block_streams/synchronise.rs @@ -0,0 +1,677 @@ +use std::cmp::Ordering; + +use registry_types::StartBlock; + +use crate::indexer_config::IndexerConfig; +use crate::migration::MIGRATED_STREAM_VERSION; +use crate::redis::RedisClient; +use crate::registry::IndexerRegistry; + +use super::handler::{BlockStreamsHandler, StreamInfo}; + +pub async fn synchronise_block_streams( + indexer_registry: &IndexerRegistry, + redis_client: &RedisClient, + block_streams_handler: &BlockStreamsHandler, +) -> anyhow::Result<()> { + let mut active_block_streams = block_streams_handler.list().await?; + + for (account_id, indexers) in indexer_registry.iter() { + for (function_name, indexer_config) in indexers.iter() { + let active_block_stream = active_block_streams + .iter() + .position(|stream| { + stream.account_id == account_id.to_string() + && &stream.function_name == function_name + }) + .map(|index| active_block_streams.swap_remove(index)); + + let _ = synchronise_block_stream( + active_block_stream, + indexer_config, + redis_client, + block_streams_handler, + ) + .await + .map_err(|err| { + tracing::error!( + account_id = account_id.as_str(), + function_name, + version = indexer_config.get_registry_version(), + "failed to sync block stream: {err:?}" + ) + }); + } + } + + for unregistered_block_stream in active_block_streams { + tracing::info!( + account_id = unregistered_block_stream.account_id.as_str(), + function_name = unregistered_block_stream.function_name, + version = unregistered_block_stream.version, + "Stopping unregistered block stream" + ); + + block_streams_handler + .stop(unregistered_block_stream.stream_id) + .await?; + } + + Ok(()) +} + +#[tracing::instrument( + skip_all, + fields( + account_id = %indexer_config.account_id, + function_name = indexer_config.function_name, + version = indexer_config.get_registry_version() + ) +)] +async fn synchronise_block_stream( + active_block_stream: Option, + indexer_config: &IndexerConfig, + redis_client: &RedisClient, + block_streams_handler: &BlockStreamsHandler, +) -> anyhow::Result<()> { + if let Some(active_block_stream) = active_block_stream { + if active_block_stream.version == indexer_config.get_registry_version() { + return Ok(()); + } + + tracing::info!( + previous_version = active_block_stream.version, + "Stopping outdated block stream" + ); + + block_streams_handler + .stop(active_block_stream.stream_id) + .await?; + } + + let stream_status = get_stream_status(indexer_config, redis_client).await?; + + clear_block_stream_if_needed(&stream_status, indexer_config, redis_client).await?; + + let start_block_height = + determine_start_block_height(&stream_status, indexer_config, redis_client).await?; + + block_streams_handler + .start(start_block_height, indexer_config) + .await?; + + redis_client.set_stream_version(indexer_config).await?; + + Ok(()) +} + +#[derive(Debug)] +enum StreamStatus { + /// Stream has just been migrated to V2 + Migrated, + /// Stream version is synchronized with the registry + Synced, + /// Stream version does not match registry + Outdated, + /// No stream version, therefore new + New, +} + +async fn get_stream_status( + indexer_config: &IndexerConfig, + redis_client: &RedisClient, +) -> anyhow::Result { + let stream_version = redis_client.get_stream_version(indexer_config).await?; + + if stream_version.is_none() { + return Ok(StreamStatus::New); + } + + let stream_version = stream_version.unwrap(); + + if stream_version == MIGRATED_STREAM_VERSION { + return Ok(StreamStatus::Migrated); + } + + match indexer_config.get_registry_version().cmp(&stream_version) { + Ordering::Equal => Ok(StreamStatus::Synced), + Ordering::Greater => Ok(StreamStatus::Outdated), + Ordering::Less => { + tracing::warn!("Found stream with version greater than registry, treating as outdated"); + + Ok(StreamStatus::Outdated) + } + } +} + +async fn clear_block_stream_if_needed( + stream_status: &StreamStatus, + indexer_config: &IndexerConfig, + redis_client: &RedisClient, +) -> anyhow::Result<()> { + if matches!( + stream_status, + StreamStatus::Migrated | StreamStatus::Synced | StreamStatus::New + ) || indexer_config.start_block == StartBlock::Continue + { + return Ok(()); + } + + tracing::info!("Clearing redis stream"); + + redis_client.clear_block_stream(indexer_config).await +} + +async fn determine_start_block_height( + stream_status: &StreamStatus, + indexer_config: &IndexerConfig, + redis_client: &RedisClient, +) -> anyhow::Result { + if matches!(stream_status, StreamStatus::Migrated | StreamStatus::Synced) { + tracing::info!("Resuming block stream"); + + return get_continuation_block_height(indexer_config, redis_client).await; + } + + tracing::info!(start_block = ?indexer_config.start_block, "Stating new block stream"); + + match indexer_config.start_block { + StartBlock::Latest => Ok(indexer_config.get_registry_version()), + StartBlock::Height(height) => Ok(height), + StartBlock::Continue => get_continuation_block_height(indexer_config, redis_client).await, + } +} + +async fn get_continuation_block_height( + indexer_config: &IndexerConfig, + redis_client: &RedisClient, +) -> anyhow::Result { + redis_client + .get_last_published_block(indexer_config) + .await? + .map(|height| height + 1) + .ok_or(anyhow::anyhow!("Indexer has no `last_published_block`")) +} + +#[cfg(test)] +mod tests { + use super::*; + + use std::collections::HashMap; + + use mockall::predicate; + use registry_types::{Rule, Status}; + + #[tokio::test] + async fn resumes_stream_with_matching_redis_version() { + let indexer_config = IndexerConfig { + account_id: "morgs.near".parse().unwrap(), + function_name: "test".to_string(), + code: String::new(), + schema: String::new(), + rule: Rule::ActionAny { + affected_account_id: "queryapi.dataplatform.near".to_string(), + status: Status::Any, + }, + created_at_block_height: 1, + updated_at_block_height: Some(200), + start_block: StartBlock::Height(100), + }; + + let indexer_registry = HashMap::from([( + "morgs.near".parse().unwrap(), + HashMap::from([("test".to_string(), indexer_config.clone())]), + )]); + + let mut redis_client = RedisClient::default(); + redis_client + .expect_get_stream_version() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| Ok(Some(200))) + .once(); + redis_client + .expect_get_last_published_block() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| Ok(Some(500))) + .once(); + redis_client + .expect_set_stream_version() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| Ok(())) + .once(); + redis_client.expect_clear_block_stream().never(); + + let mut block_stream_handler = BlockStreamsHandler::default(); + block_stream_handler.expect_list().returning(|| Ok(vec![])); + block_stream_handler + .expect_start() + .with(predicate::eq(501), predicate::eq(indexer_config)) + .returning(|_, _| Ok(())) + .once(); + + synchronise_block_streams(&indexer_registry, &redis_client, &block_stream_handler) + .await + .unwrap(); + } + + #[tokio::test] + async fn starts_stream_with_latest() { + let indexer_config = IndexerConfig { + account_id: "morgs.near".parse().unwrap(), + function_name: "test".to_string(), + code: String::new(), + schema: String::new(), + rule: Rule::ActionAny { + affected_account_id: "queryapi.dataplatform.near".to_string(), + status: Status::Any, + }, + created_at_block_height: 1, + updated_at_block_height: Some(200), + start_block: StartBlock::Latest, + }; + let indexer_registry = HashMap::from([( + "morgs.near".parse().unwrap(), + HashMap::from([("test".to_string(), indexer_config.clone())]), + )]); + + let mut redis_client = RedisClient::default(); + redis_client + .expect_get_stream_version() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| Ok(Some(1))) + .once(); + redis_client + .expect_clear_block_stream() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| Ok(())) + .once(); + redis_client + .expect_set_stream_version() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| Ok(())) + .once(); + + let mut block_stream_handler = BlockStreamsHandler::default(); + block_stream_handler.expect_list().returning(|| Ok(vec![])); + block_stream_handler.expect_stop().never(); + block_stream_handler + .expect_start() + .with(predicate::eq(200), predicate::eq(indexer_config)) + .returning(|_, _| Ok(())) + .once(); + + synchronise_block_streams(&indexer_registry, &redis_client, &block_stream_handler) + .await + .unwrap(); + } + + #[tokio::test] + async fn starts_stream_with_height() { + let indexer_config = IndexerConfig { + account_id: "morgs.near".parse().unwrap(), + function_name: "test".to_string(), + code: String::new(), + schema: String::new(), + rule: Rule::ActionAny { + affected_account_id: "queryapi.dataplatform.near".to_string(), + status: Status::Any, + }, + created_at_block_height: 1, + updated_at_block_height: Some(200), + start_block: StartBlock::Height(100), + }; + let indexer_registry = HashMap::from([( + "morgs.near".parse().unwrap(), + HashMap::from([("test".to_string(), indexer_config.clone())]), + )]); + + let mut redis_client = RedisClient::default(); + redis_client + .expect_get_stream_version() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| Ok(Some(1))) + .once(); + redis_client + .expect_clear_block_stream() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| Ok(())) + .once(); + redis_client + .expect_set_stream_version() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| Ok(())) + .once(); + + let mut block_stream_handler = BlockStreamsHandler::default(); + block_stream_handler.expect_list().returning(|| Ok(vec![])); + block_stream_handler.expect_stop().never(); + block_stream_handler + .expect_start() + .with(predicate::eq(100), predicate::eq(indexer_config)) + .returning(|_, _| Ok(())) + .once(); + + synchronise_block_streams(&indexer_registry, &redis_client, &block_stream_handler) + .await + .unwrap(); + } + + #[tokio::test] + async fn starts_stream_with_continue() { + let indexer_config = IndexerConfig { + account_id: "morgs.near".parse().unwrap(), + function_name: "test".to_string(), + code: String::new(), + schema: String::new(), + rule: Rule::ActionAny { + affected_account_id: "queryapi.dataplatform.near".to_string(), + status: Status::Any, + }, + created_at_block_height: 1, + updated_at_block_height: Some(200), + start_block: StartBlock::Continue, + }; + let indexer_registry = HashMap::from([( + "morgs.near".parse().unwrap(), + HashMap::from([("test".to_string(), indexer_config.clone())]), + )]); + + let mut redis_client = RedisClient::default(); + redis_client + .expect_get_stream_version() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| Ok(Some(1))) + .once(); + redis_client + .expect_get_last_published_block() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| Ok(Some(100))) + .once(); + redis_client + .expect_set_stream_version() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| Ok(())) + .once(); + + let mut block_stream_handler = BlockStreamsHandler::default(); + block_stream_handler.expect_list().returning(|| Ok(vec![])); + block_stream_handler.expect_stop().never(); + block_stream_handler + .expect_start() + .with(predicate::eq(101), predicate::eq(indexer_config)) + .returning(|_, _| Ok(())) + .once(); + + synchronise_block_streams(&indexer_registry, &redis_client, &block_stream_handler) + .await + .unwrap(); + } + + #[tokio::test] + async fn stops_stream_not_in_registry() { + let indexer_registry = HashMap::from([]); + + let redis_client = RedisClient::default(); + + let mut block_stream_handler = BlockStreamsHandler::default(); + block_stream_handler.expect_list().returning(|| { + Ok(vec![block_streamer::StreamInfo { + stream_id: "stream_id".to_string(), + account_id: "morgs.near".to_string(), + function_name: "test".to_string(), + version: 1, + }]) + }); + block_stream_handler + .expect_stop() + .with(predicate::eq("stream_id".to_string())) + .returning(|_| Ok(())) + .once(); + + synchronise_block_streams(&indexer_registry, &redis_client, &block_stream_handler) + .await + .unwrap(); + } + + #[tokio::test] + async fn ignores_stream_with_matching_registry_version() { + let indexer_registry = HashMap::from([( + "morgs.near".parse().unwrap(), + HashMap::from([( + "test".to_string(), + IndexerConfig { + account_id: "morgs.near".parse().unwrap(), + function_name: "test".to_string(), + code: String::new(), + schema: String::new(), + rule: Rule::ActionAny { + affected_account_id: "queryapi.dataplatform.near".to_string(), + status: Status::Any, + }, + created_at_block_height: 101, + updated_at_block_height: None, + start_block: StartBlock::Latest, + }, + )]), + )]); + + let redis_client = RedisClient::default(); + + let mut block_stream_handler = BlockStreamsHandler::default(); + block_stream_handler.expect_list().returning(|| { + Ok(vec![block_streamer::StreamInfo { + stream_id: "stream_id".to_string(), + account_id: "morgs.near".to_string(), + function_name: "test".to_string(), + version: 101, + }]) + }); + block_stream_handler.expect_stop().never(); + block_stream_handler.expect_start().never(); + + synchronise_block_streams(&indexer_registry, &redis_client, &block_stream_handler) + .await + .unwrap(); + } + + #[tokio::test] + async fn restarts_streams_when_registry_version_differs() { + let indexer_config = IndexerConfig { + account_id: "morgs.near".parse().unwrap(), + function_name: "test".to_string(), + code: String::new(), + schema: String::new(), + rule: Rule::ActionAny { + affected_account_id: "queryapi.dataplatform.near".to_string(), + status: Status::Any, + }, + created_at_block_height: 101, + updated_at_block_height: Some(199), + start_block: StartBlock::Height(1000), + }; + let indexer_registry = HashMap::from([( + "morgs.near".parse().unwrap(), + HashMap::from([("test".to_string(), indexer_config.clone())]), + )]); + + let mut redis_client = RedisClient::default(); + redis_client + .expect_get_stream_version() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| Ok(Some(101))) + .once(); + redis_client + .expect_clear_block_stream() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| Ok(())) + .once(); + redis_client + .expect_set_stream_version() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| Ok(())) + .once(); + + let mut block_stream_handler = BlockStreamsHandler::default(); + block_stream_handler.expect_list().returning(|| { + Ok(vec![block_streamer::StreamInfo { + stream_id: "stream_id".to_string(), + account_id: "morgs.near".to_string(), + function_name: "test".to_string(), + version: 101, + }]) + }); + block_stream_handler + .expect_stop() + .with(predicate::eq("stream_id".to_string())) + .returning(|_| Ok(())) + .once(); + block_stream_handler + .expect_start() + .with(predicate::eq(1000), predicate::eq(indexer_config)) + .returning(|_, _| Ok(())) + .once(); + + synchronise_block_streams(&indexer_registry, &redis_client, &block_stream_handler) + .await + .unwrap(); + } + + #[tokio::test] + async fn resumes_stream_post_migration() { + let indexer_config = IndexerConfig { + account_id: "morgs.near".parse().unwrap(), + function_name: "test".to_string(), + code: String::new(), + schema: String::new(), + rule: Rule::ActionAny { + affected_account_id: "queryapi.dataplatform.near".to_string(), + status: Status::Any, + }, + created_at_block_height: 101, + updated_at_block_height: Some(200), + start_block: StartBlock::Height(1000), + }; + let indexer_registry = HashMap::from([( + "morgs.near".parse().unwrap(), + HashMap::from([("test".to_string(), indexer_config.clone())]), + )]); + + let mut redis_client = RedisClient::default(); + redis_client + .expect_get_stream_version() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| Ok(Some(MIGRATED_STREAM_VERSION))) + .once(); + redis_client + .expect_get_last_published_block() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| Ok(Some(100))) + .once(); + redis_client + .expect_set_stream_version() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| Ok(())) + .once(); + + let mut block_stream_handler = BlockStreamsHandler::default(); + block_stream_handler.expect_list().returning(|| Ok(vec![])); + block_stream_handler.expect_stop().never(); + block_stream_handler + .expect_start() + .with(predicate::eq(101), predicate::eq(indexer_config)) + .returning(|_, _| Ok(())) + .once(); + + synchronise_block_streams(&indexer_registry, &redis_client, &block_stream_handler) + .await + .unwrap(); + } + + #[tokio::test] + async fn does_not_start_stream_without_last_published_block() { + let indexer_config = IndexerConfig { + account_id: "morgs.near".parse().unwrap(), + function_name: "test".to_string(), + code: String::new(), + schema: String::new(), + rule: Rule::ActionAny { + affected_account_id: "queryapi.dataplatform.near".to_string(), + status: Status::Any, + }, + created_at_block_height: 101, + updated_at_block_height: Some(200), + start_block: StartBlock::Continue, + }; + let indexer_registry = HashMap::from([( + "morgs.near".parse().unwrap(), + HashMap::from([("test".to_string(), indexer_config.clone())]), + )]); + + let mut redis_client = RedisClient::default(); + redis_client + .expect_get_stream_version() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| Ok(Some(101))) + .once(); + redis_client + .expect_get_last_published_block() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| anyhow::bail!("no last_published_block")) + .once(); + + let mut block_stream_handler = BlockStreamsHandler::default(); + block_stream_handler.expect_list().returning(|| Ok(vec![])); + block_stream_handler.expect_stop().never(); + block_stream_handler.expect_start().never(); + + synchronise_block_streams(&indexer_registry, &redis_client, &block_stream_handler) + .await + .unwrap(); + } + + #[tokio::test] + async fn starts_block_stream_for_first_time() { + let indexer_config = IndexerConfig { + account_id: "morgs.near".parse().unwrap(), + function_name: "test".to_string(), + code: String::new(), + schema: String::new(), + rule: Rule::ActionAny { + affected_account_id: "queryapi.dataplatform.near".to_string(), + status: Status::Any, + }, + created_at_block_height: 101, + updated_at_block_height: None, + start_block: StartBlock::Height(50), + }; + let indexer_registry = HashMap::from([( + "morgs.near".parse().unwrap(), + HashMap::from([("test".to_string(), indexer_config.clone())]), + )]); + + let mut redis_client = RedisClient::default(); + redis_client + .expect_get_stream_version() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| Ok(None)) + .once(); + redis_client + .expect_set_stream_version() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| Ok(())) + .once(); + + let mut block_stream_handler = BlockStreamsHandler::default(); + block_stream_handler.expect_list().returning(|| Ok(vec![])); + block_stream_handler.expect_stop().never(); + block_stream_handler + .expect_start() + .with(predicate::eq(50), predicate::eq(indexer_config)) + .returning(|_, _| Ok(())) + .once(); + + synchronise_block_streams(&indexer_registry, &redis_client, &block_stream_handler) + .await + .unwrap(); + } +} diff --git a/coordinator/src/executors_handler.rs b/coordinator/src/executors/handler.rs similarity index 71% rename from coordinator/src/executors_handler.rs rename to coordinator/src/executors/handler.rs index 496ce7a86..68be45f29 100644 --- a/coordinator/src/executors_handler.rs +++ b/coordinator/src/executors/handler.rs @@ -1,11 +1,14 @@ #![cfg_attr(test, allow(dead_code))] +pub use runner::ExecutorInfo; + use anyhow::Context; use runner::runner_client::RunnerClient; -use runner::{ExecutorInfo, ListExecutorsRequest, StartExecutorRequest, StopExecutorRequest}; +use runner::{ListExecutorsRequest, StartExecutorRequest, StopExecutorRequest}; use tonic::transport::channel::Channel; use tonic::Request; +use crate::indexer_config::IndexerConfig; use crate::utils::exponential_retry; #[cfg(not(test))] @@ -46,22 +49,14 @@ impl ExecutorsHandlerImpl { .await } - pub async fn start( - &self, - account_id: String, - function_name: String, - code: String, - schema: String, - redis_stream: String, - version: u64, - ) -> anyhow::Result<()> { + pub async fn start(&self, indexer_config: &IndexerConfig) -> anyhow::Result<()> { let request = StartExecutorRequest { - code, - schema, - redis_stream, - version, - account_id: account_id.clone(), - function_name: function_name.clone(), + code: indexer_config.code.clone(), + schema: indexer_config.schema.clone(), + redis_stream: indexer_config.get_redis_stream_key(), + version: indexer_config.get_registry_version(), + account_id: indexer_config.account_id.to_string(), + function_name: indexer_config.function_name.clone(), }; let response = self @@ -71,16 +66,16 @@ impl ExecutorsHandlerImpl { .await .map_err(|error| { tracing::error!( - account_id, - function_name, + account_id = indexer_config.account_id.as_str(), + function_name = indexer_config.function_name, "Failed to start executor\n{error:?}" ); }); tracing::debug!( - account_id, - function_name, - version, + account_id = indexer_config.account_id.as_str(), + function_name = indexer_config.function_name, + version = indexer_config.get_registry_version(), "Start executors response: {:#?}", response ); diff --git a/coordinator/src/executors/mod.rs b/coordinator/src/executors/mod.rs new file mode 100644 index 000000000..1b68609c6 --- /dev/null +++ b/coordinator/src/executors/mod.rs @@ -0,0 +1,5 @@ +mod handler; +mod synchronise; + +pub use handler::ExecutorsHandler; +pub use synchronise::synchronise_executors; diff --git a/coordinator/src/executors/synchronise.rs b/coordinator/src/executors/synchronise.rs new file mode 100644 index 000000000..ed8685a54 --- /dev/null +++ b/coordinator/src/executors/synchronise.rs @@ -0,0 +1,248 @@ +use crate::indexer_config::IndexerConfig; +use crate::registry::IndexerRegistry; + +use super::handler::{ExecutorInfo, ExecutorsHandler}; + +const V1_EXECUTOR_VERSION: u64 = 0; + +pub async fn synchronise_executors( + indexer_registry: &IndexerRegistry, + executors_handler: &ExecutorsHandler, +) -> anyhow::Result<()> { + let active_executors = executors_handler.list().await?; + + // Ignore V1 executors + let mut active_executors: Vec<_> = active_executors + .into_iter() + .filter(|executor| executor.version != V1_EXECUTOR_VERSION) + .collect(); + + for (account_id, indexers) in indexer_registry.iter() { + for (function_name, indexer_config) in indexers.iter() { + let active_executor = active_executors + .iter() + .position(|stream| { + stream.account_id == account_id.to_string() + && &stream.function_name == function_name + }) + .map(|index| active_executors.swap_remove(index)); + + let _ = synchronise_executor(active_executor, indexer_config, executors_handler) + .await + .map_err(|err| { + tracing::error!( + account_id = account_id.as_str(), + function_name, + version = indexer_config.get_registry_version(), + "failed to sync executor: {err:?}" + ) + }); + } + } + + for unregistered_executor in active_executors { + tracing::info!( + account_id = unregistered_executor.account_id.as_str(), + function_name = unregistered_executor.function_name, + registry_version = unregistered_executor.version, + "Stopping unregistered executor" + ); + + executors_handler + .stop(unregistered_executor.executor_id) + .await?; + } + + Ok(()) +} + +#[tracing::instrument( + skip_all, + fields( + account_id = %indexer_config.account_id, + function_name = indexer_config.function_name, + version = indexer_config.get_registry_version() + ) +)] +async fn synchronise_executor( + active_executor: Option, + indexer_config: &IndexerConfig, + executors_handler: &ExecutorsHandler, +) -> anyhow::Result<()> { + let registry_version = indexer_config.get_registry_version(); + + if let Some(active_executor) = active_executor { + if active_executor.version == registry_version { + return Ok(()); + } + + tracing::info!("Stopping outdated executor"); + + executors_handler.stop(active_executor.executor_id).await?; + } + + tracing::info!("Starting new executor"); + + executors_handler.start(indexer_config).await?; + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + use std::collections::HashMap; + + use mockall::predicate; + use registry_types::{Rule, StartBlock, Status}; + + use crate::indexer_config::IndexerConfig; + + #[tokio::test] + async fn starts_executor() { + let indexer_config = IndexerConfig { + account_id: "morgs.near".parse().unwrap(), + function_name: "test".to_string(), + code: "code".to_string(), + schema: "schema".to_string(), + rule: Rule::ActionAny { + affected_account_id: "queryapi.dataplatform.near".to_string(), + status: Status::Any, + }, + created_at_block_height: 1, + updated_at_block_height: None, + start_block: StartBlock::Height(100), + }; + let indexer_registry = HashMap::from([( + "morgs.near".parse().unwrap(), + HashMap::from([("test".to_string(), indexer_config.clone())]), + )]); + + let mut executors_handler = ExecutorsHandler::default(); + executors_handler.expect_list().returning(|| Ok(vec![])); + executors_handler + .expect_start() + .with(predicate::eq(indexer_config)) + .returning(|_| Ok(())) + .once(); + + synchronise_executors(&indexer_registry, &executors_handler) + .await + .unwrap(); + } + + #[tokio::test] + async fn restarts_executor_when_registry_version_differs() { + let indexer_config = IndexerConfig { + account_id: "morgs.near".parse().unwrap(), + function_name: "test".to_string(), + code: "code".to_string(), + schema: "schema".to_string(), + rule: Rule::ActionAny { + affected_account_id: "queryapi.dataplatform.near".to_string(), + status: Status::Any, + }, + created_at_block_height: 1, + updated_at_block_height: Some(2), + start_block: StartBlock::Height(100), + }; + let indexer_registry = HashMap::from([( + "morgs.near".parse().unwrap(), + HashMap::from([("test".to_string(), indexer_config.clone())]), + )]); + + let mut executors_handler = ExecutorsHandler::default(); + executors_handler.expect_list().returning(|| { + Ok(vec![runner::ExecutorInfo { + executor_id: "executor_id".to_string(), + account_id: "morgs.near".to_string(), + function_name: "test".to_string(), + status: "running".to_string(), + version: 1, + }]) + }); + executors_handler + .expect_stop() + .with(predicate::eq("executor_id".to_string())) + .returning(|_| Ok(())) + .once(); + + executors_handler + .expect_start() + .with(predicate::eq(indexer_config)) + .returning(|_| Ok(())) + .once(); + + synchronise_executors(&indexer_registry, &executors_handler) + .await + .unwrap(); + } + + #[tokio::test] + async fn ignores_executor_with_matching_registry_version() { + let indexer_registry = HashMap::from([( + "morgs.near".parse().unwrap(), + HashMap::from([( + "test".to_string(), + IndexerConfig { + account_id: "morgs.near".parse().unwrap(), + function_name: "test".to_string(), + code: "code".to_string(), + schema: "schema".to_string(), + rule: Rule::ActionAny { + affected_account_id: "queryapi.dataplatform.near".to_string(), + status: Status::Any, + }, + created_at_block_height: 1, + updated_at_block_height: Some(2), + start_block: StartBlock::Height(100), + }, + )]), + )]); + + let mut executors_handler = ExecutorsHandler::default(); + executors_handler.expect_list().returning(|| { + Ok(vec![runner::ExecutorInfo { + executor_id: "executor_id".to_string(), + account_id: "morgs.near".to_string(), + function_name: "test".to_string(), + status: "running".to_string(), + version: 2, + }]) + }); + executors_handler.expect_stop().never(); + + executors_handler.expect_start().never(); + + synchronise_executors(&indexer_registry, &executors_handler) + .await + .unwrap(); + } + + #[tokio::test] + async fn stops_executor_not_in_registry() { + let indexer_registry = HashMap::from([]); + + let mut executors_handler = ExecutorsHandler::default(); + executors_handler.expect_list().returning(|| { + Ok(vec![runner::ExecutorInfo { + executor_id: "executor_id".to_string(), + account_id: "morgs.near".to_string(), + function_name: "test".to_string(), + status: "running".to_string(), + version: 2, + }]) + }); + + executors_handler + .expect_stop() + .with(predicate::eq("executor_id".to_string())) + .returning(|_| Ok(())) + .once(); + + synchronise_executors(&indexer_registry, &executors_handler) + .await + .unwrap(); + } +} diff --git a/coordinator/src/indexer_config.rs b/coordinator/src/indexer_config.rs new file mode 100644 index 000000000..e7b049281 --- /dev/null +++ b/coordinator/src/indexer_config.rs @@ -0,0 +1,45 @@ +use near_primitives::types::AccountId; +use registry_types::{Rule, StartBlock}; + +#[derive(Debug, Clone, PartialEq)] +pub struct IndexerConfig { + pub account_id: AccountId, + pub function_name: String, + pub code: String, + pub start_block: StartBlock, + pub schema: String, + pub rule: Rule, + pub updated_at_block_height: Option, + pub created_at_block_height: u64, +} + +impl IndexerConfig { + pub fn get_full_name(&self) -> String { + format!("{}/{}", self.account_id, self.function_name) + } + + pub fn get_redis_stream_key(&self) -> String { + format!("{}:block_stream", self.get_full_name()) + } + + pub fn get_historical_redis_stream_key(&self) -> String { + format!("{}:historical:stream", self.get_full_name()) + } + + pub fn get_real_time_redis_stream_key(&self) -> String { + format!("{}:real_time:stream", self.get_full_name()) + } + + pub fn get_last_published_block_key(&self) -> String { + format!("{}:last_published_block", self.get_full_name()) + } + + pub fn get_redis_stream_version_key(&self) -> String { + format!("{}:version", self.get_redis_stream_key()) + } + + pub fn get_registry_version(&self) -> u64 { + self.updated_at_block_height + .unwrap_or(self.created_at_block_height) + } +} diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 66322065b..d87eeca45 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -4,20 +4,20 @@ use near_primitives::types::AccountId; use tokio::time::sleep; use tracing_subscriber::prelude::*; -use crate::block_streams_handler::BlockStreamsHandler; -use crate::executors_handler::ExecutorsHandler; +use crate::block_streams::{synchronise_block_streams, BlockStreamsHandler}; +use crate::executors::{synchronise_executors, ExecutorsHandler}; use crate::redis::RedisClient; -use crate::registry::{IndexerRegistry, Registry}; +use crate::registry::Registry; -mod block_streams_handler; -mod executors_handler; +mod block_streams; +mod executors; +mod indexer_config; mod migration; mod redis; mod registry; mod utils; const CONTROL_LOOP_THROTTLE_SECONDS: Duration = Duration::from_secs(1); -const V1_EXECUTOR_VERSION: u64 = 0; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -76,918 +76,3 @@ async fn main() -> anyhow::Result<()> { )?; } } - -async fn synchronise_executors( - indexer_registry: &IndexerRegistry, - executors_handler: &ExecutorsHandler, -) -> anyhow::Result<()> { - let active_executors = executors_handler.list().await?; - - // Ignore V1 executors - let mut active_executors: Vec<_> = active_executors - .into_iter() - .filter(|executor| executor.version != V1_EXECUTOR_VERSION) - .collect(); - - for (account_id, indexers) in indexer_registry.iter() { - for (function_name, indexer_config) in indexers.iter() { - let active_executor = active_executors - .iter() - .position(|stream| { - stream.account_id == account_id.to_string() - && &stream.function_name == function_name - }) - .map(|index| active_executors.swap_remove(index)); - - let registry_version = indexer_config - .updated_at_block_height - .unwrap_or(indexer_config.created_at_block_height); - - if let Some(active_executor) = active_executor { - if active_executor.version == registry_version { - continue; - } - - tracing::info!( - account_id = active_executor.account_id.as_str(), - function_name = active_executor.function_name, - registry_version = active_executor.version, - "Stopping executor" - ); - - executors_handler.stop(active_executor.executor_id).await?; - } - - tracing::info!( - account_id = account_id.as_str(), - function_name, - registry_version, - "Starting executor" - ); - - executors_handler - .start( - account_id.to_string(), - function_name.to_string(), - indexer_config.code.clone(), - indexer_config.schema.clone().unwrap_or_default(), - indexer_config.get_redis_stream(), - registry_version, - ) - .await?; - } - } - - for unregistered_executor in active_executors { - tracing::info!( - account_id = unregistered_executor.account_id.as_str(), - function_name = unregistered_executor.function_name, - registry_version = unregistered_executor.version, - "Stopping unregistered executor" - ); - - executors_handler - .stop(unregistered_executor.executor_id) - .await?; - } - - Ok(()) -} - -async fn synchronise_block_streams( - indexer_registry: &IndexerRegistry, - redis_client: &RedisClient, - block_streams_handler: &BlockStreamsHandler, -) -> anyhow::Result<()> { - let mut active_block_streams = block_streams_handler.list().await?; - - for (account_id, indexers) in indexer_registry.iter() { - for (function_name, indexer_config) in indexers.iter() { - let active_block_stream = active_block_streams - .iter() - .position(|stream| { - stream.account_id == account_id.to_string() - && &stream.function_name == function_name - }) - .map(|index| active_block_streams.swap_remove(index)); - - let registry_version = indexer_config - .updated_at_block_height - .unwrap_or(indexer_config.created_at_block_height); - - // TODO: Ensure start block height is only used to successfully start block stream ONCE - // TODO: Ensure last published blockheight is used on fresh restarts for existing indexers - if let Some(active_block_stream) = active_block_stream { - if active_block_stream.version == registry_version { - continue; - } - - tracing::info!( - account_id = active_block_stream.account_id.as_str(), - function_name = active_block_stream.function_name, - registry_version = active_block_stream.version, - "Stopping block stream" - ); - - block_streams_handler - .stop(active_block_stream.stream_id) - .await?; - } - - let start_block_height = if let Some(start_block_height) = - indexer_config.start_block_height - { - start_block_height - } else if let Ok(Some(last_published_block)) = redis_client - .get::(format!( - "{}:last_published_block", - indexer_config.get_full_name() - )) - .await - { - last_published_block - } else if let Some(updated_at_block_height) = indexer_config.updated_at_block_height { - updated_at_block_height - } else { - indexer_config.created_at_block_height - }; - - tracing::info!( - account_id = account_id.as_str(), - function_name, - registry_version, - "Starting block stream" - ); - - block_streams_handler - .start( - start_block_height, - indexer_config.account_id.to_string(), - indexer_config.function_name.clone(), - registry_version, - indexer_config.get_redis_stream(), - indexer_config.filter.matching_rule.clone(), - ) - .await?; - } - } - - for unregistered_block_stream in active_block_streams { - tracing::info!( - account_id = unregistered_block_stream.account_id.as_str(), - function_name = unregistered_block_stream.function_name, - registry_version = unregistered_block_stream.version, - "Stopping unregistered block stream" - ); - - block_streams_handler - .stop(unregistered_block_stream.stream_id) - .await?; - } - - Ok(()) -} - -#[cfg(test)] -mod tests { - use super::*; - - use mockall::predicate; - use std::collections::HashMap; - - use registry_types::{IndexerRuleKind, MatchingRule, OldIndexerRule as IndexerRule, Status}; - - use crate::registry::IndexerConfig; - - mod executors { - use super::*; - - #[tokio::test] - async fn starts_executors() { - let indexer_registry = HashMap::from([( - "morgs.near".parse().unwrap(), - HashMap::from([( - "test".to_string(), - IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: "code".to_string(), - schema: Some("schema".to_string()), - filter: IndexerRule { - id: None, - name: None, - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule: MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - }, - created_at_block_height: 1, - updated_at_block_height: None, - start_block_height: Some(100), - }, - )]), - )]); - - let mut executors_handler = ExecutorsHandler::default(); - executors_handler.expect_list().returning(|| Ok(vec![])); - executors_handler - .expect_start() - .with( - predicate::eq("morgs.near".to_string()), - predicate::eq("test".to_string()), - predicate::eq("code".to_string()), - predicate::eq("schema".to_string()), - predicate::eq("morgs.near/test:block_stream".to_string()), - predicate::eq(1), - ) - .returning(|_, _, _, _, _, _| Ok(())); - - synchronise_executors(&indexer_registry, &executors_handler) - .await - .unwrap(); - } - - #[tokio::test] - async fn restarts_executors_with_mismatched_versions() { - let indexer_registry = HashMap::from([( - "morgs.near".parse().unwrap(), - HashMap::from([( - "test".to_string(), - IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: "code".to_string(), - schema: Some("schema".to_string()), - filter: IndexerRule { - id: None, - name: None, - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule: MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - }, - created_at_block_height: 1, - updated_at_block_height: Some(2), - start_block_height: Some(100), - }, - )]), - )]); - - let mut executors_handler = ExecutorsHandler::default(); - executors_handler.expect_list().returning(|| { - Ok(vec![runner::ExecutorInfo { - executor_id: "executor_id".to_string(), - account_id: "morgs.near".to_string(), - function_name: "test".to_string(), - status: "running".to_string(), - version: 1, - }]) - }); - executors_handler - .expect_stop() - .with(predicate::eq("executor_id".to_string())) - .returning(|_| Ok(())) - .once(); - - executors_handler - .expect_start() - .with( - predicate::eq("morgs.near".to_string()), - predicate::eq("test".to_string()), - predicate::eq("code".to_string()), - predicate::eq("schema".to_string()), - predicate::eq("morgs.near/test:block_stream".to_string()), - predicate::eq(2), - ) - .returning(|_, _, _, _, _, _| Ok(())); - - synchronise_executors(&indexer_registry, &executors_handler) - .await - .unwrap(); - } - - #[tokio::test] - async fn ignores_executors_with_matching_versions() { - let indexer_registry = HashMap::from([( - "morgs.near".parse().unwrap(), - HashMap::from([( - "test".to_string(), - IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: "code".to_string(), - schema: Some("schema".to_string()), - filter: IndexerRule { - id: None, - name: None, - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule: MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - }, - created_at_block_height: 1, - updated_at_block_height: Some(2), - start_block_height: Some(100), - }, - )]), - )]); - - let mut executors_handler = ExecutorsHandler::default(); - executors_handler.expect_list().returning(|| { - Ok(vec![runner::ExecutorInfo { - executor_id: "executor_id".to_string(), - account_id: "morgs.near".to_string(), - function_name: "test".to_string(), - status: "running".to_string(), - version: 2, - }]) - }); - executors_handler.expect_stop().never(); - - executors_handler.expect_start().never(); - - synchronise_executors(&indexer_registry, &executors_handler) - .await - .unwrap(); - } - - #[tokio::test] - async fn stops_executors_not_in_registry() { - let indexer_registry = HashMap::from([]); - - let mut executors_handler = ExecutorsHandler::default(); - executors_handler.expect_list().returning(|| { - Ok(vec![runner::ExecutorInfo { - executor_id: "executor_id".to_string(), - account_id: "morgs.near".to_string(), - function_name: "test".to_string(), - status: "running".to_string(), - version: 2, - }]) - }); - - executors_handler - .expect_stop() - .with(predicate::eq("executor_id".to_string())) - .returning(|_| Ok(())) - .once(); - - synchronise_executors(&indexer_registry, &executors_handler) - .await - .unwrap(); - } - } - - mod block_stream { - use super::*; - - // TODO: Add Test for when indexer updated, block stream fails to start, and then restarted successfully - #[ignore] // TODO: Re-Enable when case is covered. - #[tokio::test] - async fn uses_last_published_block_height_when_restarting_existing_indexer_block_stream() { - let indexer_registry = HashMap::from([( - "morgs.near".parse().unwrap(), - HashMap::from([( - "test".to_string(), - IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: String::new(), - schema: Some(String::new()), - filter: IndexerRule { - id: None, - name: None, - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule: MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - }, - created_at_block_height: 1, - updated_at_block_height: Some(200), - start_block_height: Some(100), - }, - )]), - )]); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_get::() - .returning(|_| Ok(Some(500))); - - let mut block_stream_handler = BlockStreamsHandler::default(); - block_stream_handler.expect_list().returning(|| Ok(vec![])); - block_stream_handler - .expect_start() - .with( - predicate::eq(500), - predicate::eq("morgs.near".to_string()), - predicate::eq("test".to_string()), - predicate::eq(200), - predicate::eq("morgs.near/test:block_stream".to_string()), - predicate::eq(MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }), - ) - .returning(|_, _, _, _, _, _| Ok(())); - - synchronise_block_streams(&indexer_registry, &redis_client, &mut block_stream_handler) - .await - .unwrap(); - } - - #[tokio::test] - async fn uses_last_published_block_height_when_updating_without_start_block_height() { - let indexer_registry = HashMap::from([( - "morgs.near".parse().unwrap(), - HashMap::from([( - "test".to_string(), - IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: String::new(), - schema: Some(String::new()), - filter: IndexerRule { - id: None, - name: None, - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule: MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - }, - created_at_block_height: 1, - updated_at_block_height: Some(200), - start_block_height: None, - }, - )]), - )]); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_get::() - .returning(|_| Ok(Some(500))); - - let mut block_stream_handler = BlockStreamsHandler::default(); - block_stream_handler.expect_list().returning(|| { - Ok(vec![block_streamer::StreamInfo { - stream_id: "morgs.near/test:block_stream".to_string(), - account_id: "morgs.near".to_string(), - function_name: "test".to_string(), - version: 1, - }]) - }); - block_stream_handler - .expect_stop() - .with(predicate::eq("morgs.near/test:block_stream".to_string())) - .returning(|_| Ok(())) - .once(); - block_stream_handler - .expect_start() - .with( - predicate::eq(500), - predicate::eq("morgs.near".to_string()), - predicate::eq("test".to_string()), - predicate::eq(200), - predicate::eq("morgs.near/test:block_stream".to_string()), - predicate::eq(MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }), - ) - .returning(|_, _, _, _, _, _| Ok(())); - - synchronise_block_streams(&indexer_registry, &redis_client, &mut block_stream_handler) - .await - .unwrap(); - } - - #[tokio::test] - async fn uses_start_block_height_for_brand_new_indexer() { - let indexer_registry = HashMap::from([( - "morgs.near".parse().unwrap(), - HashMap::from([( - "test".to_string(), - IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: String::new(), - schema: Some(String::new()), - filter: IndexerRule { - id: None, - name: None, - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule: MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - }, - created_at_block_height: 1, - updated_at_block_height: None, - start_block_height: Some(100), - }, - )]), - )]); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_get::() - .returning(|_| anyhow::bail!("none")); - - let mut block_stream_handler = BlockStreamsHandler::default(); - block_stream_handler.expect_list().returning(|| Ok(vec![])); - block_stream_handler - .expect_start() - .with( - predicate::eq(100), - predicate::eq("morgs.near".to_string()), - predicate::eq("test".to_string()), - predicate::eq(1), - predicate::eq("morgs.near/test:block_stream".to_string()), - predicate::eq(MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }), - ) - .returning(|_, _, _, _, _, _| Ok(())); - - synchronise_block_streams(&indexer_registry, &redis_client, &block_stream_handler) - .await - .unwrap(); - } - - #[tokio::test] - async fn uses_start_block_height_when_updating_with_start_block_height() { - let indexer_registry = HashMap::from([( - "morgs.near".parse().unwrap(), - HashMap::from([( - "test".to_string(), - IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: String::new(), - schema: Some(String::new()), - filter: IndexerRule { - id: None, - name: None, - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule: MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - }, - created_at_block_height: 1, - updated_at_block_height: Some(200), - start_block_height: Some(100), - }, - )]), - )]); - - let redis_client = RedisClient::default(); - - let mut block_stream_handler = BlockStreamsHandler::default(); - block_stream_handler.expect_list().returning(|| { - Ok(vec![block_streamer::StreamInfo { - stream_id: "morgs.near/test:block_stream".to_string(), - account_id: "morgs.near".to_string(), - function_name: "test".to_string(), - version: 1, - }]) - }); - block_stream_handler - .expect_stop() - .with(predicate::eq("morgs.near/test:block_stream".to_string())) - .returning(|_| Ok(())) - .once(); - block_stream_handler - .expect_start() - .with( - predicate::eq(100), - predicate::eq("morgs.near".to_string()), - predicate::eq("test".to_string()), - predicate::eq(200), - predicate::eq("morgs.near/test:block_stream".to_string()), - predicate::eq(MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }), - ) - .returning(|_, _, _, _, _, _| Ok(())); - - synchronise_block_streams(&indexer_registry, &redis_client, &mut block_stream_handler) - .await - .unwrap(); - } - - #[tokio::test] - async fn uses_start_block_height_when_no_last_published_block_and_no_block_stream() { - let indexer_registry = HashMap::from([( - "morgs.near".parse().unwrap(), - HashMap::from([( - "test".to_string(), - IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: String::new(), - schema: Some(String::new()), - filter: IndexerRule { - id: None, - name: None, - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule: MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - }, - created_at_block_height: 1, - updated_at_block_height: Some(200), - start_block_height: Some(100), - }, - )]), - )]); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_get::() - .returning(|_| anyhow::bail!("none")); - - let mut block_stream_handler = BlockStreamsHandler::default(); - block_stream_handler.expect_list().returning(|| Ok(vec![])); - block_stream_handler - .expect_start() - .with( - predicate::eq(100), - predicate::eq("morgs.near".to_string()), - predicate::eq("test".to_string()), - predicate::eq(200), - predicate::eq("morgs.near/test:block_stream".to_string()), - predicate::eq(MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }), - ) - .returning(|_, _, _, _, _, _| Ok(())); - - synchronise_block_streams(&indexer_registry, &redis_client, &mut block_stream_handler) - .await - .unwrap(); - } - - #[tokio::test] - async fn uses_updated_block_height_when_no_last_published_block_no_block_stream_no_start_block_height( - ) { - let indexer_registry = HashMap::from([( - "morgs.near".parse().unwrap(), - HashMap::from([( - "test".to_string(), - IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: String::new(), - schema: Some(String::new()), - filter: IndexerRule { - id: None, - name: None, - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule: MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - }, - created_at_block_height: 1, - updated_at_block_height: Some(200), - start_block_height: None, - }, - )]), - )]); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_get::() - .returning(|_| anyhow::bail!("none")); - - let mut block_stream_handler = BlockStreamsHandler::default(); - block_stream_handler.expect_list().returning(|| Ok(vec![])); - block_stream_handler - .expect_start() - .with( - predicate::eq(200), - predicate::eq("morgs.near".to_string()), - predicate::eq("test".to_string()), - predicate::eq(200), - predicate::eq("morgs.near/test:block_stream".to_string()), - predicate::eq(MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }), - ) - .returning(|_, _, _, _, _, _| Ok(())); - - synchronise_block_streams(&indexer_registry, &redis_client, &block_stream_handler) - .await - .unwrap(); - } - - #[tokio::test] - async fn uses_created_block_height_for_brand_new_indexer_without_start() { - let indexer_registry = HashMap::from([( - "morgs.near".parse().unwrap(), - HashMap::from([( - "test".to_string(), - IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: String::new(), - schema: Some(String::new()), - filter: IndexerRule { - id: None, - name: None, - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule: MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - }, - created_at_block_height: 1, - updated_at_block_height: None, - start_block_height: None, - }, - )]), - )]); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_get::() - .returning(|_| anyhow::bail!("none")); - - let mut block_stream_handler = BlockStreamsHandler::default(); - block_stream_handler.expect_list().returning(|| Ok(vec![])); - block_stream_handler - .expect_start() - .with( - predicate::eq(1), - predicate::eq("morgs.near".to_string()), - predicate::eq("test".to_string()), - predicate::eq(1), - predicate::eq("morgs.near/test:block_stream".to_string()), - predicate::eq(MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }), - ) - .returning(|_, _, _, _, _, _| Ok(())); - - synchronise_block_streams(&indexer_registry, &redis_client, &block_stream_handler) - .await - .unwrap(); - } - - #[tokio::test] - async fn stops_streams_not_in_registry() { - let indexer_registry = HashMap::from([]); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_get::() - .returning(|_| anyhow::bail!("none")); - - let mut block_stream_handler = BlockStreamsHandler::default(); - block_stream_handler.expect_list().returning(|| { - Ok(vec![block_streamer::StreamInfo { - stream_id: "stream_id".to_string(), - account_id: "morgs.near".to_string(), - function_name: "test".to_string(), - version: 1, - }]) - }); - block_stream_handler - .expect_stop() - .with(predicate::eq("stream_id".to_string())) - .returning(|_| Ok(())) - .once(); - - synchronise_block_streams(&indexer_registry, &redis_client, &block_stream_handler) - .await - .unwrap(); - } - - #[tokio::test] - async fn ignores_streams_with_matching_versions() { - let indexer_registry = HashMap::from([( - "morgs.near".parse().unwrap(), - HashMap::from([( - "test".to_string(), - IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: String::new(), - schema: Some(String::new()), - filter: IndexerRule { - id: None, - name: None, - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule: MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - }, - created_at_block_height: 101, - updated_at_block_height: None, - start_block_height: None, - }, - )]), - )]); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_get::() - .returning(|_| anyhow::bail!("none")); - - let mut block_stream_handler = BlockStreamsHandler::default(); - block_stream_handler.expect_list().returning(|| { - Ok(vec![block_streamer::StreamInfo { - stream_id: "stream_id".to_string(), - account_id: "morgs.near".to_string(), - function_name: "test".to_string(), - version: 101, - }]) - }); - block_stream_handler.expect_stop().never(); - block_stream_handler.expect_start().never(); - - synchronise_block_streams(&indexer_registry, &redis_client, &block_stream_handler) - .await - .unwrap(); - } - - #[tokio::test] - async fn restarts_streams_with_mismatched_versions() { - let indexer_registry = HashMap::from([( - "morgs.near".parse().unwrap(), - HashMap::from([( - "test".to_string(), - IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: String::new(), - schema: Some(String::new()), - filter: IndexerRule { - id: None, - name: None, - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule: MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - }, - created_at_block_height: 101, - updated_at_block_height: Some(200), - start_block_height: Some(1000), - }, - )]), - )]); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_get::() - .returning(|_| anyhow::bail!("none")); - - let mut block_stream_handler = BlockStreamsHandler::default(); - block_stream_handler.expect_list().returning(|| { - Ok(vec![block_streamer::StreamInfo { - stream_id: "stream_id".to_string(), - account_id: "morgs.near".to_string(), - function_name: "test".to_string(), - version: 101, - }]) - }); - block_stream_handler - .expect_stop() - .with(predicate::eq("stream_id".to_string())) - .returning(|_| Ok(())) - .once(); - block_stream_handler - .expect_start() - .with( - predicate::eq(1000), - predicate::eq("morgs.near".to_string()), - predicate::eq("test".to_string()), - predicate::eq(200), - predicate::eq("morgs.near/test:block_stream".to_string()), - predicate::eq(MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }), - ) - .returning(|_, _, _, _, _, _| Ok(())); - - synchronise_block_streams(&indexer_registry, &redis_client, &block_stream_handler) - .await - .unwrap(); - } - } -} diff --git a/coordinator/src/migration.rs b/coordinator/src/migration.rs index 514fbb596..f4faccbb1 100644 --- a/coordinator/src/migration.rs +++ b/coordinator/src/migration.rs @@ -4,9 +4,12 @@ use anyhow::Context; use near_primitives::types::AccountId; use redis::{ErrorKind, RedisError}; -use crate::executors_handler::ExecutorsHandler; +use crate::executors::ExecutorsHandler; +use crate::indexer_config::IndexerConfig; use crate::redis::RedisClient; -use crate::registry::{IndexerConfig, IndexerRegistry}; +use crate::registry::IndexerRegistry; + +pub const MIGRATED_STREAM_VERSION: u64 = 0; #[derive(serde::Deserialize, serde::Serialize, Debug)] pub struct AllowlistEntry { @@ -111,6 +114,9 @@ async fn migrate_account( merge_streams(redis_client, &existing_streams, indexer_config) .await .context("Failed to merge streams")?; + update_stream_version(redis_client, indexer_config) + .await + .context("Failed to set Redis Stream version")?; } set_migrated_flag(redis_client, account_id)?; @@ -129,29 +135,29 @@ async fn remove_from_streams_set( if redis_client .srem( RedisClient::STREAMS_SET, - indexer_config.get_historical_redis_stream(), + indexer_config.get_historical_redis_stream_key(), ) .await? .is_some() && redis_client - .exists(indexer_config.get_historical_redis_stream()) + .exists(indexer_config.get_historical_redis_stream_key()) .await? { - result.push(indexer_config.get_historical_redis_stream()); + result.push(indexer_config.get_historical_redis_stream_key()); } if redis_client .srem( RedisClient::STREAMS_SET, - indexer_config.get_real_time_redis_stream(), + indexer_config.get_real_time_redis_stream_key(), ) .await? .is_some() && redis_client - .exists(indexer_config.get_real_time_redis_stream()) + .exists(indexer_config.get_real_time_redis_stream_key()) .await? { - result.push(indexer_config.get_real_time_redis_stream()); + result.push(indexer_config.get_real_time_redis_stream_key()); }; Ok(result) @@ -179,7 +185,7 @@ async fn merge_streams( redis_client .rename( existing_streams[0].to_owned(), - indexer_config.get_redis_stream(), + indexer_config.get_redis_stream_key(), ) .await?; @@ -190,7 +196,7 @@ async fn merge_streams( let real_time_stream = existing_streams[1].to_owned(); redis_client - .rename(historical_stream, indexer_config.get_redis_stream()) + .rename(historical_stream, indexer_config.get_redis_stream_key()) .await?; loop { @@ -216,10 +222,13 @@ async fn merge_streams( .collect(); redis_client - .xadd(indexer_config.get_redis_stream(), &fields) + .xadd(indexer_config.get_redis_stream_key(), &fields) .await?; redis_client - .xdel(indexer_config.get_real_time_redis_stream(), stream_id.id) + .xdel( + indexer_config.get_real_time_redis_stream_key(), + stream_id.id, + ) .await? } } @@ -230,6 +239,20 @@ async fn merge_streams( } } +async fn update_stream_version( + redis_client: &RedisClient, + indexer_config: &IndexerConfig, +) -> anyhow::Result<()> { + redis_client + .set( + indexer_config.get_redis_stream_version_key(), + MIGRATED_STREAM_VERSION, + ) + .await?; + + Ok(()) +} + fn set_failed_flag(redis_client: &RedisClient, account_id: &AccountId) -> anyhow::Result<()> { let account_id = account_id.to_owned(); @@ -281,9 +304,9 @@ mod tests { use std::collections::HashMap; use mockall::predicate; - use registry_types::{IndexerRuleKind, MatchingRule, OldIndexerRule, Status}; + use registry_types::{Rule, StartBlock, Status}; - use crate::registry::IndexerConfig; + use crate::indexer_config::IndexerConfig; #[tokio::test] async fn ignores_migrated_indexers() { @@ -295,19 +318,14 @@ mod tests { account_id: "morgs.near".parse().unwrap(), function_name: "test".to_string(), code: String::new(), - schema: Some(String::new()), - filter: OldIndexerRule { - id: None, - name: None, - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule: MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, + schema: String::new(), + rule: Rule::ActionAny { + affected_account_id: "queryapi.dataplatform.near".to_string(), + status: Status::Any, }, created_at_block_height: 101, updated_at_block_height: Some(200), - start_block_height: Some(1000), + start_block: StartBlock::Height(1000), }, )]), )]); @@ -368,19 +386,14 @@ mod tests { account_id: "morgs.near".parse().unwrap(), function_name: "test".to_string(), code: String::new(), - schema: Some(String::new()), - filter: OldIndexerRule { - id: None, - name: None, - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule: MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, + schema: String::new(), + rule: Rule::ActionAny { + affected_account_id: "queryapi.dataplatform.near".to_string(), + status: Status::Any, }, created_at_block_height: 101, updated_at_block_height: Some(200), - start_block_height: Some(1000), + start_block: StartBlock::Height(1000), }, )]), )]); @@ -474,6 +487,14 @@ mod tests { ) .returning(|_, _| Ok(())) .once(); + redis_client + .expect_set::() + .with( + predicate::eq(String::from("morgs.near/test:block_stream:version")), + predicate::eq(MIGRATED_STREAM_VERSION), + ) + .returning(|_, _| Ok(())) + .once(); redis_client .expect_atomic_update::<&str, String, String>() .returning(|_, _| Ok(())); diff --git a/coordinator/src/redis.rs b/coordinator/src/redis.rs index 7780b582e..a5c26d28a 100644 --- a/coordinator/src/redis.rs +++ b/coordinator/src/redis.rs @@ -7,6 +7,8 @@ use redis::{ aio::ConnectionManager, streams, AsyncCommands, FromRedisValue, RedisResult, ToRedisArgs, }; +use crate::indexer_config::IndexerConfig; + #[cfg(test)] pub use MockRedisClientImpl as RedisClient; #[cfg(not(test))] @@ -50,6 +52,21 @@ impl RedisClientImpl { Ok(value) } + pub async fn set(&self, key: K, value: V) -> anyhow::Result<()> + where + K: ToRedisArgs + Debug + Send + Sync + 'static, + V: ToRedisArgs + Debug + Send + Sync + 'static, + { + tracing::debug!("SET: {key:?} {value:?}"); + + self.connection + .clone() + .set(&key, &value) + .await + .context(format!("SET: {key:?} {value:?}"))?; + + Ok(()) + } pub async fn rename(&self, old_key: K, new_key: V) -> anyhow::Result<()> where @@ -158,6 +175,20 @@ impl RedisClientImpl { .context(format!("EXISTS {key:?}")) } + pub async fn del(&self, key: K) -> anyhow::Result<()> + where + K: ToRedisArgs + Debug + Send + Sync + 'static, + { + tracing::debug!("DEL {key:?}"); + + self.connection + .clone() + .del(&key) + .await + .map_err(|e| anyhow::format_err!(e)) + .context(format!("DEL {key:?}")) + } + // `redis::transaction`s currently don't work with async connections, so we have to create a _new_ // blocking connection to atmoically update a value. pub fn atomic_update(&self, key: K, update_fn: F) -> anyhow::Result<()> @@ -178,4 +209,32 @@ impl RedisClientImpl { Ok(()) } + + pub async fn get_stream_version( + &self, + indexer_config: &IndexerConfig, + ) -> anyhow::Result> { + self.get::<_, u64>(indexer_config.get_redis_stream_version_key()) + .await + } + + pub async fn get_last_published_block( + &self, + indexer_config: &IndexerConfig, + ) -> anyhow::Result> { + self.get::<_, u64>(indexer_config.get_last_published_block_key()) + .await + } + + pub async fn clear_block_stream(&self, indexer_config: &IndexerConfig) -> anyhow::Result<()> { + self.del(indexer_config.get_redis_stream_key()).await + } + + pub async fn set_stream_version(&self, indexer_config: &IndexerConfig) -> anyhow::Result<()> { + self.set( + indexer_config.get_redis_stream_version_key(), + indexer_config.get_registry_version(), + ) + .await + } } diff --git a/coordinator/src/registry.rs b/coordinator/src/registry.rs index b3ed7776e..7318ae4f2 100644 --- a/coordinator/src/registry.rs +++ b/coordinator/src/registry.rs @@ -8,44 +8,13 @@ use near_jsonrpc_client::JsonRpcClient; use near_jsonrpc_primitives::types::query::QueryResponseKind; use near_primitives::types::{AccountId, BlockReference, Finality, FunctionArgs}; use near_primitives::views::QueryRequest; -use registry_types::{ - OldAccountOrAllIndexers as AccountOrAllIndexers, OldIndexerRule as IndexerRule, -}; +use registry_types::AllIndexers; +use crate::indexer_config::IndexerConfig; use crate::utils::exponential_retry; pub type IndexerRegistry = HashMap>; -#[derive(Debug, Clone)] -pub struct IndexerConfig { - pub account_id: AccountId, - pub function_name: String, - pub code: String, - pub start_block_height: Option, - pub schema: Option, - pub filter: IndexerRule, - pub updated_at_block_height: Option, - pub created_at_block_height: u64, -} - -impl IndexerConfig { - pub fn get_full_name(&self) -> String { - format!("{}/{}", self.account_id, self.function_name) - } - - pub fn get_redis_stream(&self) -> String { - format!("{}:block_stream", self.get_full_name()) - } - - pub fn get_historical_redis_stream(&self) -> String { - format!("{}:historical:stream", self.get_full_name()) - } - - pub fn get_real_time_redis_stream(&self) -> String { - format!("{}:real_time:stream", self.get_full_name()) - } -} - #[cfg(test)] pub use MockRegistryImpl as Registry; #[cfg(not(test))] @@ -58,7 +27,7 @@ pub struct RegistryImpl { #[cfg_attr(test, mockall::automock)] impl RegistryImpl { - const LIST_METHOD: &str = "list_indexer_functions"; + const LIST_METHOD: &str = "list_all"; pub fn connect(registry_contract_id: AccountId, rpc_url: &str) -> Self { let json_rpc_client = JsonRpcClient::connect(rpc_url); @@ -71,7 +40,7 @@ impl RegistryImpl { fn enrich_indexer_registry( &self, - registry: HashMap>, + registry: HashMap>, ) -> IndexerRegistry { registry .into_iter() @@ -85,9 +54,9 @@ impl RegistryImpl { account_id: account_id.clone(), function_name, code: indexer.code, - start_block_height: indexer.start_block_height, + start_block: indexer.start_block, schema: indexer.schema, - filter: indexer.filter, + rule: indexer.rule, updated_at_block_height: indexer.updated_at_block_height, created_at_block_height: indexer.created_at_block_height, }, @@ -116,12 +85,9 @@ impl RegistryImpl { .context("Failed to list registry contract")?; if let QueryResponseKind::CallResult(call_result) = response.kind { - let list_registry_response: AccountOrAllIndexers = - serde_json::from_slice(&call_result.result)?; + let all_indexers: AllIndexers = serde_json::from_slice(&call_result.result)?; - if let AccountOrAllIndexers::All(all_indexers) = list_registry_response { - return Ok(self.enrich_indexer_registry(all_indexers)); - } + return Ok(self.enrich_indexer_registry(all_indexers)); } anyhow::bail!("Invalid registry response")