diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index fd98e03cd..815c750f5 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -4,7 +4,7 @@ use tokio::task::JoinHandle; use crate::indexer_config::IndexerConfig; use crate::rules::types::ChainId; -use registry_types::MatchingRule; +use registry_types::Rule; /// The number of blocks to prefetch within `near-lake-framework`. The internal default is 100, but /// we need this configurable for testing purposes. @@ -113,6 +113,15 @@ impl BlockStream { } } +#[tracing::instrument( + skip_all, + fields( + account_id = indexer.account_id.as_str(), + function_name = indexer.function_name, + start_block_height = start_block_height, + redis_stream = redis_stream + ) +)] pub(crate) async fn start_block_stream( start_block_height: near_indexer_primitives::types::BlockHeight, indexer: &IndexerConfig, @@ -123,26 +132,59 @@ pub(crate) async fn start_block_stream( lake_prefetch_size: usize, redis_stream: String, ) -> anyhow::Result<()> { - tracing::info!( - account_id = indexer.account_id.as_str(), - function_name = indexer.function_name, + tracing::info!("Starting block stream",); + + let last_indexed_delta_lake_block = process_delta_lake_blocks( start_block_height, - "Starting block stream", + delta_lake_client, + redis_client.clone(), + indexer, + redis_stream.clone(), + ) + .await?; + + let last_indexed_near_lake_block = process_near_lake_blocks( + last_indexed_delta_lake_block, + lake_s3_config, + lake_prefetch_size, + redis_client, + indexer, + redis_stream, + chain_id, + ) + .await?; + + tracing::debug!( + last_indexed_block = last_indexed_near_lake_block, + "Stopped block stream", ); + Ok(()) +} + +async fn process_delta_lake_blocks( + start_block_height: near_indexer_primitives::types::BlockHeight, + delta_lake_client: std::sync::Arc, + redis_client: std::sync::Arc, + indexer: &IndexerConfig, + redis_stream: String, +) -> anyhow::Result { let latest_block_metadata = delta_lake_client.get_latest_block_metadata().await?; - let last_indexed_block = latest_block_metadata + let last_indexed_block_from_metadata = latest_block_metadata .last_indexed_block - .parse::()?; + .parse::() + .context("Failed to parse Delta Lake metadata")?; - let blocks_from_index = match &indexer.indexer_rule.matching_rule { - MatchingRule::ActionAny { + if start_block_height >= last_indexed_block_from_metadata { + return Ok(start_block_height); + } + + let blocks_from_index = match &indexer.rule { + Rule::ActionAny { affected_account_id, .. } => { tracing::debug!( - account_id = indexer.account_id.as_str(), - function_name = indexer.function_name, "Fetching block heights starting from {} from delta lake", start_block_height, ); @@ -151,19 +193,17 @@ pub(crate) async fn start_block_stream( .list_matching_block_heights(start_block_height, affected_account_id) .await } - MatchingRule::ActionFunctionCall { .. } => { + Rule::ActionFunctionCall { .. } => { tracing::error!("ActionFunctionCall matching rule not yet supported for delta lake processing, function: {:?} {:?}", indexer.account_id, indexer.function_name); Ok(vec![]) } - MatchingRule::Event { .. } => { + Rule::Event { .. } => { tracing::error!("Event matching rule not yet supported for delta lake processing, function {:?} {:?}", indexer.account_id, indexer.function_name); Ok(vec![]) } }?; tracing::debug!( - account_id = indexer.account_id.as_str(), - function_name = indexer.function_name, "Flushing {} block heights from index files to Redis Stream", blocks_from_index.len(), ); @@ -183,30 +223,40 @@ pub(crate) async fn start_block_stream( .context("Failed to set last_published_block")?; } - let mut last_indexed_block = + let last_indexed_block = blocks_from_index .last() - .map_or(last_indexed_block, |&last_block_in_index| { + .map_or(last_indexed_block_from_metadata, |&last_block_in_index| { // Check for the case where index files are written right after we fetch the last_indexed_block metadata - std::cmp::max(last_block_in_index, last_indexed_block) + std::cmp::max(last_block_in_index, last_indexed_block_from_metadata) }); - tracing::debug!( - account_id = indexer.account_id.as_str(), - function_name = indexer.function_name, - "Starting near-lake-framework from {last_indexed_block} for indexer", - ); + Ok(last_indexed_block) +} + +async fn process_near_lake_blocks( + start_block_height: near_indexer_primitives::types::BlockHeight, + lake_s3_config: aws_sdk_s3::Config, + lake_prefetch_size: usize, + redis_client: std::sync::Arc, + indexer: &IndexerConfig, + redis_stream: String, + chain_id: &ChainId, +) -> anyhow::Result { + tracing::debug!(start_block_height, "Starting near-lake-framework",); let lake_config = match &chain_id { ChainId::Mainnet => near_lake_framework::LakeConfigBuilder::default().mainnet(), ChainId::Testnet => near_lake_framework::LakeConfigBuilder::default().testnet(), } .s3_config(lake_s3_config) - .start_block_height(last_indexed_block) + .start_block_height(start_block_height) .blocks_preload_pool_size(lake_prefetch_size) .build() .context("Failed to build lake config")?; + let mut last_indexed_block = start_block_height; + let (sender, mut stream) = near_lake_framework::streamer(lake_config); while let Some(streamer_message) = stream.recv().await { @@ -222,7 +272,7 @@ pub(crate) async fn start_block_stream( .context("Failed to set last_published_block")?; let matches = crate::rules::reduce_indexer_rule_matches( - &indexer.indexer_rule, + &indexer.rule, &streamer_message, chain_id.clone(), ); @@ -240,14 +290,7 @@ pub(crate) async fn start_block_stream( drop(sender); - tracing::debug!( - account_id = indexer.account_id.as_str(), - function_name = indexer.function_name, - "Stopped block stream at {}", - last_indexed_block, - ); - - Ok(()) + Ok(last_indexed_block) } #[cfg(test)] @@ -294,14 +337,9 @@ mod tests { ) .unwrap(), function_name: "test".to_string(), - indexer_rule: registry_types::IndexerRule { - indexer_rule_kind: registry_types::IndexerRuleKind::Action, - matching_rule: registry_types::MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: registry_types::Status::Success, - }, - name: None, - id: None, + rule: registry_types::Rule::ActionAny { + affected_account_id: "queryapi.dataplatform.near".to_string(), + status: registry_types::Status::Success, }, }; diff --git a/block-streamer/src/delta_lake_client.rs b/block-streamer/src/delta_lake_client.rs index ae02d6c20..ed0a0d542 100644 --- a/block-streamer/src/delta_lake_client.rs +++ b/block-streamer/src/delta_lake_client.rs @@ -246,6 +246,7 @@ impl DeltaLakeClientImpl { } }) .flat_map(|index_file| index_file.heights) + .filter(|block_height| *block_height >= start_block_height) .collect(); let pattern_has_multiple_contracts = contract_pattern.chars().any(|c| c == ',' || c == '*'); @@ -260,7 +261,6 @@ impl DeltaLakeClientImpl { contract_pattern, ); - // TODO Remove all block heights after start_block_height Ok(block_heights) } } @@ -637,10 +637,42 @@ mod tests { assert_eq!( block_heights, - vec![45894617, 45894627, 45894628, 45894712, 45898413, 45898423, 45898424] + vec![45894628, 45894712, 45898413, 45898423, 45898424] ) } + #[tokio::test] + async fn filters_heights_less_than_start_block() { + let mut mock_s3_client = crate::s3_client::S3Client::default(); + + mock_s3_client + .expect_get_text_file() + .with( + predicate::eq("near-lake-data-mainnet"), + predicate::eq("000045898423/block.json"), + ) + .returning(|_bucket, _prefix| Ok(generate_block_with_timestamp("2021-05-26"))); + mock_s3_client.expect_list_all_objects().returning(|_, _| { + Ok(vec![ + "silver/accounts/action_receipt_actions/metadata/near/keypom/2023-10-31.json" + .to_string(), + ]) + }); + mock_s3_client + .expect_get_text_file() + .with(predicate::eq(DELTA_LAKE_BUCKET.to_string()), predicate::eq("silver/accounts/action_receipt_actions/metadata/near/keypom/2023-10-31.json".to_string())) + .returning(|_bucket, _prefix| Ok("{\"heights\":[45898424,45898423,45898413,45894712],\"actions\":[{\"action_kind\":\"ADD_KEY\",\"block_heights\":[104616819]}]}".to_string())); + + let delta_lake_client = DeltaLakeClientImpl::new(mock_s3_client); + + let block_heights = delta_lake_client + .list_matching_block_heights(45898423, "keypom.near, hackathon.agency.near") + .await + .unwrap(); + + assert_eq!(block_heights, vec![45898423, 45898424]) + } + #[tokio::test] async fn gets_the_date_of_the_closest_block() { let mut mock_s3_client = crate::s3_client::S3Client::default(); diff --git a/block-streamer/src/indexer_config.rs b/block-streamer/src/indexer_config.rs index aebd7bb2c..f56eb5c54 100644 --- a/block-streamer/src/indexer_config.rs +++ b/block-streamer/src/indexer_config.rs @@ -2,17 +2,13 @@ use near_lake_framework::near_indexer_primitives::types::AccountId; use std::collections::hash_map::DefaultHasher; use std::hash::{Hash, Hasher}; -use registry_types::IndexerRule; +use registry_types::Rule; #[derive(serde::Serialize, serde::Deserialize, Clone, Debug)] pub struct IndexerConfig { pub account_id: AccountId, pub function_name: String, - // pub code: String, - // pub start_block_height: Option, - // pub schema: Option, - // pub provisioned: bool, - pub indexer_rule: IndexerRule, + pub rule: Rule, } impl IndexerConfig { diff --git a/block-streamer/src/redis.rs b/block-streamer/src/redis.rs index dfaee7d78..aee9fe667 100644 --- a/block-streamer/src/redis.rs +++ b/block-streamer/src/redis.rs @@ -1,3 +1,5 @@ +#![cfg_attr(test, allow(dead_code))] + use std::fmt::Debug; use redis::{aio::ConnectionManager, RedisError, ToRedisArgs}; diff --git a/block-streamer/src/rules/matcher.rs b/block-streamer/src/rules/matcher.rs index 8c1684caa..17fdc420b 100644 --- a/block-streamer/src/rules/matcher.rs +++ b/block-streamer/src/rules/matcher.rs @@ -2,20 +2,20 @@ use near_lake_framework::near_indexer_primitives::{ views::{ActionView, ExecutionStatusView, ReceiptEnumView}, IndexerExecutionOutcomeWithReceipt, }; -use registry_types::{MatchingRule, Status}; +use registry_types::{Rule, Status}; use crate::rules::types::Event; pub fn matches( - matching_rule: &MatchingRule, + indexer_rule: &Rule, receipt_execution_outcome: &IndexerExecutionOutcomeWithReceipt, ) -> bool { - match matching_rule { - MatchingRule::ActionAny { + match indexer_rule { + Rule::ActionAny { affected_account_id, status, } => match_action_any(affected_account_id, status, receipt_execution_outcome), - MatchingRule::ActionFunctionCall { + Rule::ActionFunctionCall { affected_account_id, status, function, @@ -25,7 +25,7 @@ pub fn matches( function, receipt_execution_outcome, ), - MatchingRule::Event { + Rule::Event { contract_account_id, event, standard, diff --git a/block-streamer/src/rules/mod.rs b/block-streamer/src/rules/mod.rs index 5c5058514..68146ba23 100644 --- a/block-streamer/src/rules/mod.rs +++ b/block-streamer/src/rules/mod.rs @@ -3,19 +3,17 @@ pub mod outcomes_reducer; pub mod types; use near_lake_framework::near_indexer_primitives::StreamerMessage; -use registry_types::{IndexerRule, MatchingRule}; +use registry_types::Rule; use types::{ChainId, IndexerRuleMatch}; pub fn reduce_indexer_rule_matches( - indexer_rule: &IndexerRule, + indexer_rule: &Rule, streamer_message: &StreamerMessage, chain_id: ChainId, ) -> Vec { - match &indexer_rule.matching_rule { - MatchingRule::ActionAny { .. } - | MatchingRule::ActionFunctionCall { .. } - | MatchingRule::Event { .. } => { + match &indexer_rule { + Rule::ActionAny { .. } | Rule::ActionFunctionCall { .. } | Rule::Event { .. } => { outcomes_reducer::reduce_indexer_rule_matches_from_outcomes( indexer_rule, streamer_message, diff --git a/block-streamer/src/rules/outcomes_reducer.rs b/block-streamer/src/rules/outcomes_reducer.rs index 934eda37c..54b357c72 100644 --- a/block-streamer/src/rules/outcomes_reducer.rs +++ b/block-streamer/src/rules/outcomes_reducer.rs @@ -1,13 +1,13 @@ use crate::rules::matcher; use crate::rules::types::Event; use crate::rules::types::{ChainId, IndexerRuleMatch, IndexerRuleMatchPayload}; -use crate::rules::{IndexerRule, MatchingRule}; +use crate::rules::Rule; use near_lake_framework::near_indexer_primitives::{ IndexerExecutionOutcomeWithReceipt, StreamerMessage, }; pub fn reduce_indexer_rule_matches_from_outcomes( - indexer_rule: &IndexerRule, + indexer_rule: &Rule, streamer_message: &StreamerMessage, chain_id: ChainId, ) -> Vec { @@ -20,7 +20,7 @@ pub fn reduce_indexer_rule_matches_from_outcomes( .iter() // future: when extracting Actions, Events, etc this will be a filter operation .find(|receipt_execution_outcome| { - matcher::matches(&indexer_rule.matching_rule, receipt_execution_outcome) + matcher::matches(indexer_rule, receipt_execution_outcome) }) }) .map(|receipt_execution_outcome| { @@ -36,7 +36,7 @@ pub fn reduce_indexer_rule_matches_from_outcomes( } fn build_indexer_rule_match( - indexer_rule: &IndexerRule, + indexer_rule: &Rule, receipt_execution_outcome: &IndexerExecutionOutcomeWithReceipt, block_header_hash: String, block_height: u64, @@ -44,8 +44,6 @@ fn build_indexer_rule_match( ) -> IndexerRuleMatch { IndexerRuleMatch { chain_id, - indexer_rule_id: indexer_rule.id, - indexer_rule_name: indexer_rule.name.clone(), payload: build_indexer_rule_match_payload( indexer_rule, receipt_execution_outcome, @@ -56,7 +54,7 @@ fn build_indexer_rule_match( } fn build_indexer_rule_match_payload( - indexer_rule: &IndexerRule, + indexer_rule: &Rule, receipt_execution_outcome: &IndexerExecutionOutcomeWithReceipt, block_header_hash: String, ) -> IndexerRuleMatchPayload { @@ -64,15 +62,15 @@ fn build_indexer_rule_match_payload( // specified in the indexer function config. let transaction_hash = None; - match &indexer_rule.matching_rule { - MatchingRule::ActionAny { .. } | MatchingRule::ActionFunctionCall { .. } => { + match &indexer_rule { + Rule::ActionAny { .. } | Rule::ActionFunctionCall { .. } => { IndexerRuleMatchPayload::Actions { block_hash: block_header_hash, receipt_id: receipt_execution_outcome.receipt.receipt_id.to_string(), transaction_hash, } } - MatchingRule::Event { + Rule::Event { event, standard, version, @@ -115,21 +113,16 @@ fn build_indexer_rule_match_payload( #[cfg(test)] mod tests { - use registry_types::{IndexerRule, IndexerRuleKind, MatchingRule, Status}; + use registry_types::{Rule, Status}; use crate::rules::outcomes_reducer::reduce_indexer_rule_matches_from_outcomes; use crate::rules::types::{ChainId, IndexerRuleMatch}; #[tokio::test] async fn match_wildcard_no_match() { - let wildcard_rule = IndexerRule { - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule: MatchingRule::ActionAny { - affected_account_id: "*.nearcrow.near".to_string(), - status: Status::Success, - }, - id: None, - name: None, + let wildcard_rule = Rule::ActionAny { + affected_account_id: "*.nearcrow.near".to_string(), + status: Status::Success, }; let streamer_message = crate::test_utils::get_streamer_message(93085141); @@ -144,14 +137,9 @@ mod tests { #[tokio::test] async fn match_wildcard_contract_subaccount_name() { - let wildcard_rule = IndexerRule { - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule: MatchingRule::ActionAny { - affected_account_id: "*.nearcrowd.near".to_string(), - status: Status::Success, - }, - id: None, - name: None, + let wildcard_rule = Rule::ActionAny { + affected_account_id: "*.nearcrowd.near".to_string(), + status: Status::Success, }; let streamer_message = crate::test_utils::get_streamer_message(93085141); @@ -166,14 +154,9 @@ mod tests { #[tokio::test] async fn match_wildcard_mid_contract_name() { - let wildcard_rule = IndexerRule { - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule: MatchingRule::ActionAny { - affected_account_id: "*crowd.near".to_string(), - status: Status::Success, - }, - id: None, - name: None, + let wildcard_rule = Rule::ActionAny { + affected_account_id: "*crowd.near".to_string(), + status: Status::Success, }; let streamer_message = crate::test_utils::get_streamer_message(93085141); @@ -185,14 +168,9 @@ mod tests { assert_eq!(result.len(), 1); // see Extraction note in previous test - let wildcard_rule = IndexerRule { - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule: MatchingRule::ActionAny { - affected_account_id: "app.nea*owd.near".to_string(), - status: Status::Success, - }, - id: None, - name: None, + let wildcard_rule = Rule::ActionAny { + affected_account_id: "app.nea*owd.near".to_string(), + status: Status::Success, }; let result: Vec = reduce_indexer_rule_matches_from_outcomes( @@ -206,14 +184,9 @@ mod tests { #[tokio::test] async fn match_csv_account() { - let wildcard_rule = IndexerRule { - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule: MatchingRule::ActionAny { - affected_account_id: "notintheblockaccount.near, app.nearcrowd.near".to_string(), - status: Status::Success, - }, - id: None, - name: None, + let wildcard_rule = Rule::ActionAny { + affected_account_id: "notintheblockaccount.near, app.nearcrowd.near".to_string(), + status: Status::Success, }; let streamer_message = crate::test_utils::get_streamer_message(93085141); @@ -228,14 +201,9 @@ mod tests { #[tokio::test] async fn match_csv_wildcard_account() { - let wildcard_rule = IndexerRule { - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule: MatchingRule::ActionAny { - affected_account_id: "notintheblockaccount.near, *.nearcrowd.near".to_string(), - status: Status::Success, - }, - id: None, - name: None, + let wildcard_rule = Rule::ActionAny { + affected_account_id: "notintheblockaccount.near, *.nearcrowd.near".to_string(), + status: Status::Success, }; let streamer_message = crate::test_utils::get_streamer_message(93085141); diff --git a/block-streamer/src/rules/types.rs b/block-streamer/src/rules/types.rs index b61eff208..d3139b28e 100644 --- a/block-streamer/src/rules/types.rs +++ b/block-streamer/src/rules/types.rs @@ -14,8 +14,6 @@ pub type BlockHashString = String; )] pub struct IndexerRuleMatch { pub chain_id: ChainId, - pub indexer_rule_id: Option, - pub indexer_rule_name: Option, pub payload: IndexerRuleMatchPayload, pub block_height: u64, } diff --git a/block-streamer/src/s3_client.rs b/block-streamer/src/s3_client.rs index 6fd3d909a..412fd4ee0 100644 --- a/block-streamer/src/s3_client.rs +++ b/block-streamer/src/s3_client.rs @@ -1,3 +1,5 @@ +#![cfg_attr(test, allow(dead_code))] + const MAX_S3_LIST_REQUESTS: usize = 1000; #[cfg(test)] diff --git a/block-streamer/src/server/block_streamer_service.rs b/block-streamer/src/server/block_streamer_service.rs index 829ec4876..cb8e7beac 100644 --- a/block-streamer/src/server/block_streamer_service.rs +++ b/block-streamer/src/server/block_streamer_service.rs @@ -6,7 +6,6 @@ use tonic::{Request, Response, Status}; use crate::indexer_config::IndexerConfig; use crate::rules::types::ChainId; -use registry_types::{IndexerRule, IndexerRuleKind, MatchingRule}; use crate::block_stream; use crate::server::blockstreamer; @@ -69,26 +68,21 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic .rule .ok_or(Status::invalid_argument("Rule must be provided"))?; - let matching_rule = match rule { - start_stream_request::Rule::ActionAnyRule(action_any) => MatchingRule::ActionAny { - affected_account_id: action_any.affected_account_id, - status: Self::match_status(action_any.status)?, - }, + let rule = match rule { + start_stream_request::Rule::ActionAnyRule(action_any) => { + registry_types::Rule::ActionAny { + affected_account_id: action_any.affected_account_id, + status: Self::match_status(action_any.status)?, + } + } start_stream_request::Rule::ActionFunctionCallRule(action_function_call) => { - MatchingRule::ActionFunctionCall { + registry_types::Rule::ActionFunctionCall { affected_account_id: action_function_call.affected_account_id, status: Self::match_status(action_function_call.status)?, function: action_function_call.function_name, } } }; - let filter_rule = IndexerRule { - // TODO: Remove kind as it is unused - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule, - id: None, - name: None, - }; let account_id = near_indexer_primitives::types::AccountId::try_from(request.account_id) .map_err(|err| { @@ -99,8 +93,8 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic })?; let indexer_config = IndexerConfig { account_id, + rule, function_name: request.function_name, - indexer_rule: filter_rule, }; let lock = self.get_block_streams_lock()?; diff --git a/coordinator/src/block_streams_handler.rs b/coordinator/src/block_streams/handler.rs similarity index 81% rename from coordinator/src/block_streams_handler.rs rename to coordinator/src/block_streams/handler.rs index 241afa569..f17a974ce 100644 --- a/coordinator/src/block_streams_handler.rs +++ b/coordinator/src/block_streams/handler.rs @@ -1,14 +1,17 @@ #![cfg_attr(test, allow(dead_code))] +pub use block_streamer::StreamInfo; + use anyhow::Context; use block_streamer::block_streamer_client::BlockStreamerClient; use block_streamer::{ start_stream_request::Rule, ActionAnyRule, ActionFunctionCallRule, ListStreamsRequest, - StartStreamRequest, Status, StopStreamRequest, StreamInfo, + StartStreamRequest, Status, StopStreamRequest, }; use tonic::transport::channel::Channel; use tonic::Request; +use crate::indexer_config::IndexerConfig; use crate::utils::exponential_retry; #[cfg(not(test))] @@ -80,21 +83,17 @@ impl BlockStreamsHandlerImpl { pub async fn start( &self, start_block_height: u64, - account_id: String, - function_name: String, - version: u64, - redis_stream: String, - rule: registry_types::MatchingRule, + indexer_config: &IndexerConfig, ) -> anyhow::Result<()> { - let rule = match &rule { - registry_types::MatchingRule::ActionAny { + let rule = match &indexer_config.rule { + registry_types::Rule::ActionAny { affected_account_id, status, } => Rule::ActionAnyRule(ActionAnyRule { affected_account_id: affected_account_id.to_owned(), status: Self::match_status(status), }), - registry_types::MatchingRule::ActionFunctionCall { + registry_types::Rule::ActionFunctionCall { affected_account_id, status, function, @@ -114,10 +113,10 @@ impl BlockStreamsHandlerImpl { let request = StartStreamRequest { start_block_height, - version, - redis_stream, - account_id: account_id.clone(), - function_name: function_name.clone(), + version: indexer_config.get_registry_version(), + redis_stream: indexer_config.get_redis_stream_key(), + account_id: indexer_config.account_id.to_string(), + function_name: indexer_config.function_name.clone(), rule: Some(rule), }; @@ -128,16 +127,16 @@ impl BlockStreamsHandlerImpl { .await .map_err(|error| { tracing::error!( - account_id, - function_name, + account_id = indexer_config.account_id.as_str(), + function_name = indexer_config.function_name, "Failed to start stream\n{error:?}" ); }); tracing::debug!( - account_id, - function_name, - version, + account_id = indexer_config.account_id.as_str(), + function_name = indexer_config.function_name, + version = indexer_config.get_registry_version(), "Start stream response: {:#?}", response ); diff --git a/coordinator/src/block_streams/mod.rs b/coordinator/src/block_streams/mod.rs new file mode 100644 index 000000000..cd8b6fd96 --- /dev/null +++ b/coordinator/src/block_streams/mod.rs @@ -0,0 +1,5 @@ +mod handler; +mod synchronise; + +pub use handler::BlockStreamsHandler; +pub use synchronise::synchronise_block_streams; diff --git a/coordinator/src/block_streams/synchronise.rs b/coordinator/src/block_streams/synchronise.rs new file mode 100644 index 000000000..ed42ec1a3 --- /dev/null +++ b/coordinator/src/block_streams/synchronise.rs @@ -0,0 +1,677 @@ +use std::cmp::Ordering; + +use registry_types::StartBlock; + +use crate::indexer_config::IndexerConfig; +use crate::migration::MIGRATED_STREAM_VERSION; +use crate::redis::RedisClient; +use crate::registry::IndexerRegistry; + +use super::handler::{BlockStreamsHandler, StreamInfo}; + +pub async fn synchronise_block_streams( + indexer_registry: &IndexerRegistry, + redis_client: &RedisClient, + block_streams_handler: &BlockStreamsHandler, +) -> anyhow::Result<()> { + let mut active_block_streams = block_streams_handler.list().await?; + + for (account_id, indexers) in indexer_registry.iter() { + for (function_name, indexer_config) in indexers.iter() { + let active_block_stream = active_block_streams + .iter() + .position(|stream| { + stream.account_id == account_id.to_string() + && &stream.function_name == function_name + }) + .map(|index| active_block_streams.swap_remove(index)); + + let _ = synchronise_block_stream( + active_block_stream, + indexer_config, + redis_client, + block_streams_handler, + ) + .await + .map_err(|err| { + tracing::error!( + account_id = account_id.as_str(), + function_name, + version = indexer_config.get_registry_version(), + "failed to sync block stream: {err:?}" + ) + }); + } + } + + for unregistered_block_stream in active_block_streams { + tracing::info!( + account_id = unregistered_block_stream.account_id.as_str(), + function_name = unregistered_block_stream.function_name, + version = unregistered_block_stream.version, + "Stopping unregistered block stream" + ); + + block_streams_handler + .stop(unregistered_block_stream.stream_id) + .await?; + } + + Ok(()) +} + +#[tracing::instrument( + skip_all, + fields( + account_id = %indexer_config.account_id, + function_name = indexer_config.function_name, + version = indexer_config.get_registry_version() + ) +)] +async fn synchronise_block_stream( + active_block_stream: Option, + indexer_config: &IndexerConfig, + redis_client: &RedisClient, + block_streams_handler: &BlockStreamsHandler, +) -> anyhow::Result<()> { + if let Some(active_block_stream) = active_block_stream { + if active_block_stream.version == indexer_config.get_registry_version() { + return Ok(()); + } + + tracing::info!( + previous_version = active_block_stream.version, + "Stopping outdated block stream" + ); + + block_streams_handler + .stop(active_block_stream.stream_id) + .await?; + } + + let stream_status = get_stream_status(indexer_config, redis_client).await?; + + clear_block_stream_if_needed(&stream_status, indexer_config, redis_client).await?; + + let start_block_height = + determine_start_block_height(&stream_status, indexer_config, redis_client).await?; + + block_streams_handler + .start(start_block_height, indexer_config) + .await?; + + redis_client.set_stream_version(indexer_config).await?; + + Ok(()) +} + +#[derive(Debug)] +enum StreamStatus { + /// Stream has just been migrated to V2 + Migrated, + /// Stream version is synchronized with the registry + Synced, + /// Stream version does not match registry + Outdated, + /// No stream version, therefore new + New, +} + +async fn get_stream_status( + indexer_config: &IndexerConfig, + redis_client: &RedisClient, +) -> anyhow::Result { + let stream_version = redis_client.get_stream_version(indexer_config).await?; + + if stream_version.is_none() { + return Ok(StreamStatus::New); + } + + let stream_version = stream_version.unwrap(); + + if stream_version == MIGRATED_STREAM_VERSION { + return Ok(StreamStatus::Migrated); + } + + match indexer_config.get_registry_version().cmp(&stream_version) { + Ordering::Equal => Ok(StreamStatus::Synced), + Ordering::Greater => Ok(StreamStatus::Outdated), + Ordering::Less => { + tracing::warn!("Found stream with version greater than registry, treating as outdated"); + + Ok(StreamStatus::Outdated) + } + } +} + +async fn clear_block_stream_if_needed( + stream_status: &StreamStatus, + indexer_config: &IndexerConfig, + redis_client: &RedisClient, +) -> anyhow::Result<()> { + if matches!( + stream_status, + StreamStatus::Migrated | StreamStatus::Synced | StreamStatus::New + ) || indexer_config.start_block == StartBlock::Continue + { + return Ok(()); + } + + tracing::info!("Clearing redis stream"); + + redis_client.clear_block_stream(indexer_config).await +} + +async fn determine_start_block_height( + stream_status: &StreamStatus, + indexer_config: &IndexerConfig, + redis_client: &RedisClient, +) -> anyhow::Result { + if matches!(stream_status, StreamStatus::Migrated | StreamStatus::Synced) { + tracing::info!("Resuming block stream"); + + return get_continuation_block_height(indexer_config, redis_client).await; + } + + tracing::info!(start_block = ?indexer_config.start_block, "Stating new block stream"); + + match indexer_config.start_block { + StartBlock::Latest => Ok(indexer_config.get_registry_version()), + StartBlock::Height(height) => Ok(height), + StartBlock::Continue => get_continuation_block_height(indexer_config, redis_client).await, + } +} + +async fn get_continuation_block_height( + indexer_config: &IndexerConfig, + redis_client: &RedisClient, +) -> anyhow::Result { + redis_client + .get_last_published_block(indexer_config) + .await? + .map(|height| height + 1) + .ok_or(anyhow::anyhow!("Indexer has no `last_published_block`")) +} + +#[cfg(test)] +mod tests { + use super::*; + + use std::collections::HashMap; + + use mockall::predicate; + use registry_types::{Rule, Status}; + + #[tokio::test] + async fn resumes_stream_with_matching_redis_version() { + let indexer_config = IndexerConfig { + account_id: "morgs.near".parse().unwrap(), + function_name: "test".to_string(), + code: String::new(), + schema: String::new(), + rule: Rule::ActionAny { + affected_account_id: "queryapi.dataplatform.near".to_string(), + status: Status::Any, + }, + created_at_block_height: 1, + updated_at_block_height: Some(200), + start_block: StartBlock::Height(100), + }; + + let indexer_registry = HashMap::from([( + "morgs.near".parse().unwrap(), + HashMap::from([("test".to_string(), indexer_config.clone())]), + )]); + + let mut redis_client = RedisClient::default(); + redis_client + .expect_get_stream_version() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| Ok(Some(200))) + .once(); + redis_client + .expect_get_last_published_block() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| Ok(Some(500))) + .once(); + redis_client + .expect_set_stream_version() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| Ok(())) + .once(); + redis_client.expect_clear_block_stream().never(); + + let mut block_stream_handler = BlockStreamsHandler::default(); + block_stream_handler.expect_list().returning(|| Ok(vec![])); + block_stream_handler + .expect_start() + .with(predicate::eq(501), predicate::eq(indexer_config)) + .returning(|_, _| Ok(())) + .once(); + + synchronise_block_streams(&indexer_registry, &redis_client, &block_stream_handler) + .await + .unwrap(); + } + + #[tokio::test] + async fn starts_stream_with_latest() { + let indexer_config = IndexerConfig { + account_id: "morgs.near".parse().unwrap(), + function_name: "test".to_string(), + code: String::new(), + schema: String::new(), + rule: Rule::ActionAny { + affected_account_id: "queryapi.dataplatform.near".to_string(), + status: Status::Any, + }, + created_at_block_height: 1, + updated_at_block_height: Some(200), + start_block: StartBlock::Latest, + }; + let indexer_registry = HashMap::from([( + "morgs.near".parse().unwrap(), + HashMap::from([("test".to_string(), indexer_config.clone())]), + )]); + + let mut redis_client = RedisClient::default(); + redis_client + .expect_get_stream_version() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| Ok(Some(1))) + .once(); + redis_client + .expect_clear_block_stream() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| Ok(())) + .once(); + redis_client + .expect_set_stream_version() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| Ok(())) + .once(); + + let mut block_stream_handler = BlockStreamsHandler::default(); + block_stream_handler.expect_list().returning(|| Ok(vec![])); + block_stream_handler.expect_stop().never(); + block_stream_handler + .expect_start() + .with(predicate::eq(200), predicate::eq(indexer_config)) + .returning(|_, _| Ok(())) + .once(); + + synchronise_block_streams(&indexer_registry, &redis_client, &block_stream_handler) + .await + .unwrap(); + } + + #[tokio::test] + async fn starts_stream_with_height() { + let indexer_config = IndexerConfig { + account_id: "morgs.near".parse().unwrap(), + function_name: "test".to_string(), + code: String::new(), + schema: String::new(), + rule: Rule::ActionAny { + affected_account_id: "queryapi.dataplatform.near".to_string(), + status: Status::Any, + }, + created_at_block_height: 1, + updated_at_block_height: Some(200), + start_block: StartBlock::Height(100), + }; + let indexer_registry = HashMap::from([( + "morgs.near".parse().unwrap(), + HashMap::from([("test".to_string(), indexer_config.clone())]), + )]); + + let mut redis_client = RedisClient::default(); + redis_client + .expect_get_stream_version() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| Ok(Some(1))) + .once(); + redis_client + .expect_clear_block_stream() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| Ok(())) + .once(); + redis_client + .expect_set_stream_version() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| Ok(())) + .once(); + + let mut block_stream_handler = BlockStreamsHandler::default(); + block_stream_handler.expect_list().returning(|| Ok(vec![])); + block_stream_handler.expect_stop().never(); + block_stream_handler + .expect_start() + .with(predicate::eq(100), predicate::eq(indexer_config)) + .returning(|_, _| Ok(())) + .once(); + + synchronise_block_streams(&indexer_registry, &redis_client, &block_stream_handler) + .await + .unwrap(); + } + + #[tokio::test] + async fn starts_stream_with_continue() { + let indexer_config = IndexerConfig { + account_id: "morgs.near".parse().unwrap(), + function_name: "test".to_string(), + code: String::new(), + schema: String::new(), + rule: Rule::ActionAny { + affected_account_id: "queryapi.dataplatform.near".to_string(), + status: Status::Any, + }, + created_at_block_height: 1, + updated_at_block_height: Some(200), + start_block: StartBlock::Continue, + }; + let indexer_registry = HashMap::from([( + "morgs.near".parse().unwrap(), + HashMap::from([("test".to_string(), indexer_config.clone())]), + )]); + + let mut redis_client = RedisClient::default(); + redis_client + .expect_get_stream_version() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| Ok(Some(1))) + .once(); + redis_client + .expect_get_last_published_block() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| Ok(Some(100))) + .once(); + redis_client + .expect_set_stream_version() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| Ok(())) + .once(); + + let mut block_stream_handler = BlockStreamsHandler::default(); + block_stream_handler.expect_list().returning(|| Ok(vec![])); + block_stream_handler.expect_stop().never(); + block_stream_handler + .expect_start() + .with(predicate::eq(101), predicate::eq(indexer_config)) + .returning(|_, _| Ok(())) + .once(); + + synchronise_block_streams(&indexer_registry, &redis_client, &block_stream_handler) + .await + .unwrap(); + } + + #[tokio::test] + async fn stops_stream_not_in_registry() { + let indexer_registry = HashMap::from([]); + + let redis_client = RedisClient::default(); + + let mut block_stream_handler = BlockStreamsHandler::default(); + block_stream_handler.expect_list().returning(|| { + Ok(vec![block_streamer::StreamInfo { + stream_id: "stream_id".to_string(), + account_id: "morgs.near".to_string(), + function_name: "test".to_string(), + version: 1, + }]) + }); + block_stream_handler + .expect_stop() + .with(predicate::eq("stream_id".to_string())) + .returning(|_| Ok(())) + .once(); + + synchronise_block_streams(&indexer_registry, &redis_client, &block_stream_handler) + .await + .unwrap(); + } + + #[tokio::test] + async fn ignores_stream_with_matching_registry_version() { + let indexer_registry = HashMap::from([( + "morgs.near".parse().unwrap(), + HashMap::from([( + "test".to_string(), + IndexerConfig { + account_id: "morgs.near".parse().unwrap(), + function_name: "test".to_string(), + code: String::new(), + schema: String::new(), + rule: Rule::ActionAny { + affected_account_id: "queryapi.dataplatform.near".to_string(), + status: Status::Any, + }, + created_at_block_height: 101, + updated_at_block_height: None, + start_block: StartBlock::Latest, + }, + )]), + )]); + + let redis_client = RedisClient::default(); + + let mut block_stream_handler = BlockStreamsHandler::default(); + block_stream_handler.expect_list().returning(|| { + Ok(vec![block_streamer::StreamInfo { + stream_id: "stream_id".to_string(), + account_id: "morgs.near".to_string(), + function_name: "test".to_string(), + version: 101, + }]) + }); + block_stream_handler.expect_stop().never(); + block_stream_handler.expect_start().never(); + + synchronise_block_streams(&indexer_registry, &redis_client, &block_stream_handler) + .await + .unwrap(); + } + + #[tokio::test] + async fn restarts_streams_when_registry_version_differs() { + let indexer_config = IndexerConfig { + account_id: "morgs.near".parse().unwrap(), + function_name: "test".to_string(), + code: String::new(), + schema: String::new(), + rule: Rule::ActionAny { + affected_account_id: "queryapi.dataplatform.near".to_string(), + status: Status::Any, + }, + created_at_block_height: 101, + updated_at_block_height: Some(199), + start_block: StartBlock::Height(1000), + }; + let indexer_registry = HashMap::from([( + "morgs.near".parse().unwrap(), + HashMap::from([("test".to_string(), indexer_config.clone())]), + )]); + + let mut redis_client = RedisClient::default(); + redis_client + .expect_get_stream_version() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| Ok(Some(101))) + .once(); + redis_client + .expect_clear_block_stream() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| Ok(())) + .once(); + redis_client + .expect_set_stream_version() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| Ok(())) + .once(); + + let mut block_stream_handler = BlockStreamsHandler::default(); + block_stream_handler.expect_list().returning(|| { + Ok(vec![block_streamer::StreamInfo { + stream_id: "stream_id".to_string(), + account_id: "morgs.near".to_string(), + function_name: "test".to_string(), + version: 101, + }]) + }); + block_stream_handler + .expect_stop() + .with(predicate::eq("stream_id".to_string())) + .returning(|_| Ok(())) + .once(); + block_stream_handler + .expect_start() + .with(predicate::eq(1000), predicate::eq(indexer_config)) + .returning(|_, _| Ok(())) + .once(); + + synchronise_block_streams(&indexer_registry, &redis_client, &block_stream_handler) + .await + .unwrap(); + } + + #[tokio::test] + async fn resumes_stream_post_migration() { + let indexer_config = IndexerConfig { + account_id: "morgs.near".parse().unwrap(), + function_name: "test".to_string(), + code: String::new(), + schema: String::new(), + rule: Rule::ActionAny { + affected_account_id: "queryapi.dataplatform.near".to_string(), + status: Status::Any, + }, + created_at_block_height: 101, + updated_at_block_height: Some(200), + start_block: StartBlock::Height(1000), + }; + let indexer_registry = HashMap::from([( + "morgs.near".parse().unwrap(), + HashMap::from([("test".to_string(), indexer_config.clone())]), + )]); + + let mut redis_client = RedisClient::default(); + redis_client + .expect_get_stream_version() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| Ok(Some(MIGRATED_STREAM_VERSION))) + .once(); + redis_client + .expect_get_last_published_block() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| Ok(Some(100))) + .once(); + redis_client + .expect_set_stream_version() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| Ok(())) + .once(); + + let mut block_stream_handler = BlockStreamsHandler::default(); + block_stream_handler.expect_list().returning(|| Ok(vec![])); + block_stream_handler.expect_stop().never(); + block_stream_handler + .expect_start() + .with(predicate::eq(101), predicate::eq(indexer_config)) + .returning(|_, _| Ok(())) + .once(); + + synchronise_block_streams(&indexer_registry, &redis_client, &block_stream_handler) + .await + .unwrap(); + } + + #[tokio::test] + async fn does_not_start_stream_without_last_published_block() { + let indexer_config = IndexerConfig { + account_id: "morgs.near".parse().unwrap(), + function_name: "test".to_string(), + code: String::new(), + schema: String::new(), + rule: Rule::ActionAny { + affected_account_id: "queryapi.dataplatform.near".to_string(), + status: Status::Any, + }, + created_at_block_height: 101, + updated_at_block_height: Some(200), + start_block: StartBlock::Continue, + }; + let indexer_registry = HashMap::from([( + "morgs.near".parse().unwrap(), + HashMap::from([("test".to_string(), indexer_config.clone())]), + )]); + + let mut redis_client = RedisClient::default(); + redis_client + .expect_get_stream_version() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| Ok(Some(101))) + .once(); + redis_client + .expect_get_last_published_block() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| anyhow::bail!("no last_published_block")) + .once(); + + let mut block_stream_handler = BlockStreamsHandler::default(); + block_stream_handler.expect_list().returning(|| Ok(vec![])); + block_stream_handler.expect_stop().never(); + block_stream_handler.expect_start().never(); + + synchronise_block_streams(&indexer_registry, &redis_client, &block_stream_handler) + .await + .unwrap(); + } + + #[tokio::test] + async fn starts_block_stream_for_first_time() { + let indexer_config = IndexerConfig { + account_id: "morgs.near".parse().unwrap(), + function_name: "test".to_string(), + code: String::new(), + schema: String::new(), + rule: Rule::ActionAny { + affected_account_id: "queryapi.dataplatform.near".to_string(), + status: Status::Any, + }, + created_at_block_height: 101, + updated_at_block_height: None, + start_block: StartBlock::Height(50), + }; + let indexer_registry = HashMap::from([( + "morgs.near".parse().unwrap(), + HashMap::from([("test".to_string(), indexer_config.clone())]), + )]); + + let mut redis_client = RedisClient::default(); + redis_client + .expect_get_stream_version() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| Ok(None)) + .once(); + redis_client + .expect_set_stream_version() + .with(predicate::eq(indexer_config.clone())) + .returning(|_| Ok(())) + .once(); + + let mut block_stream_handler = BlockStreamsHandler::default(); + block_stream_handler.expect_list().returning(|| Ok(vec![])); + block_stream_handler.expect_stop().never(); + block_stream_handler + .expect_start() + .with(predicate::eq(50), predicate::eq(indexer_config)) + .returning(|_, _| Ok(())) + .once(); + + synchronise_block_streams(&indexer_registry, &redis_client, &block_stream_handler) + .await + .unwrap(); + } +} diff --git a/coordinator/src/executors_handler.rs b/coordinator/src/executors/handler.rs similarity index 71% rename from coordinator/src/executors_handler.rs rename to coordinator/src/executors/handler.rs index 496ce7a86..68be45f29 100644 --- a/coordinator/src/executors_handler.rs +++ b/coordinator/src/executors/handler.rs @@ -1,11 +1,14 @@ #![cfg_attr(test, allow(dead_code))] +pub use runner::ExecutorInfo; + use anyhow::Context; use runner::runner_client::RunnerClient; -use runner::{ExecutorInfo, ListExecutorsRequest, StartExecutorRequest, StopExecutorRequest}; +use runner::{ListExecutorsRequest, StartExecutorRequest, StopExecutorRequest}; use tonic::transport::channel::Channel; use tonic::Request; +use crate::indexer_config::IndexerConfig; use crate::utils::exponential_retry; #[cfg(not(test))] @@ -46,22 +49,14 @@ impl ExecutorsHandlerImpl { .await } - pub async fn start( - &self, - account_id: String, - function_name: String, - code: String, - schema: String, - redis_stream: String, - version: u64, - ) -> anyhow::Result<()> { + pub async fn start(&self, indexer_config: &IndexerConfig) -> anyhow::Result<()> { let request = StartExecutorRequest { - code, - schema, - redis_stream, - version, - account_id: account_id.clone(), - function_name: function_name.clone(), + code: indexer_config.code.clone(), + schema: indexer_config.schema.clone(), + redis_stream: indexer_config.get_redis_stream_key(), + version: indexer_config.get_registry_version(), + account_id: indexer_config.account_id.to_string(), + function_name: indexer_config.function_name.clone(), }; let response = self @@ -71,16 +66,16 @@ impl ExecutorsHandlerImpl { .await .map_err(|error| { tracing::error!( - account_id, - function_name, + account_id = indexer_config.account_id.as_str(), + function_name = indexer_config.function_name, "Failed to start executor\n{error:?}" ); }); tracing::debug!( - account_id, - function_name, - version, + account_id = indexer_config.account_id.as_str(), + function_name = indexer_config.function_name, + version = indexer_config.get_registry_version(), "Start executors response: {:#?}", response ); diff --git a/coordinator/src/executors/mod.rs b/coordinator/src/executors/mod.rs new file mode 100644 index 000000000..1b68609c6 --- /dev/null +++ b/coordinator/src/executors/mod.rs @@ -0,0 +1,5 @@ +mod handler; +mod synchronise; + +pub use handler::ExecutorsHandler; +pub use synchronise::synchronise_executors; diff --git a/coordinator/src/executors/synchronise.rs b/coordinator/src/executors/synchronise.rs new file mode 100644 index 000000000..ed8685a54 --- /dev/null +++ b/coordinator/src/executors/synchronise.rs @@ -0,0 +1,248 @@ +use crate::indexer_config::IndexerConfig; +use crate::registry::IndexerRegistry; + +use super::handler::{ExecutorInfo, ExecutorsHandler}; + +const V1_EXECUTOR_VERSION: u64 = 0; + +pub async fn synchronise_executors( + indexer_registry: &IndexerRegistry, + executors_handler: &ExecutorsHandler, +) -> anyhow::Result<()> { + let active_executors = executors_handler.list().await?; + + // Ignore V1 executors + let mut active_executors: Vec<_> = active_executors + .into_iter() + .filter(|executor| executor.version != V1_EXECUTOR_VERSION) + .collect(); + + for (account_id, indexers) in indexer_registry.iter() { + for (function_name, indexer_config) in indexers.iter() { + let active_executor = active_executors + .iter() + .position(|stream| { + stream.account_id == account_id.to_string() + && &stream.function_name == function_name + }) + .map(|index| active_executors.swap_remove(index)); + + let _ = synchronise_executor(active_executor, indexer_config, executors_handler) + .await + .map_err(|err| { + tracing::error!( + account_id = account_id.as_str(), + function_name, + version = indexer_config.get_registry_version(), + "failed to sync executor: {err:?}" + ) + }); + } + } + + for unregistered_executor in active_executors { + tracing::info!( + account_id = unregistered_executor.account_id.as_str(), + function_name = unregistered_executor.function_name, + registry_version = unregistered_executor.version, + "Stopping unregistered executor" + ); + + executors_handler + .stop(unregistered_executor.executor_id) + .await?; + } + + Ok(()) +} + +#[tracing::instrument( + skip_all, + fields( + account_id = %indexer_config.account_id, + function_name = indexer_config.function_name, + version = indexer_config.get_registry_version() + ) +)] +async fn synchronise_executor( + active_executor: Option, + indexer_config: &IndexerConfig, + executors_handler: &ExecutorsHandler, +) -> anyhow::Result<()> { + let registry_version = indexer_config.get_registry_version(); + + if let Some(active_executor) = active_executor { + if active_executor.version == registry_version { + return Ok(()); + } + + tracing::info!("Stopping outdated executor"); + + executors_handler.stop(active_executor.executor_id).await?; + } + + tracing::info!("Starting new executor"); + + executors_handler.start(indexer_config).await?; + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + use std::collections::HashMap; + + use mockall::predicate; + use registry_types::{Rule, StartBlock, Status}; + + use crate::indexer_config::IndexerConfig; + + #[tokio::test] + async fn starts_executor() { + let indexer_config = IndexerConfig { + account_id: "morgs.near".parse().unwrap(), + function_name: "test".to_string(), + code: "code".to_string(), + schema: "schema".to_string(), + rule: Rule::ActionAny { + affected_account_id: "queryapi.dataplatform.near".to_string(), + status: Status::Any, + }, + created_at_block_height: 1, + updated_at_block_height: None, + start_block: StartBlock::Height(100), + }; + let indexer_registry = HashMap::from([( + "morgs.near".parse().unwrap(), + HashMap::from([("test".to_string(), indexer_config.clone())]), + )]); + + let mut executors_handler = ExecutorsHandler::default(); + executors_handler.expect_list().returning(|| Ok(vec![])); + executors_handler + .expect_start() + .with(predicate::eq(indexer_config)) + .returning(|_| Ok(())) + .once(); + + synchronise_executors(&indexer_registry, &executors_handler) + .await + .unwrap(); + } + + #[tokio::test] + async fn restarts_executor_when_registry_version_differs() { + let indexer_config = IndexerConfig { + account_id: "morgs.near".parse().unwrap(), + function_name: "test".to_string(), + code: "code".to_string(), + schema: "schema".to_string(), + rule: Rule::ActionAny { + affected_account_id: "queryapi.dataplatform.near".to_string(), + status: Status::Any, + }, + created_at_block_height: 1, + updated_at_block_height: Some(2), + start_block: StartBlock::Height(100), + }; + let indexer_registry = HashMap::from([( + "morgs.near".parse().unwrap(), + HashMap::from([("test".to_string(), indexer_config.clone())]), + )]); + + let mut executors_handler = ExecutorsHandler::default(); + executors_handler.expect_list().returning(|| { + Ok(vec![runner::ExecutorInfo { + executor_id: "executor_id".to_string(), + account_id: "morgs.near".to_string(), + function_name: "test".to_string(), + status: "running".to_string(), + version: 1, + }]) + }); + executors_handler + .expect_stop() + .with(predicate::eq("executor_id".to_string())) + .returning(|_| Ok(())) + .once(); + + executors_handler + .expect_start() + .with(predicate::eq(indexer_config)) + .returning(|_| Ok(())) + .once(); + + synchronise_executors(&indexer_registry, &executors_handler) + .await + .unwrap(); + } + + #[tokio::test] + async fn ignores_executor_with_matching_registry_version() { + let indexer_registry = HashMap::from([( + "morgs.near".parse().unwrap(), + HashMap::from([( + "test".to_string(), + IndexerConfig { + account_id: "morgs.near".parse().unwrap(), + function_name: "test".to_string(), + code: "code".to_string(), + schema: "schema".to_string(), + rule: Rule::ActionAny { + affected_account_id: "queryapi.dataplatform.near".to_string(), + status: Status::Any, + }, + created_at_block_height: 1, + updated_at_block_height: Some(2), + start_block: StartBlock::Height(100), + }, + )]), + )]); + + let mut executors_handler = ExecutorsHandler::default(); + executors_handler.expect_list().returning(|| { + Ok(vec![runner::ExecutorInfo { + executor_id: "executor_id".to_string(), + account_id: "morgs.near".to_string(), + function_name: "test".to_string(), + status: "running".to_string(), + version: 2, + }]) + }); + executors_handler.expect_stop().never(); + + executors_handler.expect_start().never(); + + synchronise_executors(&indexer_registry, &executors_handler) + .await + .unwrap(); + } + + #[tokio::test] + async fn stops_executor_not_in_registry() { + let indexer_registry = HashMap::from([]); + + let mut executors_handler = ExecutorsHandler::default(); + executors_handler.expect_list().returning(|| { + Ok(vec![runner::ExecutorInfo { + executor_id: "executor_id".to_string(), + account_id: "morgs.near".to_string(), + function_name: "test".to_string(), + status: "running".to_string(), + version: 2, + }]) + }); + + executors_handler + .expect_stop() + .with(predicate::eq("executor_id".to_string())) + .returning(|_| Ok(())) + .once(); + + synchronise_executors(&indexer_registry, &executors_handler) + .await + .unwrap(); + } +} diff --git a/coordinator/src/indexer_config.rs b/coordinator/src/indexer_config.rs new file mode 100644 index 000000000..e7b049281 --- /dev/null +++ b/coordinator/src/indexer_config.rs @@ -0,0 +1,45 @@ +use near_primitives::types::AccountId; +use registry_types::{Rule, StartBlock}; + +#[derive(Debug, Clone, PartialEq)] +pub struct IndexerConfig { + pub account_id: AccountId, + pub function_name: String, + pub code: String, + pub start_block: StartBlock, + pub schema: String, + pub rule: Rule, + pub updated_at_block_height: Option, + pub created_at_block_height: u64, +} + +impl IndexerConfig { + pub fn get_full_name(&self) -> String { + format!("{}/{}", self.account_id, self.function_name) + } + + pub fn get_redis_stream_key(&self) -> String { + format!("{}:block_stream", self.get_full_name()) + } + + pub fn get_historical_redis_stream_key(&self) -> String { + format!("{}:historical:stream", self.get_full_name()) + } + + pub fn get_real_time_redis_stream_key(&self) -> String { + format!("{}:real_time:stream", self.get_full_name()) + } + + pub fn get_last_published_block_key(&self) -> String { + format!("{}:last_published_block", self.get_full_name()) + } + + pub fn get_redis_stream_version_key(&self) -> String { + format!("{}:version", self.get_redis_stream_key()) + } + + pub fn get_registry_version(&self) -> u64 { + self.updated_at_block_height + .unwrap_or(self.created_at_block_height) + } +} diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 25dc9fc49..d87eeca45 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -4,20 +4,20 @@ use near_primitives::types::AccountId; use tokio::time::sleep; use tracing_subscriber::prelude::*; -use crate::block_streams_handler::BlockStreamsHandler; -use crate::executors_handler::ExecutorsHandler; +use crate::block_streams::{synchronise_block_streams, BlockStreamsHandler}; +use crate::executors::{synchronise_executors, ExecutorsHandler}; use crate::redis::RedisClient; -use crate::registry::{IndexerRegistry, Registry}; +use crate::registry::Registry; -mod block_streams_handler; -mod executors_handler; +mod block_streams; +mod executors; +mod indexer_config; mod migration; mod redis; mod registry; mod utils; const CONTROL_LOOP_THROTTLE_SECONDS: Duration = Duration::from_secs(1); -const V1_EXECUTOR_VERSION: u64 = 0; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -76,918 +76,3 @@ async fn main() -> anyhow::Result<()> { )?; } } - -async fn synchronise_executors( - indexer_registry: &IndexerRegistry, - executors_handler: &ExecutorsHandler, -) -> anyhow::Result<()> { - let active_executors = executors_handler.list().await?; - - // Ignore V1 executors - let mut active_executors: Vec<_> = active_executors - .into_iter() - .filter(|executor| executor.version != V1_EXECUTOR_VERSION) - .collect(); - - for (account_id, indexers) in indexer_registry.iter() { - for (function_name, indexer_config) in indexers.iter() { - let active_executor = active_executors - .iter() - .position(|stream| { - stream.account_id == account_id.to_string() - && &stream.function_name == function_name - }) - .map(|index| active_executors.swap_remove(index)); - - let registry_version = indexer_config - .updated_at_block_height - .unwrap_or(indexer_config.created_at_block_height); - - if let Some(active_executor) = active_executor { - if active_executor.version == registry_version { - continue; - } - - tracing::info!( - account_id = active_executor.account_id.as_str(), - function_name = active_executor.function_name, - registry_version = active_executor.version, - "Stopping executor" - ); - - executors_handler.stop(active_executor.executor_id).await?; - } - - tracing::info!( - account_id = account_id.as_str(), - function_name, - registry_version, - "Starting executor" - ); - - executors_handler - .start( - account_id.to_string(), - function_name.to_string(), - indexer_config.code.clone(), - indexer_config.schema.clone().unwrap_or_default(), - indexer_config.get_redis_stream(), - registry_version, - ) - .await?; - } - } - - for unregistered_executor in active_executors { - tracing::info!( - account_id = unregistered_executor.account_id.as_str(), - function_name = unregistered_executor.function_name, - registry_version = unregistered_executor.version, - "Stopping unregistered executor" - ); - - executors_handler - .stop(unregistered_executor.executor_id) - .await?; - } - - Ok(()) -} - -async fn synchronise_block_streams( - indexer_registry: &IndexerRegistry, - redis_client: &RedisClient, - block_streams_handler: &BlockStreamsHandler, -) -> anyhow::Result<()> { - let mut active_block_streams = block_streams_handler.list().await?; - - for (account_id, indexers) in indexer_registry.iter() { - for (function_name, indexer_config) in indexers.iter() { - let active_block_stream = active_block_streams - .iter() - .position(|stream| { - stream.account_id == account_id.to_string() - && &stream.function_name == function_name - }) - .map(|index| active_block_streams.swap_remove(index)); - - let registry_version = indexer_config - .updated_at_block_height - .unwrap_or(indexer_config.created_at_block_height); - - // TODO: Ensure start block height is only used to successfully start block stream ONCE - // TODO: Ensure last published blockheight is used on fresh restarts for existing indexers - if let Some(active_block_stream) = active_block_stream { - if active_block_stream.version == registry_version { - continue; - } - - tracing::info!( - account_id = active_block_stream.account_id.as_str(), - function_name = active_block_stream.function_name, - registry_version = active_block_stream.version, - "Stopping block stream" - ); - - block_streams_handler - .stop(active_block_stream.stream_id) - .await?; - } - - let start_block_height = if let Some(start_block_height) = - indexer_config.start_block_height - { - start_block_height - } else if let Ok(last_published_block) = redis_client - .get::(format!( - "{}:last_published_block", - indexer_config.get_full_name() - )) - .await - { - last_published_block - } else if let Some(updated_at_block_height) = indexer_config.updated_at_block_height { - updated_at_block_height - } else { - indexer_config.created_at_block_height - }; - - tracing::info!( - account_id = account_id.as_str(), - function_name, - registry_version, - "Starting block stream" - ); - - block_streams_handler - .start( - start_block_height, - indexer_config.account_id.to_string(), - indexer_config.function_name.clone(), - registry_version, - indexer_config.get_redis_stream(), - indexer_config.filter.matching_rule.clone(), - ) - .await?; - } - } - - for unregistered_block_stream in active_block_streams { - tracing::info!( - account_id = unregistered_block_stream.account_id.as_str(), - function_name = unregistered_block_stream.function_name, - registry_version = unregistered_block_stream.version, - "Stopping unregistered block stream" - ); - - block_streams_handler - .stop(unregistered_block_stream.stream_id) - .await?; - } - - Ok(()) -} - -#[cfg(test)] -mod tests { - use super::*; - - use mockall::predicate; - use std::collections::HashMap; - - use registry_types::{IndexerRule, IndexerRuleKind, MatchingRule, Status}; - - use crate::registry::IndexerConfig; - - mod executors { - use super::*; - - #[tokio::test] - async fn starts_executors() { - let indexer_registry = HashMap::from([( - "morgs.near".parse().unwrap(), - HashMap::from([( - "test".to_string(), - IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: "code".to_string(), - schema: Some("schema".to_string()), - filter: IndexerRule { - id: None, - name: None, - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule: MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - }, - created_at_block_height: 1, - updated_at_block_height: None, - start_block_height: Some(100), - }, - )]), - )]); - - let mut executors_handler = ExecutorsHandler::default(); - executors_handler.expect_list().returning(|| Ok(vec![])); - executors_handler - .expect_start() - .with( - predicate::eq("morgs.near".to_string()), - predicate::eq("test".to_string()), - predicate::eq("code".to_string()), - predicate::eq("schema".to_string()), - predicate::eq("morgs.near/test:block_stream".to_string()), - predicate::eq(1), - ) - .returning(|_, _, _, _, _, _| Ok(())); - - synchronise_executors(&indexer_registry, &executors_handler) - .await - .unwrap(); - } - - #[tokio::test] - async fn restarts_executors_with_mismatched_versions() { - let indexer_registry = HashMap::from([( - "morgs.near".parse().unwrap(), - HashMap::from([( - "test".to_string(), - IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: "code".to_string(), - schema: Some("schema".to_string()), - filter: IndexerRule { - id: None, - name: None, - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule: MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - }, - created_at_block_height: 1, - updated_at_block_height: Some(2), - start_block_height: Some(100), - }, - )]), - )]); - - let mut executors_handler = ExecutorsHandler::default(); - executors_handler.expect_list().returning(|| { - Ok(vec![runner::ExecutorInfo { - executor_id: "executor_id".to_string(), - account_id: "morgs.near".to_string(), - function_name: "test".to_string(), - status: "running".to_string(), - version: 1, - }]) - }); - executors_handler - .expect_stop() - .with(predicate::eq("executor_id".to_string())) - .returning(|_| Ok(())) - .once(); - - executors_handler - .expect_start() - .with( - predicate::eq("morgs.near".to_string()), - predicate::eq("test".to_string()), - predicate::eq("code".to_string()), - predicate::eq("schema".to_string()), - predicate::eq("morgs.near/test:block_stream".to_string()), - predicate::eq(2), - ) - .returning(|_, _, _, _, _, _| Ok(())); - - synchronise_executors(&indexer_registry, &executors_handler) - .await - .unwrap(); - } - - #[tokio::test] - async fn ignores_executors_with_matching_versions() { - let indexer_registry = HashMap::from([( - "morgs.near".parse().unwrap(), - HashMap::from([( - "test".to_string(), - IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: "code".to_string(), - schema: Some("schema".to_string()), - filter: IndexerRule { - id: None, - name: None, - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule: MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - }, - created_at_block_height: 1, - updated_at_block_height: Some(2), - start_block_height: Some(100), - }, - )]), - )]); - - let mut executors_handler = ExecutorsHandler::default(); - executors_handler.expect_list().returning(|| { - Ok(vec![runner::ExecutorInfo { - executor_id: "executor_id".to_string(), - account_id: "morgs.near".to_string(), - function_name: "test".to_string(), - status: "running".to_string(), - version: 2, - }]) - }); - executors_handler.expect_stop().never(); - - executors_handler.expect_start().never(); - - synchronise_executors(&indexer_registry, &executors_handler) - .await - .unwrap(); - } - - #[tokio::test] - async fn stops_executors_not_in_registry() { - let indexer_registry = HashMap::from([]); - - let mut executors_handler = ExecutorsHandler::default(); - executors_handler.expect_list().returning(|| { - Ok(vec![runner::ExecutorInfo { - executor_id: "executor_id".to_string(), - account_id: "morgs.near".to_string(), - function_name: "test".to_string(), - status: "running".to_string(), - version: 2, - }]) - }); - - executors_handler - .expect_stop() - .with(predicate::eq("executor_id".to_string())) - .returning(|_| Ok(())) - .once(); - - synchronise_executors(&indexer_registry, &executors_handler) - .await - .unwrap(); - } - } - - mod block_stream { - use super::*; - - // TODO: Add Test for when indexer updated, block stream fails to start, and then restarted successfully - #[ignore] // TODO: Re-Enable when case is covered. - #[tokio::test] - async fn uses_last_published_block_height_when_restarting_existing_indexer_block_stream() { - let indexer_registry = HashMap::from([( - "morgs.near".parse().unwrap(), - HashMap::from([( - "test".to_string(), - IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: String::new(), - schema: Some(String::new()), - filter: IndexerRule { - id: None, - name: None, - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule: MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - }, - created_at_block_height: 1, - updated_at_block_height: Some(200), - start_block_height: Some(100), - }, - )]), - )]); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_get::() - .returning(|_| Ok(500)); - - let mut block_stream_handler = BlockStreamsHandler::default(); - block_stream_handler.expect_list().returning(|| Ok(vec![])); - block_stream_handler - .expect_start() - .with( - predicate::eq(500), - predicate::eq("morgs.near".to_string()), - predicate::eq("test".to_string()), - predicate::eq(200), - predicate::eq("morgs.near/test:block_stream".to_string()), - predicate::eq(MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }), - ) - .returning(|_, _, _, _, _, _| Ok(())); - - synchronise_block_streams(&indexer_registry, &redis_client, &mut block_stream_handler) - .await - .unwrap(); - } - - #[tokio::test] - async fn uses_last_published_block_height_when_updating_without_start_block_height() { - let indexer_registry = HashMap::from([( - "morgs.near".parse().unwrap(), - HashMap::from([( - "test".to_string(), - IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: String::new(), - schema: Some(String::new()), - filter: IndexerRule { - id: None, - name: None, - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule: MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - }, - created_at_block_height: 1, - updated_at_block_height: Some(200), - start_block_height: None, - }, - )]), - )]); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_get::() - .returning(|_| Ok(500)); - - let mut block_stream_handler = BlockStreamsHandler::default(); - block_stream_handler.expect_list().returning(|| { - Ok(vec![block_streamer::StreamInfo { - stream_id: "morgs.near/test:block_stream".to_string(), - account_id: "morgs.near".to_string(), - function_name: "test".to_string(), - version: 1, - }]) - }); - block_stream_handler - .expect_stop() - .with(predicate::eq("morgs.near/test:block_stream".to_string())) - .returning(|_| Ok(())) - .once(); - block_stream_handler - .expect_start() - .with( - predicate::eq(500), - predicate::eq("morgs.near".to_string()), - predicate::eq("test".to_string()), - predicate::eq(200), - predicate::eq("morgs.near/test:block_stream".to_string()), - predicate::eq(MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }), - ) - .returning(|_, _, _, _, _, _| Ok(())); - - synchronise_block_streams(&indexer_registry, &redis_client, &mut block_stream_handler) - .await - .unwrap(); - } - - #[tokio::test] - async fn uses_start_block_height_for_brand_new_indexer() { - let indexer_registry = HashMap::from([( - "morgs.near".parse().unwrap(), - HashMap::from([( - "test".to_string(), - IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: String::new(), - schema: Some(String::new()), - filter: IndexerRule { - id: None, - name: None, - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule: MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - }, - created_at_block_height: 1, - updated_at_block_height: None, - start_block_height: Some(100), - }, - )]), - )]); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_get::() - .returning(|_| anyhow::bail!("none")); - - let mut block_stream_handler = BlockStreamsHandler::default(); - block_stream_handler.expect_list().returning(|| Ok(vec![])); - block_stream_handler - .expect_start() - .with( - predicate::eq(100), - predicate::eq("morgs.near".to_string()), - predicate::eq("test".to_string()), - predicate::eq(1), - predicate::eq("morgs.near/test:block_stream".to_string()), - predicate::eq(MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }), - ) - .returning(|_, _, _, _, _, _| Ok(())); - - synchronise_block_streams(&indexer_registry, &redis_client, &block_stream_handler) - .await - .unwrap(); - } - - #[tokio::test] - async fn uses_start_block_height_when_updating_with_start_block_height() { - let indexer_registry = HashMap::from([( - "morgs.near".parse().unwrap(), - HashMap::from([( - "test".to_string(), - IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: String::new(), - schema: Some(String::new()), - filter: IndexerRule { - id: None, - name: None, - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule: MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - }, - created_at_block_height: 1, - updated_at_block_height: Some(200), - start_block_height: Some(100), - }, - )]), - )]); - - let redis_client = RedisClient::default(); - - let mut block_stream_handler = BlockStreamsHandler::default(); - block_stream_handler.expect_list().returning(|| { - Ok(vec![block_streamer::StreamInfo { - stream_id: "morgs.near/test:block_stream".to_string(), - account_id: "morgs.near".to_string(), - function_name: "test".to_string(), - version: 1, - }]) - }); - block_stream_handler - .expect_stop() - .with(predicate::eq("morgs.near/test:block_stream".to_string())) - .returning(|_| Ok(())) - .once(); - block_stream_handler - .expect_start() - .with( - predicate::eq(100), - predicate::eq("morgs.near".to_string()), - predicate::eq("test".to_string()), - predicate::eq(200), - predicate::eq("morgs.near/test:block_stream".to_string()), - predicate::eq(MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }), - ) - .returning(|_, _, _, _, _, _| Ok(())); - - synchronise_block_streams(&indexer_registry, &redis_client, &mut block_stream_handler) - .await - .unwrap(); - } - - #[tokio::test] - async fn uses_start_block_height_when_no_last_published_block_and_no_block_stream() { - let indexer_registry = HashMap::from([( - "morgs.near".parse().unwrap(), - HashMap::from([( - "test".to_string(), - IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: String::new(), - schema: Some(String::new()), - filter: IndexerRule { - id: None, - name: None, - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule: MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - }, - created_at_block_height: 1, - updated_at_block_height: Some(200), - start_block_height: Some(100), - }, - )]), - )]); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_get::() - .returning(|_| anyhow::bail!("none")); - - let mut block_stream_handler = BlockStreamsHandler::default(); - block_stream_handler.expect_list().returning(|| Ok(vec![])); - block_stream_handler - .expect_start() - .with( - predicate::eq(100), - predicate::eq("morgs.near".to_string()), - predicate::eq("test".to_string()), - predicate::eq(200), - predicate::eq("morgs.near/test:block_stream".to_string()), - predicate::eq(MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }), - ) - .returning(|_, _, _, _, _, _| Ok(())); - - synchronise_block_streams(&indexer_registry, &redis_client, &mut block_stream_handler) - .await - .unwrap(); - } - - #[tokio::test] - async fn uses_updated_block_height_when_no_last_published_block_no_block_stream_no_start_block_height( - ) { - let indexer_registry = HashMap::from([( - "morgs.near".parse().unwrap(), - HashMap::from([( - "test".to_string(), - IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: String::new(), - schema: Some(String::new()), - filter: IndexerRule { - id: None, - name: None, - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule: MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - }, - created_at_block_height: 1, - updated_at_block_height: Some(200), - start_block_height: None, - }, - )]), - )]); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_get::() - .returning(|_| anyhow::bail!("none")); - - let mut block_stream_handler = BlockStreamsHandler::default(); - block_stream_handler.expect_list().returning(|| Ok(vec![])); - block_stream_handler - .expect_start() - .with( - predicate::eq(200), - predicate::eq("morgs.near".to_string()), - predicate::eq("test".to_string()), - predicate::eq(200), - predicate::eq("morgs.near/test:block_stream".to_string()), - predicate::eq(MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }), - ) - .returning(|_, _, _, _, _, _| Ok(())); - - synchronise_block_streams(&indexer_registry, &redis_client, &block_stream_handler) - .await - .unwrap(); - } - - #[tokio::test] - async fn uses_created_block_height_for_brand_new_indexer_without_start() { - let indexer_registry = HashMap::from([( - "morgs.near".parse().unwrap(), - HashMap::from([( - "test".to_string(), - IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: String::new(), - schema: Some(String::new()), - filter: IndexerRule { - id: None, - name: None, - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule: MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - }, - created_at_block_height: 1, - updated_at_block_height: None, - start_block_height: None, - }, - )]), - )]); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_get::() - .returning(|_| anyhow::bail!("none")); - - let mut block_stream_handler = BlockStreamsHandler::default(); - block_stream_handler.expect_list().returning(|| Ok(vec![])); - block_stream_handler - .expect_start() - .with( - predicate::eq(1), - predicate::eq("morgs.near".to_string()), - predicate::eq("test".to_string()), - predicate::eq(1), - predicate::eq("morgs.near/test:block_stream".to_string()), - predicate::eq(MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }), - ) - .returning(|_, _, _, _, _, _| Ok(())); - - synchronise_block_streams(&indexer_registry, &redis_client, &block_stream_handler) - .await - .unwrap(); - } - - #[tokio::test] - async fn stops_streams_not_in_registry() { - let indexer_registry = HashMap::from([]); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_get::() - .returning(|_| anyhow::bail!("none")); - - let mut block_stream_handler = BlockStreamsHandler::default(); - block_stream_handler.expect_list().returning(|| { - Ok(vec![block_streamer::StreamInfo { - stream_id: "stream_id".to_string(), - account_id: "morgs.near".to_string(), - function_name: "test".to_string(), - version: 1, - }]) - }); - block_stream_handler - .expect_stop() - .with(predicate::eq("stream_id".to_string())) - .returning(|_| Ok(())) - .once(); - - synchronise_block_streams(&indexer_registry, &redis_client, &block_stream_handler) - .await - .unwrap(); - } - - #[tokio::test] - async fn ignores_streams_with_matching_versions() { - let indexer_registry = HashMap::from([( - "morgs.near".parse().unwrap(), - HashMap::from([( - "test".to_string(), - IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: String::new(), - schema: Some(String::new()), - filter: IndexerRule { - id: None, - name: None, - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule: MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - }, - created_at_block_height: 101, - updated_at_block_height: None, - start_block_height: None, - }, - )]), - )]); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_get::() - .returning(|_| anyhow::bail!("none")); - - let mut block_stream_handler = BlockStreamsHandler::default(); - block_stream_handler.expect_list().returning(|| { - Ok(vec![block_streamer::StreamInfo { - stream_id: "stream_id".to_string(), - account_id: "morgs.near".to_string(), - function_name: "test".to_string(), - version: 101, - }]) - }); - block_stream_handler.expect_stop().never(); - block_stream_handler.expect_start().never(); - - synchronise_block_streams(&indexer_registry, &redis_client, &block_stream_handler) - .await - .unwrap(); - } - - #[tokio::test] - async fn restarts_streams_with_mismatched_versions() { - let indexer_registry = HashMap::from([( - "morgs.near".parse().unwrap(), - HashMap::from([( - "test".to_string(), - IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: String::new(), - schema: Some(String::new()), - filter: IndexerRule { - id: None, - name: None, - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule: MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - }, - created_at_block_height: 101, - updated_at_block_height: Some(200), - start_block_height: Some(1000), - }, - )]), - )]); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_get::() - .returning(|_| anyhow::bail!("none")); - - let mut block_stream_handler = BlockStreamsHandler::default(); - block_stream_handler.expect_list().returning(|| { - Ok(vec![block_streamer::StreamInfo { - stream_id: "stream_id".to_string(), - account_id: "morgs.near".to_string(), - function_name: "test".to_string(), - version: 101, - }]) - }); - block_stream_handler - .expect_stop() - .with(predicate::eq("stream_id".to_string())) - .returning(|_| Ok(())) - .once(); - block_stream_handler - .expect_start() - .with( - predicate::eq(1000), - predicate::eq("morgs.near".to_string()), - predicate::eq("test".to_string()), - predicate::eq(200), - predicate::eq("morgs.near/test:block_stream".to_string()), - predicate::eq(MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }), - ) - .returning(|_, _, _, _, _, _| Ok(())); - - synchronise_block_streams(&indexer_registry, &redis_client, &block_stream_handler) - .await - .unwrap(); - } - } -} diff --git a/coordinator/src/migration.rs b/coordinator/src/migration.rs index d45763f02..f4faccbb1 100644 --- a/coordinator/src/migration.rs +++ b/coordinator/src/migration.rs @@ -4,9 +4,12 @@ use anyhow::Context; use near_primitives::types::AccountId; use redis::{ErrorKind, RedisError}; -use crate::executors_handler::ExecutorsHandler; +use crate::executors::ExecutorsHandler; +use crate::indexer_config::IndexerConfig; use crate::redis::RedisClient; -use crate::registry::{IndexerConfig, IndexerRegistry}; +use crate::registry::IndexerRegistry; + +pub const MIGRATED_STREAM_VERSION: u64 = 0; #[derive(serde::Deserialize, serde::Serialize, Debug)] pub struct AllowlistEntry { @@ -14,12 +17,17 @@ pub struct AllowlistEntry { v1_ack: bool, migrated: bool, failed: bool, + v2_control: bool, } pub type Allowlist = Vec; pub async fn fetch_allowlist(redis_client: &RedisClient) -> anyhow::Result { - let raw_allowlist: String = redis_client.get(RedisClient::ALLOWLIST).await?; + let raw_allowlist: String = redis_client + .get(RedisClient::ALLOWLIST) + .await? + .ok_or(anyhow::anyhow!("Allowlist doesn't exist"))?; + serde_json::from_str(&raw_allowlist).context("Failed to parse allowlist") } @@ -31,7 +39,11 @@ pub async fn filter_registry_by_allowlist( .into_iter() .filter(|(account_id, _)| { allowlist.iter().any(|entry| { - entry.account_id == *account_id && entry.v1_ack && entry.migrated && !entry.failed + entry.account_id == *account_id + && entry.v1_ack + && entry.migrated + && !entry.failed + && entry.v2_control }) }) .collect(); @@ -102,10 +114,12 @@ async fn migrate_account( merge_streams(redis_client, &existing_streams, indexer_config) .await .context("Failed to merge streams")?; + update_stream_version(redis_client, indexer_config) + .await + .context("Failed to set Redis Stream version")?; } - // TODO Uncomment when V2 correctly continues from V1 stop point - // set_migrated_flag(redis_client, account_id)?; + set_migrated_flag(redis_client, account_id)?; tracing::info!("Finished migrating {}", account_id); @@ -121,23 +135,29 @@ async fn remove_from_streams_set( if redis_client .srem( RedisClient::STREAMS_SET, - indexer_config.get_historical_redis_stream(), + indexer_config.get_historical_redis_stream_key(), ) .await? .is_some() + && redis_client + .exists(indexer_config.get_historical_redis_stream_key()) + .await? { - result.push(indexer_config.get_historical_redis_stream()); + result.push(indexer_config.get_historical_redis_stream_key()); } if redis_client .srem( RedisClient::STREAMS_SET, - indexer_config.get_real_time_redis_stream(), + indexer_config.get_real_time_redis_stream_key(), ) .await? .is_some() + && redis_client + .exists(indexer_config.get_real_time_redis_stream_key()) + .await? { - result.push(indexer_config.get_real_time_redis_stream()); + result.push(indexer_config.get_real_time_redis_stream_key()); }; Ok(result) @@ -165,7 +185,7 @@ async fn merge_streams( redis_client .rename( existing_streams[0].to_owned(), - indexer_config.get_redis_stream(), + indexer_config.get_redis_stream_key(), ) .await?; @@ -176,7 +196,7 @@ async fn merge_streams( let real_time_stream = existing_streams[1].to_owned(); redis_client - .rename(historical_stream, indexer_config.get_redis_stream()) + .rename(historical_stream, indexer_config.get_redis_stream_key()) .await?; loop { @@ -202,10 +222,13 @@ async fn merge_streams( .collect(); redis_client - .xadd(indexer_config.get_redis_stream(), &fields) + .xadd(indexer_config.get_redis_stream_key(), &fields) .await?; redis_client - .xdel(indexer_config.get_real_time_redis_stream(), stream_id.id) + .xdel( + indexer_config.get_real_time_redis_stream_key(), + stream_id.id, + ) .await? } } @@ -216,6 +239,20 @@ async fn merge_streams( } } +async fn update_stream_version( + redis_client: &RedisClient, + indexer_config: &IndexerConfig, +) -> anyhow::Result<()> { + redis_client + .set( + indexer_config.get_redis_stream_version_key(), + MIGRATED_STREAM_VERSION, + ) + .await?; + + Ok(()) +} + fn set_failed_flag(redis_client: &RedisClient, account_id: &AccountId) -> anyhow::Result<()> { let account_id = account_id.to_owned(); @@ -267,9 +304,9 @@ mod tests { use std::collections::HashMap; use mockall::predicate; - use registry_types::{IndexerRule, IndexerRuleKind, MatchingRule, Status}; + use registry_types::{Rule, StartBlock, Status}; - use crate::registry::IndexerConfig; + use crate::indexer_config::IndexerConfig; #[tokio::test] async fn ignores_migrated_indexers() { @@ -281,19 +318,14 @@ mod tests { account_id: "morgs.near".parse().unwrap(), function_name: "test".to_string(), code: String::new(), - schema: Some(String::new()), - filter: IndexerRule { - id: None, - name: None, - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule: MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, + schema: String::new(), + rule: Rule::ActionAny { + affected_account_id: "queryapi.dataplatform.near".to_string(), + status: Status::Any, }, created_at_block_height: 101, updated_at_block_height: Some(200), - start_block_height: Some(1000), + start_block: StartBlock::Height(1000), }, )]), )]); @@ -303,6 +335,7 @@ mod tests { v1_ack: true, migrated: true, failed: false, + v2_control: false, }]; let redis_client = RedisClient::default(); @@ -327,6 +360,7 @@ mod tests { v1_ack: true, migrated: true, failed: false, + v2_control: false, }]; let redis_client = RedisClient::default(); @@ -352,19 +386,14 @@ mod tests { account_id: "morgs.near".parse().unwrap(), function_name: "test".to_string(), code: String::new(), - schema: Some(String::new()), - filter: IndexerRule { - id: None, - name: None, - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule: MatchingRule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, + schema: String::new(), + rule: Rule::ActionAny { + affected_account_id: "queryapi.dataplatform.near".to_string(), + status: Status::Any, }, created_at_block_height: 101, updated_at_block_height: Some(200), - start_block_height: Some(1000), + start_block: StartBlock::Height(1000), }, )]), )]); @@ -374,6 +403,7 @@ mod tests { v1_ack: true, migrated: false, failed: false, + v2_control: false, }]; let mut redis_client = RedisClient::default(); @@ -393,6 +423,20 @@ mod tests { ) .returning(|_, _| Ok(Some(()))) .once(); + redis_client + .expect_exists::() + .with(predicate::eq(String::from( + "morgs.near/test:historical:stream", + ))) + .returning(|_| Ok(true)) + .once(); + redis_client + .expect_exists::() + .with(predicate::eq(String::from( + "morgs.near/test:real_time:stream", + ))) + .returning(|_| Ok(true)) + .once(); redis_client .expect_rename::() .with( @@ -443,6 +487,14 @@ mod tests { ) .returning(|_, _| Ok(())) .once(); + redis_client + .expect_set::() + .with( + predicate::eq(String::from("morgs.near/test:block_stream:version")), + predicate::eq(MIGRATED_STREAM_VERSION), + ) + .returning(|_, _| Ok(())) + .once(); redis_client .expect_atomic_update::<&str, String, String>() .returning(|_, _| Ok(())); diff --git a/coordinator/src/redis.rs b/coordinator/src/redis.rs index c2df6d826..a5c26d28a 100644 --- a/coordinator/src/redis.rs +++ b/coordinator/src/redis.rs @@ -7,6 +7,8 @@ use redis::{ aio::ConnectionManager, streams, AsyncCommands, FromRedisValue, RedisResult, ToRedisArgs, }; +use crate::indexer_config::IndexerConfig; + #[cfg(test)] pub use MockRedisClientImpl as RedisClient; #[cfg(not(test))] @@ -34,21 +36,37 @@ impl RedisClientImpl { }) } - pub async fn get(&self, key: T) -> anyhow::Result + pub async fn get(&self, key: T) -> anyhow::Result> where - T: ToRedisArgs + Debug + 'static, + T: ToRedisArgs + Debug + Send + Sync + 'static, U: FromRedisValue + Debug + 'static, { - let value = redis::cmd("GET") - .arg(&key) - .query_async(&mut self.connection.clone()) + let value: Option = self + .connection + .clone() + .get(&key) .await - .map_err(|e| anyhow::format_err!(e))?; + .context(format!("GET: {key:?}"))?; tracing::debug!("GET: {:?}={:?}", key, value); Ok(value) } + pub async fn set(&self, key: K, value: V) -> anyhow::Result<()> + where + K: ToRedisArgs + Debug + Send + Sync + 'static, + V: ToRedisArgs + Debug + Send + Sync + 'static, + { + tracing::debug!("SET: {key:?} {value:?}"); + + self.connection + .clone() + .set(&key, &value) + .await + .context(format!("SET: {key:?} {value:?}"))?; + + Ok(()) + } pub async fn rename(&self, old_key: K, new_key: V) -> anyhow::Result<()> where @@ -57,7 +75,11 @@ impl RedisClientImpl { { tracing::debug!("RENAME: {:?} -> {:?}", old_key, new_key); - self.connection.clone().rename(old_key, new_key).await?; + self.connection + .clone() + .rename(&old_key, &new_key) + .await + .context(format!("RENAME: {old_key:?} {new_key:?}"))?; Ok(()) } @@ -69,11 +91,12 @@ impl RedisClientImpl { { tracing::debug!("SREM: {:?}={:?}", key, value); - match self.connection.clone().srem(key, value).await { + match self.connection.clone().srem(&key, &value).await { Ok(1) => Ok(Some(())), Ok(_) => Ok(None), Err(e) => Err(anyhow::format_err!(e)), } + .context(format!("SREM: {key:?} {value:?}")) } pub async fn xread( @@ -92,11 +115,12 @@ impl RedisClientImpl { .connection .clone() .xread_options( - &[key], - &[start_id], + &[&key], + &[&start_id], &streams::StreamReadOptions::default().count(count), ) - .await?; + .await + .context(format!("XREAD {key:?} {start_id:?} {count:?}"))?; if results.keys.is_empty() { return Ok([].to_vec()); @@ -112,7 +136,11 @@ impl RedisClientImpl { { tracing::debug!("XADD: {:?} {:?} {:?}", key, "*", fields); - self.connection.clone().xadd(key, "*", fields).await?; + self.connection + .clone() + .xadd(&key, "*", fields) + .await + .context(format!("XADD {key:?} {fields:?}"))?; Ok(()) } @@ -124,11 +152,43 @@ impl RedisClientImpl { { tracing::debug!("XDEL: {:?} {:?}", key, id); - self.connection.clone().xdel(key, &[id]).await?; + self.connection + .clone() + .xdel(&key, &[&id]) + .await + .context(format!("XDEL {key:?} {id:?}"))?; Ok(()) } + pub async fn exists(&self, key: K) -> anyhow::Result + where + K: ToRedisArgs + Debug + Send + Sync + 'static, + { + tracing::debug!("EXISTS {key:?}"); + + self.connection + .clone() + .exists(&key) + .await + .map_err(|e| anyhow::format_err!(e)) + .context(format!("EXISTS {key:?}")) + } + + pub async fn del(&self, key: K) -> anyhow::Result<()> + where + K: ToRedisArgs + Debug + Send + Sync + 'static, + { + tracing::debug!("DEL {key:?}"); + + self.connection + .clone() + .del(&key) + .await + .map_err(|e| anyhow::format_err!(e)) + .context(format!("DEL {key:?}")) + } + // `redis::transaction`s currently don't work with async connections, so we have to create a _new_ // blocking connection to atmoically update a value. pub fn atomic_update(&self, key: K, update_fn: F) -> anyhow::Result<()> @@ -149,4 +209,32 @@ impl RedisClientImpl { Ok(()) } + + pub async fn get_stream_version( + &self, + indexer_config: &IndexerConfig, + ) -> anyhow::Result> { + self.get::<_, u64>(indexer_config.get_redis_stream_version_key()) + .await + } + + pub async fn get_last_published_block( + &self, + indexer_config: &IndexerConfig, + ) -> anyhow::Result> { + self.get::<_, u64>(indexer_config.get_last_published_block_key()) + .await + } + + pub async fn clear_block_stream(&self, indexer_config: &IndexerConfig) -> anyhow::Result<()> { + self.del(indexer_config.get_redis_stream_key()).await + } + + pub async fn set_stream_version(&self, indexer_config: &IndexerConfig) -> anyhow::Result<()> { + self.set( + indexer_config.get_redis_stream_version_key(), + indexer_config.get_registry_version(), + ) + .await + } } diff --git a/coordinator/src/registry.rs b/coordinator/src/registry.rs index db13c77af..7318ae4f2 100644 --- a/coordinator/src/registry.rs +++ b/coordinator/src/registry.rs @@ -8,42 +8,13 @@ use near_jsonrpc_client::JsonRpcClient; use near_jsonrpc_primitives::types::query::QueryResponseKind; use near_primitives::types::{AccountId, BlockReference, Finality, FunctionArgs}; use near_primitives::views::QueryRequest; -use registry_types::{AccountOrAllIndexers, IndexerRule}; +use registry_types::AllIndexers; +use crate::indexer_config::IndexerConfig; use crate::utils::exponential_retry; pub type IndexerRegistry = HashMap>; -#[derive(Debug, Clone)] -pub struct IndexerConfig { - pub account_id: AccountId, - pub function_name: String, - pub code: String, - pub start_block_height: Option, - pub schema: Option, - pub filter: IndexerRule, - pub updated_at_block_height: Option, - pub created_at_block_height: u64, -} - -impl IndexerConfig { - pub fn get_full_name(&self) -> String { - format!("{}/{}", self.account_id, self.function_name) - } - - pub fn get_redis_stream(&self) -> String { - format!("{}:block_stream", self.get_full_name()) - } - - pub fn get_historical_redis_stream(&self) -> String { - format!("{}:historical:stream", self.get_full_name()) - } - - pub fn get_real_time_redis_stream(&self) -> String { - format!("{}:real_time:stream", self.get_full_name()) - } -} - #[cfg(test)] pub use MockRegistryImpl as Registry; #[cfg(not(test))] @@ -56,7 +27,7 @@ pub struct RegistryImpl { #[cfg_attr(test, mockall::automock)] impl RegistryImpl { - const LIST_METHOD: &str = "list_indexer_functions"; + const LIST_METHOD: &str = "list_all"; pub fn connect(registry_contract_id: AccountId, rpc_url: &str) -> Self { let json_rpc_client = JsonRpcClient::connect(rpc_url); @@ -83,9 +54,9 @@ impl RegistryImpl { account_id: account_id.clone(), function_name, code: indexer.code, - start_block_height: indexer.start_block_height, + start_block: indexer.start_block, schema: indexer.schema, - filter: indexer.filter, + rule: indexer.rule, updated_at_block_height: indexer.updated_at_block_height, created_at_block_height: indexer.created_at_block_height, }, @@ -114,12 +85,9 @@ impl RegistryImpl { .context("Failed to list registry contract")?; if let QueryResponseKind::CallResult(call_result) = response.kind { - let list_registry_response: AccountOrAllIndexers = - serde_json::from_slice(&call_result.result)?; + let all_indexers: AllIndexers = serde_json::from_slice(&call_result.result)?; - if let AccountOrAllIndexers::All(all_indexers) = list_registry_response { - return Ok(self.enrich_indexer_registry(all_indexers)); - } + return Ok(self.enrich_indexer_registry(all_indexers)); } anyhow::bail!("Invalid registry response") diff --git a/docker-compose.yml b/docker-compose.yml index 48103d4ae..7dc2e5181 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -70,6 +70,7 @@ services: PGPASSWORD: postgrespassword PGDATABASE: postgres PORT: 9180 + AWS_REGION: eu-central-1 AWS_ACCESS_KEY_ID: AWS_SECRET_ACCESS_KEY: GRPC_SERVER_PORT: 7001 diff --git a/indexer/queryapi_coordinator/src/main.rs b/indexer/queryapi_coordinator/src/main.rs index 96511f544..9f0ffbdf6 100644 --- a/indexer/queryapi_coordinator/src/main.rs +++ b/indexer/queryapi_coordinator/src/main.rs @@ -47,6 +47,9 @@ pub(crate) struct QueryApiContext<'a> { struct DenylistEntry { account_id: AccountId, v1_ack: bool, + migrated: bool, + failed: bool, + v2_control: bool, } type Denylist = Vec; @@ -138,8 +141,9 @@ async fn main() -> anyhow::Result<()> { } async fn fetch_denylist(redis_connection_manager: &ConnectionManager) -> anyhow::Result { - let raw_denylist: String = - storage::get(redis_connection_manager, storage::DENYLIST_KEY).await?; + let raw_denylist: String = storage::get(redis_connection_manager, storage::DENYLIST_KEY) + .await + .unwrap_or("".to_owned()); let denylist: Denylist = serde_json::from_str(&raw_denylist).context("Failed to parse denylist")?; diff --git a/registry/contract/src/lib.rs b/registry/contract/src/lib.rs index c9b000bff..f33835461 100644 --- a/registry/contract/src/lib.rs +++ b/registry/contract/src/lib.rs @@ -5,50 +5,45 @@ use near_sdk::store::UnorderedMap; use near_sdk::{env, log, near_bindgen, serde_json, AccountId, BorshStorageKey, CryptoHash}; use registry_types::{ - AccountOrAllIndexers, IndexerConfig, IndexerRule, IndexerRuleKind, MatchingRule, Status, + AccountIndexers, AllIndexers, IndexerConfig, IndexerRuleKind, MatchingRule, + OldAccountOrAllIndexers, OldIndexerConfig, OldIndexerRule, Rule, StartBlock, Status, }; type FunctionName = String; -// Define the contract structure -#[near_bindgen] + #[derive(BorshDeserialize, BorshSerialize, Debug)] -pub struct Contract { - registry: IndexersByAccount, +pub struct OldContract { + registry: OldIndexersByAccount, account_roles: Vec, } -pub type IndexersByAccount = UnorderedMap; +pub type OldIndexersByAccount = UnorderedMap; -pub type IndexerConfigByFunctionName = UnorderedMap; +pub type OldIndexerConfigByFunctionName = UnorderedMap; +// Define the contract structure +#[near_bindgen] #[derive(BorshDeserialize, BorshSerialize, Debug)] -pub struct OldState { - registry: OldIndexersByAccount, +pub struct Contract { + registry: IndexersByAccount, account_roles: Vec, } -#[derive(BorshSerialize, BorshDeserialize, Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] -#[serde(crate = "near_sdk::serde")] -pub struct OldIndexerConfig { - pub code: String, - pub start_block_height: Option, - pub schema: Option, - pub filter: IndexerRule, -} - -pub type OldIndexersByAccount = UnorderedMap; +type IndexersByAccount = UnorderedMap; -pub type OldIndexerConfigByFunctionName = UnorderedMap; +type IndexerConfigByFunctionName = UnorderedMap; // Migration types #[derive(BorshStorageKey, BorshSerialize)] -pub enum StorageKeys { +enum StorageKeys { Registry, // can be removed after migration Account(CryptoHash), // can be removed after migration RegistryV1, AccountV1(CryptoHash), RegistryV2, AccountV2(CryptoHash), + RegistryV3, + AccountV3(CryptoHash), } /// These roles are used to control access across the various contract methods. @@ -74,11 +69,10 @@ pub struct AccountRole { role: Role, } -// Define the default, which automatically initializes the contract impl Default for Contract { fn default() -> Self { Self { - registry: IndexersByAccount::new(StorageKeys::Registry), + registry: IndexersByAccount::new(StorageKeys::RegistryV3), account_roles: vec![ AccountRole { account_id: "morgs.near".parse().unwrap(), @@ -115,50 +109,29 @@ impl Contract { #[private] #[init(ignore_state)] pub fn migrate() -> Self { - let state: OldState = env::state_read().expect("failed to parse existing state"); + let state: OldContract = env::state_read().expect("failed to parse existing state"); - let mut registry = IndexersByAccount::new(StorageKeys::RegistryV2); + let mut registry = IndexersByAccount::new(StorageKeys::RegistryV3); for (account_id, indexers) in state.registry.iter() { let mut new_indexers: IndexerConfigByFunctionName = IndexerConfigByFunctionName::new( - StorageKeys::AccountV2(env::sha256_array(account_id.as_bytes())), + StorageKeys::AccountV3(env::sha256_array(account_id.as_bytes())), ); for (function_name, indexer_config) in indexers.iter() { - new_indexers.insert( - function_name.to_string(), - IndexerConfig { - updated_at_block_height: None, - created_at_block_height: env::block_height(), - schema: indexer_config.schema.clone(), - code: indexer_config.code.clone(), - start_block_height: indexer_config.start_block_height, - filter: indexer_config.filter.clone(), - }, - ); + new_indexers.insert(function_name.to_string(), indexer_config.clone().into()); } registry.insert(account_id.clone(), new_indexers); } - let account_roles: Vec<_> = Contract::default() - .account_roles - .into_iter() - .chain( - state - .account_roles - .into_iter() - .filter(|account_role| account_role.role == Role::User), - ) - .collect(); - Self { registry, - account_roles, + account_roles: state.account_roles, } } - pub fn near_social_indexer_rule() -> IndexerRule { + pub fn near_social_indexer_rule() -> OldIndexerRule { let contract = "social.near"; let method = "set"; let matching_rule = MatchingRule::ActionFunctionCall { @@ -166,7 +139,7 @@ impl Contract { function: method.to_string(), status: Status::Any, }; - IndexerRule { + OldIndexerRule { indexer_rule_kind: IndexerRuleKind::Action, matching_rule, id: None, @@ -179,7 +152,7 @@ impl Contract { &self, function_name: String, account_id: Option, - ) -> IndexerConfig { + ) -> OldIndexerConfig { let account_id = match account_id { Some(account_id) => account_id.parse::().unwrap_or_else(|_| { env::panic_str(&format!("Account ID {} is invalid", account_id)); @@ -191,7 +164,7 @@ impl Contract { env::panic_str(format!("Account {} has no registered functions", account_id).as_str()) }); - let indexer_config = account_indexers.get(&function_name).unwrap_or_else(|| { + let config = account_indexers.get(&function_name).unwrap_or_else(|| { env::panic_str( format!( "Function {} is not registered under account {}", @@ -201,7 +174,7 @@ impl Contract { ) }); - indexer_config.clone() + config.clone().into() } pub fn assert_roles(&self, permitted_roles: Vec) { @@ -279,6 +252,70 @@ impl Contract { }) } + pub fn register( + &mut self, + function_name: String, + code: String, + schema: String, + rule: Rule, + start_block: StartBlock, + ) { + let account_id = env::signer_account_id(); + + log!( + "Registering function {} for account {}", + &function_name, + &account_id + ); + + match &rule { + Rule::ActionAny { + affected_account_id, + .. + } + | Rule::ActionFunctionCall { + affected_account_id, + .. + } => { + if affected_account_id == "*" { + self.assert_roles(vec![Role::Owner]); + } + } + _ => {} + } + + let account_indexers = + self.registry + .entry(account_id.clone()) + .or_insert(IndexerConfigByFunctionName::new(StorageKeys::AccountV3( + env::sha256_array(account_id.as_bytes()), + ))); + + match account_indexers.entry(function_name) { + near_sdk::store::unordered_map::Entry::Occupied(mut entry) => { + let indexer = entry.get(); + entry.insert(IndexerConfig { + code, + schema, + rule, + start_block, + updated_at_block_height: Some(env::block_height()), + created_at_block_height: indexer.created_at_block_height, + }); + } + near_sdk::store::unordered_map::Entry::Vacant(entry) => { + entry.insert(IndexerConfig { + code, + schema, + rule, + start_block, + updated_at_block_height: None, + created_at_block_height: env::block_height(), + }); + } + } + } + // Public method - registers indexer code under then function_name pub fn register_indexer_function( &mut self, @@ -303,13 +340,29 @@ impl Contract { } }; - let filter: IndexerRule = match filter_json { + let filter: OldIndexerRule = match filter_json { Some(filter_json) => { - let filter_rule: IndexerRule = - serde_json::from_str(&filter_json).unwrap_or_else(|_| { - env::panic_str(&format!("Invalid filter JSON {}", filter_json)); + let filter_rule: OldIndexerRule = serde_json::from_str(&filter_json) + .unwrap_or_else(|e| { + env::panic_str(&format!("Invalid filter JSON {}", e)); }); + match &filter_rule.matching_rule { + MatchingRule::ActionAny { + affected_account_id, + .. + } + | MatchingRule::ActionFunctionCall { + affected_account_id, + .. + } => { + if affected_account_id == "*" { + self.assert_roles(vec![Role::Owner]); + } + } + _ => {} + } + filter_rule } None => Contract::near_social_indexer_rule(), @@ -324,18 +377,23 @@ impl Contract { let account_indexers = self.registry .entry(account_id.clone()) - .or_insert(IndexerConfigByFunctionName::new(StorageKeys::Account( + .or_insert(IndexerConfigByFunctionName::new(StorageKeys::AccountV3( env::sha256_array(account_id.as_bytes()), ))); + let start_block = match start_block_height { + Some(height) => StartBlock::Height(height), + None => StartBlock::Latest, + }; + match account_indexers.entry(function_name) { near_sdk::store::unordered_map::Entry::Occupied(mut entry) => { let indexer = entry.get(); entry.insert(IndexerConfig { code, - start_block_height, - schema, - filter, + start_block, + schema: schema.unwrap_or(String::new()), + rule: filter.matching_rule.into(), updated_at_block_height: Some(env::block_height()), created_at_block_height: indexer.created_at_block_height, }); @@ -343,9 +401,9 @@ impl Contract { near_sdk::store::unordered_map::Entry::Vacant(entry) => { entry.insert(IndexerConfig { code, - start_block_height, - schema, - filter, + start_block, + schema: schema.unwrap_or(String::new()), + rule: filter.matching_rule.into(), updated_at_block_height: None, created_at_block_height: env::block_height(), }); @@ -393,7 +451,7 @@ impl Contract { } } - pub fn list_indexer_functions(&self, account_id: Option) -> AccountOrAllIndexers { + pub fn list_indexer_functions(&self, account_id: Option) -> OldAccountOrAllIndexers { match account_id { Some(account_id) => { let account_id = account_id.parse::().unwrap_or_else(|_| { @@ -406,14 +464,16 @@ impl Contract { ) }); - AccountOrAllIndexers::Account( + OldAccountOrAllIndexers::Account( account_indexers .iter() - .map(|(function_name, config)| (function_name.clone(), config.clone())) + .map(|(function_name, config)| { + (function_name.clone(), config.clone().into()) + }) .collect(), ) } - None => AccountOrAllIndexers::All( + None => OldAccountOrAllIndexers::All( self.registry .iter() .map(|(account_id, account_indexers)| { @@ -422,7 +482,7 @@ impl Contract { account_indexers .iter() .map(|(function_name, config)| { - (function_name.clone(), config.clone()) + (function_name.clone(), config.clone().into()) }) .collect(), ) @@ -431,12 +491,34 @@ impl Contract { ), } } + + pub fn list_by_account(&self, account_id: AccountId) -> AccountIndexers { + self.registry + .get(&account_id) + .unwrap_or(&IndexerConfigByFunctionName::new(StorageKeys::AccountV3( + env::sha256_array(account_id.as_bytes()), + ))) + .iter() + .map(|(function_name, config)| (function_name.clone(), config.clone())) + .collect() + } + + pub fn list_all(&self) -> AllIndexers { + self.registry + .iter() + .map(|(account_id, account_indexers)| { + ( + account_id.clone(), + account_indexers + .iter() + .map(|(function_name, config)| (function_name.clone(), config.clone())) + .collect(), + ) + }) + .collect() + } } -/* - * The rest of this file holds the inline tests for the code above - * Learn more about Rust tests: https://doc.rust-lang.org/book/ch11-01-writing-tests.html - */ #[cfg(test)] mod tests { use super::*; @@ -445,9 +527,9 @@ mod tests { #[test] fn migrate() { - let mut registry = OldIndexersByAccount::new(StorageKeys::RegistryV1); + let mut registry = OldIndexersByAccount::new(StorageKeys::RegistryV2); let account_id = "morgs.near".parse::().unwrap(); - let mut functions = OldIndexerConfigByFunctionName::new(StorageKeys::AccountV1( + let mut functions = OldIndexerConfigByFunctionName::new(StorageKeys::AccountV2( env::sha256_array(account_id.as_bytes()), )); @@ -458,35 +540,34 @@ mod tests { start_block_height: None, schema: None, filter: Contract::near_social_indexer_rule(), + created_at_block_height: 10, + updated_at_block_height: None, }, ); functions.insert( "test2".to_string(), OldIndexerConfig { code: "return block2;".to_string(), - start_block_height: None, - schema: None, - filter: Contract::near_social_indexer_rule(), + start_block_height: Some(100), + schema: Some(String::from("create table blah")), + filter: OldIndexerRule { + id: None, + name: None, + indexer_rule_kind: IndexerRuleKind::Action, + matching_rule: MatchingRule::ActionAny { + affected_account_id: String::from("social.near"), + status: Status::Success, + }, + }, + created_at_block_height: 10, + updated_at_block_height: Some(20), }, ); registry.insert(account_id.clone(), functions); - env::state_write(&OldState { + env::state_write(&OldContract { registry, - account_roles: vec![ - AccountRole { - account_id: account_id.clone(), - role: Role::Owner, - }, - AccountRole { - account_id: "should-be-removed.near".parse().unwrap(), - role: Role::Owner, - }, - AccountRole { - account_id: "bob.near".parse().unwrap(), - role: Role::User, - }, - ], + account_roles: Contract::default().account_roles, }); let contract = Contract::migrate(); @@ -500,11 +581,15 @@ mod tests { .unwrap(), &IndexerConfig { code: "return block;".to_string(), - start_block_height: None, - schema: None, - filter: Contract::near_social_indexer_rule(), + start_block: StartBlock::Latest, + schema: String::new(), + rule: Rule::ActionFunctionCall { + affected_account_id: String::from("social.near"), + status: Status::Any, + function: String::from("set") + }, updated_at_block_height: None, - created_at_block_height: env::block_height(), + created_at_block_height: 10, } ); assert_eq!( @@ -516,24 +601,17 @@ mod tests { .unwrap(), &IndexerConfig { code: "return block2;".to_string(), - start_block_height: None, - schema: None, - filter: Contract::near_social_indexer_rule(), - updated_at_block_height: None, - created_at_block_height: env::block_height(), + schema: String::from("create table blah"), + start_block: StartBlock::Height(100), + rule: Rule::ActionAny { + affected_account_id: String::from("social.near"), + status: Status::Success + }, + updated_at_block_height: Some(20), + created_at_block_height: 10, } ); - assert_eq!( - contract.account_roles, - Contract::default() - .account_roles - .into_iter() - .chain(std::iter::once(AccountRole { - account_id: "bob.near".parse().unwrap(), - role: Role::User, - })) - .collect::>() - ); + assert_eq!(contract.account_roles, Contract::default().account_roles); } #[test] @@ -597,7 +675,7 @@ mod tests { assert!(contract .account_roles .iter() - .any(|account| account.account_id.to_string() == "alice.near")) + .any(|account| account.account_id == "alice.near")) } #[test] @@ -689,7 +767,7 @@ mod tests { assert!(!contract .account_roles .iter() - .any(|account| account.account_id.to_string() == "alice.near")) + .any(|account| account.account_id == "alice.near")) } #[test] @@ -747,7 +825,7 @@ mod tests { role: Role::User, }], }; - let config = IndexerConfig { + let config = OldIndexerConfig { code: "var x= 1;".to_string(), start_block_height: Some(43434343), schema: None, @@ -780,7 +858,7 @@ mod tests { role: Role::Owner, }], }; - let config = IndexerConfig { + let config = OldIndexerConfig { code: "var x= 1;".to_string(), start_block_height: Some(43434343), schema: None, @@ -895,7 +973,7 @@ mod tests { role: Role::User, }], }; - let config = IndexerConfig { + let config = OldIndexerConfig { code: "var x= 1;".to_string(), start_block_height: None, schema: None, @@ -914,13 +992,16 @@ mod tests { ); assert_eq!( - contract - .registry - .get(&"bob.near".parse::().unwrap()) - .unwrap() - .get("test") - .unwrap(), - &config + OldIndexerConfig::from( + contract + .registry + .get(&"bob.near".parse::().unwrap()) + .unwrap() + .get("test") + .unwrap() + .clone() + ), + config ); } @@ -963,11 +1044,11 @@ mod tests { role: Role::User, }], }; - let config = IndexerConfig { + let config = OldIndexerConfig { code: "var x= 1;".to_string(), start_block_height: None, schema: None, - filter: IndexerRule { + filter: OldIndexerRule { indexer_rule_kind: IndexerRuleKind::Action, matching_rule: MatchingRule::ActionFunctionCall { affected_account_id: "test".to_string(), @@ -991,13 +1072,16 @@ mod tests { ); assert_eq!( - contract - .registry - .get(&"bob.near".parse::().unwrap()) - .unwrap() - .get("test") - .unwrap(), - &config + OldIndexerConfig::from( + contract + .registry + .get(&"bob.near".parse::().unwrap()) + .unwrap() + .get("test") + .unwrap() + .clone() + ), + config ); } @@ -1010,11 +1094,11 @@ mod tests { role: Role::User, }], }; - let config = IndexerConfig { + let config = OldIndexerConfig { code: "var x= 1;".to_string(), start_block_height: None, schema: None, - filter: IndexerRule { + filter: OldIndexerRule { indexer_rule_kind: IndexerRuleKind::Action, matching_rule: MatchingRule::ActionAny { affected_account_id: "test".to_string(), @@ -1037,13 +1121,16 @@ mod tests { ); assert_eq!( - contract - .registry - .get(&"bob.near".parse::().unwrap()) - .unwrap() - .get("test") - .unwrap(), - &config + OldIndexerConfig::from( + contract + .registry + .get(&"bob.near".parse::().unwrap()) + .unwrap() + .get("test") + .unwrap() + .clone() + ), + config ); } @@ -1080,9 +1167,12 @@ mod tests { "test".to_string(), IndexerConfig { code: "var x= 1;".to_string(), - start_block_height: Some(43434343), - schema: None, - filter: Contract::near_social_indexer_rule(), + start_block: StartBlock::Latest, + schema: String::new(), + rule: Rule::ActionAny { + affected_account_id: String::from("social.near"), + status: Status::Success, + }, updated_at_block_height: None, created_at_block_height: 100, }, @@ -1130,11 +1220,14 @@ mod tests { "test".to_string(), IndexerConfig { code: "var x= 1;".to_string(), - start_block_height: Some(43434343), - schema: None, - filter: Contract::near_social_indexer_rule(), + start_block: StartBlock::Latest, + schema: String::new(), + rule: Rule::ActionAny { + affected_account_id: String::from("social.near"), + status: Status::Success, + }, updated_at_block_height: None, - created_at_block_height: 0, + created_at_block_height: 100, }, ); let mut registry = IndexersByAccount::new(StorageKeys::Registry); @@ -1146,7 +1239,7 @@ mod tests { role: Role::User, }], }; - let config = IndexerConfig { + let config = OldIndexerConfig { code: "var x= 1;".to_string(), start_block_height: None, schema: None, @@ -1174,6 +1267,45 @@ mod tests { ); } + #[test] + #[should_panic(expected = "Account bob.near does not have one of required roles [Owner]")] + fn prevents_non_owners_from_using_wildcard() { + let mut contract = Contract::default(); + contract.account_roles.push(AccountRole { + account_id: "bob.near".parse().unwrap(), + role: Role::User, + }); + + contract.register_indexer_function( + String::from("name"), + String::from("code"), + Some(0), + Some(String::from("schema")), + None, + Some(r#"{"indexer_rule_kind":"Action","matching_rule":{"rule":"ACTION_ANY","affected_account_id":"*","status":"SUCCESS"}}"#.to_string()), + ); + } + + #[test] + fn allows_owners_to_use_wildcard() { + let mut contract = Contract::default(); + contract.account_roles.push(AccountRole { + account_id: "bob.near".parse().unwrap(), + role: Role::Owner, + }); + + contract.register_indexer_function( + String::from("name"), + String::from("code"), + Some(0), + Some(String::from("schema")), + None, + Some(r#"{"indexer_rule_kind":"Action","matching_rule":{"rule":"ACTION_ANY","affected_account_id":"*","status":"SUCCESS"}}"#.to_string()), + ); + + assert_eq!(contract.registry.len(), 1); + } + #[test] fn users_can_remove_their_own_functions() { let account_id = "bob.near".parse::().unwrap(); @@ -1184,11 +1316,14 @@ mod tests { "test".to_string(), IndexerConfig { code: "var x= 1;".to_string(), - start_block_height: Some(43434343), - schema: None, - filter: Contract::near_social_indexer_rule(), + start_block: StartBlock::Latest, + schema: String::new(), + rule: Rule::ActionAny { + affected_account_id: String::from("social.near"), + status: Status::Success, + }, updated_at_block_height: None, - created_at_block_height: 0, + created_at_block_height: 100, }, ); let mut registry = IndexersByAccount::new(StorageKeys::Registry); @@ -1219,11 +1354,14 @@ mod tests { "test".to_string(), IndexerConfig { code: "var x= 1;".to_string(), - start_block_height: Some(43434343), - schema: None, - filter: Contract::near_social_indexer_rule(), + start_block: StartBlock::Latest, + schema: String::new(), + rule: Rule::ActionAny { + affected_account_id: String::from("social.near"), + status: Status::Success, + }, updated_at_block_height: None, - created_at_block_height: 0, + created_at_block_height: 100, }, ); let mut registry = IndexersByAccount::new(StorageKeys::Registry); @@ -1255,11 +1393,14 @@ mod tests { "test".to_string(), IndexerConfig { code: "var x= 1;".to_string(), - start_block_height: Some(43434343), - schema: None, - filter: Contract::near_social_indexer_rule(), + start_block: StartBlock::Latest, + schema: String::new(), + rule: Rule::ActionAny { + affected_account_id: String::from("social.near"), + status: Status::Success, + }, updated_at_block_height: None, - created_at_block_height: 0, + created_at_block_height: 100, }, ); let mut registry = IndexersByAccount::new(StorageKeys::Registry); @@ -1285,11 +1426,14 @@ mod tests { "test".to_string(), IndexerConfig { code: "var x= 1;".to_string(), - start_block_height: Some(43434343), - schema: None, - filter: Contract::near_social_indexer_rule(), + start_block: StartBlock::Latest, + schema: String::new(), + rule: Rule::ActionAny { + affected_account_id: String::from("social.near"), + status: Status::Success, + }, updated_at_block_height: None, - created_at_block_height: 0, + created_at_block_height: 100, }, ); let mut registry = IndexersByAccount::new(StorageKeys::Registry); @@ -1321,11 +1465,14 @@ mod tests { "test".to_string(), IndexerConfig { code: "var x= 1;".to_string(), - start_block_height: Some(43434343), - schema: None, - filter: Contract::near_social_indexer_rule(), + start_block: StartBlock::Latest, + schema: String::new(), + rule: Rule::ActionAny { + affected_account_id: String::from("social.near"), + status: Status::Success, + }, updated_at_block_height: None, - created_at_block_height: 0, + created_at_block_height: 100, }, ); let mut registry = IndexersByAccount::new(StorageKeys::Registry); @@ -1348,22 +1495,28 @@ mod tests { "test".to_string(), IndexerConfig { code: "var x= 1;".to_string(), - start_block_height: Some(43434343), - schema: None, - filter: Contract::near_social_indexer_rule(), + start_block: StartBlock::Latest, + schema: String::new(), + rule: Rule::ActionAny { + affected_account_id: String::from("social.near"), + status: Status::Success, + }, updated_at_block_height: None, - created_at_block_height: 0, + created_at_block_height: 100, }, ); account_indexers.insert( "test2".to_string(), IndexerConfig { - code: "var x= 2;".to_string(), - start_block_height: Some(43434343), - schema: None, - filter: Contract::near_social_indexer_rule(), + code: "var x= 1;".to_string(), + start_block: StartBlock::Latest, + schema: String::new(), + rule: Rule::ActionAny { + affected_account_id: String::from("social.near"), + status: Status::Success, + }, updated_at_block_height: None, - created_at_block_height: 0, + created_at_block_height: 100, }, ); let mut registry = IndexersByAccount::new(StorageKeys::Registry); @@ -1407,7 +1560,7 @@ mod tests { #[test] fn read_indexer_function() { - let config = IndexerConfig { + let config = OldIndexerConfig { code: "var x= 1;".to_string(), start_block_height: None, schema: None, @@ -1415,11 +1568,12 @@ mod tests { updated_at_block_height: None, created_at_block_height: 0, }; + let account_id = "bob.near".parse::().unwrap(); let mut account_indexers = IndexerConfigByFunctionName::new(StorageKeys::Account( env::sha256_array(account_id.as_bytes()), )); - account_indexers.insert("test".to_string(), config.clone()); + account_indexers.insert("test".to_string(), config.clone().into()); let mut registry = IndexersByAccount::new(StorageKeys::Registry); registry.insert(account_id, account_indexers); let contract = Contract { @@ -1435,7 +1589,7 @@ mod tests { #[test] fn read_indexer_function_from_other_account() { - let config = IndexerConfig { + let config = OldIndexerConfig { code: "var x= 1;".to_string(), start_block_height: None, schema: None, @@ -1447,7 +1601,7 @@ mod tests { let mut account_indexers = IndexerConfigByFunctionName::new(StorageKeys::Account( env::sha256_array(account_id.as_bytes()), )); - account_indexers.insert("test".to_string(), config.clone()); + account_indexers.insert("test".to_string(), config.clone().into()); let mut registry = IndexersByAccount::new(StorageKeys::Registry); registry.insert(account_id, account_indexers); let contract = Contract { @@ -1471,7 +1625,7 @@ mod tests { #[test] fn list_indexer_functions() { - let config = IndexerConfig { + let config = OldIndexerConfig { code: "var x= 1;".to_string(), start_block_height: Some(43434343), schema: None, @@ -1483,7 +1637,7 @@ mod tests { let mut account_indexers = IndexerConfigByFunctionName::new(StorageKeys::Account( env::sha256_array(account_id.as_bytes()), )); - account_indexers.insert("test".to_string(), config.clone()); + account_indexers.insert("test".to_string(), config.clone().into()); let mut registry = IndexersByAccount::new(StorageKeys::Registry); registry.insert(account_id, account_indexers); let contract = Contract { @@ -1493,7 +1647,7 @@ mod tests { assert_eq!( contract.list_indexer_functions(None), - AccountOrAllIndexers::All(HashMap::from([( + OldAccountOrAllIndexers::All(HashMap::from([( "bob.near".parse().unwrap(), HashMap::from([("test".to_string(), config)]) )])) @@ -1502,7 +1656,7 @@ mod tests { #[test] fn list_account_indexer_functions() { - let config = IndexerConfig { + let config = OldIndexerConfig { code: "var x= 1;".to_string(), start_block_height: Some(43434343), schema: None, @@ -1514,7 +1668,7 @@ mod tests { let mut account_indexers = IndexerConfigByFunctionName::new(StorageKeys::Account( env::sha256_array(account_id.as_bytes()), )); - account_indexers.insert("test".to_string(), config.clone()); + account_indexers.insert("test".to_string(), config.clone().into()); let mut registry = IndexersByAccount::new(StorageKeys::Registry); registry.insert(account_id, account_indexers); let contract = Contract { @@ -1524,7 +1678,7 @@ mod tests { assert_eq!( contract.list_indexer_functions(Some("bob.near".to_string())), - AccountOrAllIndexers::Account(HashMap::from([("test".to_string(), config)])) + OldAccountOrAllIndexers::Account(HashMap::from([("test".to_string(), config)])) ); } @@ -1541,7 +1695,7 @@ mod tests { #[test] fn list_other_account_indexer_functions() { - let config = IndexerConfig { + let config = OldIndexerConfig { code: "var x= 1;".to_string(), start_block_height: Some(43434343), schema: None, @@ -1553,7 +1707,7 @@ mod tests { let mut account_indexers = IndexerConfigByFunctionName::new(StorageKeys::Account( env::sha256_array(account_id.as_bytes()), )); - account_indexers.insert("test".to_string(), config.clone()); + account_indexers.insert("test".to_string(), config.clone().into()); let mut registry = IndexersByAccount::new(StorageKeys::Registry); registry.insert(account_id, account_indexers); let contract = Contract { @@ -1563,7 +1717,99 @@ mod tests { assert_eq!( contract.list_indexer_functions(Some("alice.near".to_string())), - AccountOrAllIndexers::Account(HashMap::from([("test".to_string(), config)])) + OldAccountOrAllIndexers::Account(HashMap::from([("test".to_string(), config)])) + ); + } + + #[test] + fn list_all_indexers() { + let mut contract = Contract::default(); + + contract.register( + String::from("test"), + String::from("code"), + String::from("schema"), + Rule::ActionAny { + affected_account_id: String::from("social.near"), + status: Status::Any, + }, + StartBlock::Latest, + ); + + assert_eq!( + contract.list_all(), + HashMap::from([( + "bob.near".parse::().unwrap(), + HashMap::from([( + String::from("test"), + IndexerConfig { + code: String::from("code"), + schema: String::from("schema"), + rule: Rule::ActionAny { + affected_account_id: String::from("social.near"), + status: Status::Any, + }, + start_block: StartBlock::Latest, + updated_at_block_height: None, + created_at_block_height: env::block_height(), + } + )]) + )]) + ); + } + + #[test] + fn list_empty_account_indexers() { + let mut contract = Contract::default(); + + contract.register( + String::from("test"), + String::from("code"), + String::from("schema"), + Rule::ActionAny { + affected_account_id: String::from("social.near"), + status: Status::Any, + }, + StartBlock::Latest, + ); + + assert_eq!( + contract.list_by_account("morgs.near".parse().unwrap()), + HashMap::new() + ); + } + + #[test] + fn list_account_indexers() { + let mut contract = Contract::default(); + + contract.register( + String::from("test"), + String::from("code"), + String::from("schema"), + Rule::ActionAny { + affected_account_id: String::from("social.near"), + status: Status::Any, + }, + StartBlock::Latest, + ); + + assert_eq!( + contract.list_by_account(env::signer_account_id()), + HashMap::from([( + String::from("test"), + IndexerConfig { + code: String::from("code"), + schema: String::from("schema"), + rule: Rule::ActionAny { + affected_account_id: String::from("social.near"), + status: Status::Any, + }, + start_block: StartBlock::Latest, + updated_at_block_height: None, + created_at_block_height: env::block_height(), + } + )]) ); } } diff --git a/registry/types/src/lib.rs b/registry/types/src/lib.rs index ddbd16bd8..ee9d1deaa 100644 --- a/registry/types/src/lib.rs +++ b/registry/types/src/lib.rs @@ -16,14 +16,6 @@ use serde::{Deserialize, Serialize}; type FunctionName = String; -#[derive(Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize, PartialEq, Eq)] -#[serde(rename_all = "SCREAMING_SNAKE_CASE")] -pub enum Status { - Any, - Success, - Fail, -} - #[derive(Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize, PartialEq, Eq)] #[serde(tag = "rule", rename_all = "SCREAMING_SNAKE_CASE")] pub enum MatchingRule { @@ -44,6 +36,40 @@ pub enum MatchingRule { }, } +impl From for MatchingRule { + fn from(value: Rule) -> Self { + match value { + Rule::ActionAny { + affected_account_id, + status, + } => MatchingRule::ActionAny { + affected_account_id, + status, + }, + Rule::Event { + contract_account_id, + standard, + version, + event, + } => MatchingRule::Event { + contract_account_id, + standard, + version, + event, + }, + Rule::ActionFunctionCall { + affected_account_id, + status, + function, + } => MatchingRule::ActionFunctionCall { + affected_account_id, + status, + function, + }, + } + } +} + #[derive(Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize, PartialEq, Eq)] pub enum IndexerRuleKind { Action, @@ -53,25 +79,160 @@ pub enum IndexerRuleKind { } #[derive(Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize, PartialEq, Eq)] -pub struct IndexerRule { +pub struct OldIndexerRule { pub indexer_rule_kind: IndexerRuleKind, pub matching_rule: MatchingRule, + // These are not set, and not used anywhere pub id: Option, pub name: Option, } #[derive(BorshSerialize, BorshDeserialize, Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] -pub struct IndexerConfig { +pub struct OldIndexerConfig { pub code: String, pub start_block_height: Option, pub schema: Option, - pub filter: IndexerRule, + pub filter: OldIndexerRule, pub updated_at_block_height: Option, pub created_at_block_height: u64, } +impl From for OldIndexerConfig { + fn from(config: IndexerConfig) -> Self { + let start_block_height = match config.start_block { + StartBlock::Latest => None, + StartBlock::Continue => None, + StartBlock::Height(height) => Some(height), + }; + + let schema = if config.schema.is_empty() { + None + } else { + Some(config.schema) + }; + + OldIndexerConfig { + start_block_height, + schema, + code: config.code, + filter: OldIndexerRule { + indexer_rule_kind: IndexerRuleKind::Action, + matching_rule: config.rule.into(), + id: None, + name: None, + }, + created_at_block_height: config.created_at_block_height, + updated_at_block_height: config.updated_at_block_height, + } + } +} + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] -pub enum AccountOrAllIndexers { - All(HashMap>), - Account(HashMap), +pub enum OldAccountOrAllIndexers { + All(HashMap>), + Account(HashMap), +} + +#[derive(Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize, PartialEq, Eq)] +#[serde(rename_all = "SCREAMING_SNAKE_CASE")] +pub enum Status { + Any, + Success, + Fail, +} + +#[derive(Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize, PartialEq, Eq)] +#[serde(tag = "kind", rename_all = "SCREAMING_SNAKE_CASE")] +pub enum Rule { + ActionAny { + affected_account_id: String, + status: Status, + }, + ActionFunctionCall { + affected_account_id: String, + status: Status, + function: String, + }, + Event { + contract_account_id: String, + standard: String, + version: String, + event: String, + }, } + +impl From for Rule { + fn from(value: MatchingRule) -> Self { + match value { + MatchingRule::ActionAny { + affected_account_id, + status, + } => Rule::ActionAny { + affected_account_id, + status, + }, + MatchingRule::Event { + contract_account_id, + standard, + version, + event, + } => Rule::Event { + contract_account_id, + standard, + version, + event, + }, + MatchingRule::ActionFunctionCall { + affected_account_id, + status, + function, + } => Rule::ActionFunctionCall { + affected_account_id, + status, + function, + }, + } + } +} + +#[derive(BorshSerialize, BorshDeserialize, Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] +#[serde(rename_all = "SCREAMING_SNAKE_CASE")] +pub enum StartBlock { + /// Specifies the particular block height from which to start indexing from. + Height(u64), + /// Starts indexing from the most recently finalized block. + Latest, + /// Resumes indexing from the block immediately following the last one successfully indexed + /// prior to update. + Continue, +} + +#[derive(BorshSerialize, BorshDeserialize, Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] +pub struct IndexerConfig { + pub code: String, + pub start_block: StartBlock, + pub schema: String, + pub rule: Rule, + pub updated_at_block_height: Option, + pub created_at_block_height: u64, +} + +impl From for IndexerConfig { + fn from(config: OldIndexerConfig) -> Self { + Self { + start_block: match config.start_block_height { + Some(height) => StartBlock::Height(height), + None => StartBlock::Latest, + }, + schema: config.schema.unwrap_or(String::new()), + code: config.code, + rule: config.filter.matching_rule.into(), + created_at_block_height: config.created_at_block_height, + updated_at_block_height: config.updated_at_block_height, + } + } +} + +pub type AccountIndexers = HashMap; + +pub type AllIndexers = HashMap; diff --git a/runner/src/metrics.ts b/runner/src/metrics.ts index 9224c374b..89bd8e47e 100644 --- a/runner/src/metrics.ts +++ b/runner/src/metrics.ts @@ -1,6 +1,24 @@ import express from 'express'; import { Gauge, Histogram, Counter, AggregatorRegistry } from 'prom-client'; +const HEAP_TOTAL_ALLOCATION = new Gauge({ + name: 'queryapi_runner_heap_total_allocation_megabytes', + help: 'Size of heap allocation for indexer function', + labelNames: ['indexer', 'type'], +}); + +const HEAP_USED = new Gauge({ + name: 'queryapi_runner_heap_used_megabytes', + help: 'Size of used heap space for indexer function', + labelNames: ['indexer', 'type'], +}); + +const PREFETCH_QUEUE_COUNT = new Gauge({ + name: 'queryapi_runner_prefetch_queue_count', + help: 'Count of items in prefetch queue for indexer function', + labelNames: ['indexer', 'type'], +}); + const BLOCK_WAIT_DURATION = new Histogram({ name: 'queryapi_runner_block_wait_duration_milliseconds', help: 'Time an indexer function waited for a block before processing', @@ -37,6 +55,9 @@ const EXECUTION_DURATION = new Histogram({ }); export const METRICS = { + HEAP_TOTAL_ALLOCATION, + HEAP_USED, + PREFETCH_QUEUE_COUNT, BLOCK_WAIT_DURATION, CACHE_HIT, CACHE_MISS, @@ -46,10 +67,14 @@ export const METRICS = { }; const aggregatorRegistry = new AggregatorRegistry(); -const workerMetrics: Record = {}; +const workerMetrics = new Map(); export const registerWorkerMetrics = (workerId: number, metrics: string): void => { - workerMetrics[workerId] = metrics; + workerMetrics.set(workerId, metrics); +}; + +export const deregisterWorkerMetrics = (workerId: number): void => { + workerMetrics.delete(workerId); }; export const startServer = async (): Promise => { @@ -60,7 +85,7 @@ export const startServer = async (): Promise => { app.get('/metrics', async (_req, res) => { res.set('Content-Type', aggregatorRegistry.contentType); - const metrics = await AggregatorRegistry.aggregate(Object.values(workerMetrics)).metrics(); + const metrics = await AggregatorRegistry.aggregate(Array.from(workerMetrics.values())).metrics(); res.send(metrics); }); diff --git a/runner/src/server/runner-service.ts b/runner/src/server/runner-service.ts index 96d295e56..2118591a9 100644 --- a/runner/src/server/runner-service.ts +++ b/runner/src/server/runner-service.ts @@ -107,7 +107,8 @@ function getRunnerService (executors: Map, StreamHandlerT schema: '', }; context = { - status: Status.RUNNING + status: Status.RUNNING, + block_height: context.block_height, }; } response.push({ diff --git a/runner/src/stream-handler/stream-handler.ts b/runner/src/stream-handler/stream-handler.ts index 490be6559..3ef5991c9 100644 --- a/runner/src/stream-handler/stream-handler.ts +++ b/runner/src/stream-handler/stream-handler.ts @@ -1,7 +1,7 @@ import path from 'path'; import { Worker, isMainThread } from 'worker_threads'; -import { registerWorkerMetrics } from '../metrics'; +import { registerWorkerMetrics, deregisterWorkerMetrics } from '../metrics'; import Indexer from '../indexer'; export enum Status { @@ -17,8 +17,19 @@ export interface IndexerConfig { version: number } +export enum WorkerMessageType { + METRICS = 'METRICS', + BLOCK_HEIGHT = 'BLOCK_HEIGHT', +} + +export interface WorkerMessage { + type: WorkerMessageType + data: any +} + interface ExecutorContext { status: Status + block_height: number } export default class StreamHandler { @@ -38,6 +49,7 @@ export default class StreamHandler { }); this.executorContext = { status: Status.RUNNING, + block_height: indexerConfig?.version ?? 0, }; this.worker.on('message', this.handleMessage.bind(this)); @@ -48,6 +60,8 @@ export default class StreamHandler { } async stop (): Promise { + deregisterWorkerMetrics(this.worker.threadId); + await this.worker.terminate(); } @@ -61,12 +75,22 @@ export default class StreamHandler { indexer.setStatus(functionName, 0, Status.STOPPED).catch((e) => { console.log(`Failed to set status STOPPED for stream: ${this.streamKey}`, e); }); + indexer.writeLog(functionName, this.executorContext.block_height, `Encountered error processing stream: ${this.streamKey}, terminating thread\n${error.toString()}`).catch((e) => { + console.log(`Failed to write log for stream: ${this.streamKey}`, e); + }); this.worker.terminate().catch(() => { console.log(`Failed to terminate thread for stream: ${this.streamKey}`); }); } - private handleMessage (message: string): void { - registerWorkerMetrics(this.worker.threadId, message); + private handleMessage (message: WorkerMessage): void { + switch (message.type) { + case WorkerMessageType.BLOCK_HEIGHT: + this.executorContext.block_height = message.data; + break; + case WorkerMessageType.METRICS: + registerWorkerMetrics(this.worker.threadId, message.data); + break; + } } } diff --git a/runner/src/stream-handler/worker.ts b/runner/src/stream-handler/worker.ts index 311657112..ce73e646b 100644 --- a/runner/src/stream-handler/worker.ts +++ b/runner/src/stream-handler/worker.ts @@ -6,7 +6,7 @@ import RedisClient, { type StreamType } from '../redis-client'; import { METRICS } from '../metrics'; import type { Block } from '@near-lake/primitives'; import LakeClient from '../lake-client'; -import { type IndexerConfig } from './stream-handler'; +import { WorkerMessageType, type IndexerConfig, type WorkerMessage } from './stream-handler'; if (isMainThread) { throw new Error('Worker should not be run on main thread'); @@ -51,33 +51,33 @@ async function handleStream (workerContext: WorkerContext, streamKey: string): P void blockQueueConsumer(workerContext, streamKey); } -function incrementId (id: string): string { - const [main, sequence] = id.split('-'); - return `${main}-${Number(sequence) + 1}`; -} - async function blockQueueProducer (workerContext: WorkerContext, streamKey: string): Promise { - const HISTORICAL_BATCH_SIZE = parseInt(process.env.BATCH_SIZE ?? '10'); + const HISTORICAL_BATCH_SIZE = parseInt(process.env.PREFETCH_QUEUE_LIMIT ?? '10'); let streamMessageStartId = '0'; while (true) { const preFetchCount = HISTORICAL_BATCH_SIZE - workerContext.queue.length; - if (preFetchCount <= 0) { - await sleep(100); - continue; - } - const messages = await workerContext.redisClient.getStreamMessages(streamKey, streamMessageStartId, preFetchCount); - if (messages == null) { - await sleep(100); - continue; - } + try { + if (preFetchCount <= 0) { + await sleep(100); + continue; + } + const messages = await workerContext.redisClient.getStreamMessages(streamKey, streamMessageStartId, preFetchCount); + if (messages == null) { + await sleep(100); + continue; + } - for (const streamMessage of messages) { - const { id, message } = streamMessage; - workerContext.queue.push(generateQueueMessage(workerContext, Number(message.block_height), id)); - } + for (const streamMessage of messages) { + const { id, message } = streamMessage; + workerContext.queue.push(generateQueueMessage(workerContext, Number(message.block_height), id)); + } - streamMessageStartId = incrementId(messages[messages.length - 1].id); + streamMessageStartId = messages[messages.length - 1].id; + } catch (err) { + console.error('Error fetching stream messages', err); + await sleep(500); + } } } @@ -85,7 +85,7 @@ async function blockQueueConsumer (workerContext: WorkerContext, streamKey: stri const indexer = new Indexer(); const isHistorical = workerContext.streamType === 'historical'; let streamMessageId = ''; - let indexerName = ''; + let indexerName = streamKey.split(':')[0]; let currBlockHeight = 0; while (true) { @@ -114,6 +114,8 @@ async function blockQueueConsumer (workerContext: WorkerContext, streamKey: stri } const block = queueMessage.block; currBlockHeight = block.blockHeight; + const blockHeightMessage: WorkerMessage = { type: WorkerMessageType.BLOCK_HEIGHT, data: currBlockHeight }; + parentPort?.postMessage(blockHeightMessage); streamMessageId = queueMessage.streamMessageId; if (block === undefined || block.blockHeight == null) { @@ -136,8 +138,13 @@ async function blockQueueConsumer (workerContext: WorkerContext, streamKey: stri } finally { const unprocessedMessageCount = await workerContext.redisClient.getUnprocessedStreamMessageCount(streamKey); METRICS.UNPROCESSED_STREAM_MESSAGES.labels({ indexer: indexerName, type: workerContext.streamType }).set(unprocessedMessageCount); + const memoryUsage = process.memoryUsage(); + METRICS.HEAP_TOTAL_ALLOCATION.labels({ indexer: indexerName, type: workerContext.streamType }).set(memoryUsage.heapTotal / (1024 * 1024)); + METRICS.HEAP_USED.labels({ indexer: indexerName, type: workerContext.streamType }).set(memoryUsage.heapUsed / (1024 * 1024)); + METRICS.PREFETCH_QUEUE_COUNT.labels({ indexer: indexerName, type: workerContext.streamType }).set(workerContext.queue.length); - parentPort?.postMessage(await promClient.register.getMetricsAsJSON()); + const metricsMessage: WorkerMessage = { type: WorkerMessageType.METRICS, data: await promClient.register.getMetricsAsJSON() }; + parentPort?.postMessage(metricsMessage); } } }