From 9ef1f298e7fbae89f8d47544b799760c26fdd524 Mon Sep 17 00:00:00 2001
From: Morgan McCauley
Date: Wed, 7 Feb 2024 08:07:12 +1300
Subject: [PATCH] feat: Auto migrate indexers to Control Plane (#527)

This PR builds on the Redis `allowlist` to automatically migrate indexers from the existing infrastructure to the Control Plane. An account migration requires coordination between the V1 and V2 architectures: the indexer must be removed/ignored from V1, and then correctly configured within V2.

## Allowlist shape

Each `allowlist` entry now contains the following properties:

```rs
pub struct AllowlistEntry {
    account_id: AccountId, // The account which should be migrated
    v1_ack: bool,          // True if Coordinator V1 has acknowledged the entry
    migrated: bool,        // True if the migration was successful
    failed: bool,          // True if the migration failed
}
```

## Coordinator V1

For Coordinator V1 the `allowlist` is really a denylist, so the code/types on that side have been named as such. Accounts within the "denylist" should be ignored completely by V1. Because we cannot guarantee exactly when this "ignore" takes effect, V1 sets a flag (`v1_ack`) once it has done so; V2 will only take over after this flag has been set.

Accounts within the "denylist" are filtered out of the in-memory registry, and any new indexer "registrations" for those accounts are also ignored. In-progress historical backfills haven't been considered, as we'll be disabling that functionality anyway.

## Coordinator V2

Once acknowledged by V1, Coordinator V2 will attempt to migrate all functions under the relevant account. The steps for migration are:

1. Remove the streams from the Redis `streams` set, preventing Runner from starting these indexers implicitly
2. Stop the existing V1 executors which have already been started via the `streams` set
3. Merge the existing historical (if it exists) and real-time streams

Once migrated, accounts with `v1_ack && migrated && !failed` will be exposed to the control loop, prompting V2 to act on these indexers.

### `migrated` flag

For now, the `migrated` flag will not be set on success, preventing V2 from running the indexer on the new architecture. There are still some issues around V2 continuing from the correct block, so it's best not to run these indexers yet. This lets us test the migration in isolation, without worrying about what V2 does afterwards. I'll add this logic back in once https://github.com/near/queryapi/issues/536 is complete.

### `failed` flag

If any part of the migration fails, the `failed` flag will be set for that account. Covering every edge case in code would take a significant amount of time, so it's faster to set this flag to ignore the account, fix the migration manually, and then reset the `failed` flag.
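### Example: allowlist entry states

For illustration only (not part of this patch), here is a minimal, self-contained sketch of the JSON stored under the Redis `allowlist` key and the eligibility predicate described above. `account_id` is simplified to a `String` so the snippet stands alone, and `is_eligible` is a hypothetical helper mirroring the `v1_ack && migrated && !failed` check:

```rs
use serde::{Deserialize, Serialize};

// Mirrors the `AllowlistEntry` struct above; `account_id` is a String here
// (instead of `near_primitives::types::AccountId`) to keep the sketch standalone.
#[derive(Serialize, Deserialize, Debug)]
struct AllowlistEntry {
    account_id: String,
    v1_ack: bool,
    migrated: bool,
    failed: bool,
}

// Hypothetical helper: the predicate V2 applies before exposing an account
// to the control loop.
fn is_eligible(entry: &AllowlistEntry) -> bool {
    entry.v1_ack && entry.migrated && !entry.failed
}

fn main() -> serde_json::Result<()> {
    // The value stored under the `allowlist` key is a JSON array of entries.
    let raw = r#"[
        {"account_id": "morgs.near", "v1_ack": true, "migrated": false, "failed": false}
    ]"#;

    let allowlist: Vec<AllowlistEntry> = serde_json::from_str(raw)?;

    // Acked by V1 but not yet migrated: V2 will attempt the migration
    // rather than run the indexer.
    assert!(!is_eligible(&allowlist[0]));

    Ok(())
}
```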
---
 coordinator/src/main.rs                  |  43 +-
 coordinator/src/migration.rs             | 475 ++++++++++++++++++
 coordinator/src/redis.rs                 | 113 ++++-
 coordinator/src/registry.rs              |   8 +
 .../src/indexer_registry.rs              |  24 +-
 indexer/queryapi_coordinator/src/main.rs | 106 +++-
 indexer/storage/src/lib.rs               |  28 +-
 7 files changed, 754 insertions(+), 43 deletions(-)
 create mode 100644 coordinator/src/migration.rs

diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs
index 7355b03e4..25dc9fc49 100644
--- a/coordinator/src/main.rs
+++ b/coordinator/src/main.rs
@@ -1,6 +1,5 @@
 use std::time::Duration;
 
-use anyhow::Context;
 use near_primitives::types::AccountId;
 use tokio::time::sleep;
 use tracing_subscriber::prelude::*;
@@ -12,6 +11,7 @@ use crate::registry::{IndexerRegistry, Registry};
 
 mod block_streams_handler;
 mod executors_handler;
+mod migration;
 mod redis;
 mod registry;
 mod utils;
@@ -53,8 +53,18 @@ async fn main() -> anyhow::Result<()> {
     loop {
         let indexer_registry = registry.fetch().await?;
 
+        let allowlist = migration::fetch_allowlist(&redis_client).await?;
+
+        migration::migrate_pending_accounts(
+            &indexer_registry,
+            &allowlist,
+            &redis_client,
+            &executors_handler,
+        )
+        .await?;
+
         let indexer_registry =
-            filter_registry_by_allowlist(indexer_registry, &redis_client).await?;
+            migration::filter_registry_by_allowlist(indexer_registry, &allowlist).await?;
 
         tokio::try_join!(
             synchronise_executors(&indexer_registry, &executors_handler),
@@ -67,35 +77,6 @@
     }
 }
 
-#[derive(serde::Deserialize, Debug)]
-struct AllowListEntry {
-    account_id: AccountId,
-}
-
-type AllowList = Vec<AllowListEntry>;
-
-async fn filter_registry_by_allowlist(
-    indexer_registry: IndexerRegistry,
-    redis_client: &RedisClient,
-) -> anyhow::Result<IndexerRegistry> {
-    let raw_allowlist: String = redis_client.get(String::from("allowlist")).await?;
-    let allowlist: AllowList =
-        serde_json::from_str(&raw_allowlist).context("Failed to parse allowlist")?;
-
-    let filtered_registry = indexer_registry
-        .into_iter()
-        .filter(|(account_id, _)| {
-            allowlist
-                .iter()
-                .any(|entry| entry.account_id == *account_id)
-        })
-        .collect();
-
-    tracing::debug!("Using filtered registry: {:#?}", filtered_registry);
-
-    Ok(filtered_registry)
-}
-
 async fn synchronise_executors(
     indexer_registry: &IndexerRegistry,
     executors_handler: &ExecutorsHandler,
diff --git a/coordinator/src/migration.rs b/coordinator/src/migration.rs
new file mode 100644
index 000000000..d45763f02
--- /dev/null
+++ b/coordinator/src/migration.rs
@@ -0,0 +1,475 @@
+use std::collections::HashMap;
+
+use anyhow::Context;
+use near_primitives::types::AccountId;
+use redis::{ErrorKind, RedisError};
+
+use crate::executors_handler::ExecutorsHandler;
+use crate::redis::RedisClient;
+use crate::registry::{IndexerConfig, IndexerRegistry};
+
+#[derive(serde::Deserialize, serde::Serialize, Debug)]
+pub struct AllowlistEntry {
+    account_id: AccountId,
+    v1_ack: bool,
+    migrated: bool,
+    failed: bool,
+}
+
+pub type Allowlist = Vec<AllowlistEntry>;
+
+pub async fn fetch_allowlist(redis_client: &RedisClient) -> anyhow::Result<Allowlist> {
+    let raw_allowlist: String = redis_client.get(RedisClient::ALLOWLIST).await?;
+    serde_json::from_str(&raw_allowlist).context("Failed to parse allowlist")
+}
+
+pub async fn filter_registry_by_allowlist(
+    indexer_registry: IndexerRegistry,
+    allowlist: &Allowlist,
+) -> anyhow::Result<IndexerRegistry> {
+    let filtered_registry: IndexerRegistry = indexer_registry
+        .into_iter()
+        .filter(|(account_id, _)| {
+            allowlist.iter().any(|entry| {
+                entry.account_id == *account_id && entry.v1_ack && entry.migrated && !entry.failed
+            })
+        })
+        .collect();
+
+    tracing::debug!(
+        "Accounts in filtered registry: {:#?}",
+        filtered_registry.keys()
+    );
+
+    Ok(filtered_registry)
+}
+
+pub async fn migrate_pending_accounts(
+    indexer_registry: &IndexerRegistry,
+    allowlist: &Allowlist,
+    redis_client: &RedisClient,
+    executors_handler: &ExecutorsHandler,
+) -> anyhow::Result<()> {
+    for entry in allowlist
+        .iter()
+        .filter(|entry| !entry.migrated && entry.v1_ack && !entry.failed)
+    {
+        let indexers = indexer_registry.get(&entry.account_id);
+
+        if indexers.is_none() {
+            tracing::warn!(
+                "Allowlist entry for account {} not in registry",
+                entry.account_id
+            );
+
+            continue;
+        }
+
+        let _ = migrate_account(
+            redis_client,
+            executors_handler,
+            &entry.account_id,
+            indexers.unwrap(),
+        )
+        .await
+        .or_else(|err| {
+            tracing::error!("Failed to migrate {}: {:?}", entry.account_id, err);
+
+            set_failed_flag(redis_client, &entry.account_id)
+        });
+    }
+
+    Ok(())
+}
+
+async fn migrate_account(
+    redis_client: &RedisClient,
+    executors_handler: &ExecutorsHandler,
+    account_id: &AccountId,
+    indexers: &HashMap<String, IndexerConfig>,
+) -> anyhow::Result<()> {
+    tracing::info!("Migrating account {}", account_id);
+
+    for (_, indexer_config) in indexers.iter() {
+        tracing::info!("Migrating {}", indexer_config.get_full_name());
+
+        let existing_streams = remove_from_streams_set(redis_client, indexer_config)
+            .await
+            .context("Failed to remove from streams set")?;
+        stop_v1_executors(executors_handler, &existing_streams)
+            .await
+            .context("Failed to stop executors")?;
+        merge_streams(redis_client, &existing_streams, indexer_config)
+            .await
+            .context("Failed to merge streams")?;
+    }
+
+    // TODO Uncomment when V2 correctly continues from V1 stop point
+    // set_migrated_flag(redis_client, account_id)?;
+
+    tracing::info!("Finished migrating {}", account_id);
+
+    Ok(())
+}
+
+async fn remove_from_streams_set(
+    redis_client: &RedisClient,
+    indexer_config: &IndexerConfig,
+) -> anyhow::Result<Vec<String>> {
+    let mut result = vec![];
+
+    if redis_client
+        .srem(
+            RedisClient::STREAMS_SET,
+            indexer_config.get_historical_redis_stream(),
+        )
+        .await?
+        .is_some()
+    {
+        result.push(indexer_config.get_historical_redis_stream());
+    }
+
+    if redis_client
+        .srem(
+            RedisClient::STREAMS_SET,
+            indexer_config.get_real_time_redis_stream(),
+        )
+        .await?
+        .is_some()
+    {
+        result.push(indexer_config.get_real_time_redis_stream());
+    };
+
+    Ok(result)
+}
+
+async fn stop_v1_executors(
+    executors_handler: &ExecutorsHandler,
+    existing_streams: &Vec<String>,
+) -> anyhow::Result<()> {
+    for stream in existing_streams {
+        executors_handler.stop(stream.to_owned()).await?;
+    }
+
+    Ok(())
+}
+
+async fn merge_streams(
+    redis_client: &RedisClient,
+    existing_streams: &Vec<String>,
+    indexer_config: &IndexerConfig,
+) -> anyhow::Result<()> {
+    match existing_streams.len() {
+        0 => Ok(()),
+        1 => {
+            redis_client
+                .rename(
+                    existing_streams[0].to_owned(),
+                    indexer_config.get_redis_stream(),
+                )
+                .await?;
+
+            Ok(())
+        }
+        2 => {
+            let historical_stream = existing_streams[0].to_owned();
+            let real_time_stream = existing_streams[1].to_owned();
+
+            redis_client
+                .rename(historical_stream, indexer_config.get_redis_stream())
+                .await?;
+
+            loop {
+                let stream_ids = redis_client.xread(real_time_stream.clone(), 0, 100).await?;
+
+                if stream_ids.is_empty() {
+                    break;
+                }
+
+                for stream_id in stream_ids {
+                    let fields: Vec<(_, _)> = stream_id
+                        .map
+                        .into_iter()
+                        .filter_map(|field| {
+                            if let ::redis::Value::Data(data) = field.1 {
+                                return Some((field.0, String::from_utf8(data).unwrap()));
+                            }
+
+                            tracing::warn!("Ignoring unexpected value in stream: {:?}", field.1);
+
+                            None
+                        })
+                        .collect();
+
+                    redis_client
+                        .xadd(indexer_config.get_redis_stream(), &fields)
+                        .await?;
+                    redis_client
+                        .xdel(indexer_config.get_real_time_redis_stream(), stream_id.id)
+                        .await?
+                }
+            }
+
+            Ok(())
+        }
+        _ => anyhow::bail!("Unexpected number of pre-existing streams"),
+    }
+}
+
+fn set_failed_flag(redis_client: &RedisClient, account_id: &AccountId) -> anyhow::Result<()> {
+    let account_id = account_id.to_owned();
+
+    redis_client.atomic_update(RedisClient::ALLOWLIST, move |raw_allowlist: String| {
+        let mut allowlist: Allowlist = serde_json::from_str(&raw_allowlist).map_err(|_| {
+            RedisError::from((ErrorKind::TypeError, "failed to deserialize allowlist"))
+        })?;
+
+        let entry = allowlist
+            .iter_mut()
+            .find(|entry| entry.account_id == account_id)
+            .unwrap();
+
+        entry.failed = true;
+
+        serde_json::to_string(&allowlist)
+            .map_err(|_| RedisError::from((ErrorKind::TypeError, "failed to serialize allowlist")))
+    })?;
+
+    Ok(())
+}
+
+fn set_migrated_flag(redis_client: &RedisClient, account_id: &AccountId) -> anyhow::Result<()> {
+    let account_id = account_id.to_owned();
+
+    redis_client.atomic_update(RedisClient::ALLOWLIST, move |raw_allowlist: String| {
+        let mut allowlist: Allowlist = serde_json::from_str(&raw_allowlist).map_err(|_| {
+            RedisError::from((ErrorKind::TypeError, "failed to deserialize allowlist"))
+        })?;
+
+        let entry = allowlist
+            .iter_mut()
+            .find(|entry| entry.account_id == account_id)
+            .unwrap();
+
+        entry.migrated = true;
+
+        serde_json::to_string(&allowlist)
+            .map_err(|_| RedisError::from((ErrorKind::TypeError, "failed to serialize allowlist")))
+    })?;
+
+    Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use std::collections::HashMap;
+
+    use mockall::predicate;
+    use registry_types::{IndexerRule, IndexerRuleKind, MatchingRule, Status};
+
+    use crate::registry::IndexerConfig;
+
+    #[tokio::test]
+    async fn ignores_migrated_indexers() {
+        let indexer_registry = HashMap::from([(
+            "morgs.near".parse().unwrap(),
+            HashMap::from([(
+                "test".to_string(),
+                IndexerConfig {
+                    account_id: "morgs.near".parse().unwrap(),
+                    function_name: "test".to_string(),
+                    code: String::new(),
+                    schema: Some(String::new()),
+                    filter: IndexerRule {
+                        id: None,
+                        name: None,
+                        indexer_rule_kind: IndexerRuleKind::Action,
+                        matching_rule: MatchingRule::ActionAny {
+                            affected_account_id: "queryapi.dataplatform.near".to_string(),
+                            status: Status::Any,
+                        },
+                    },
+                    created_at_block_height: 101,
+                    updated_at_block_height: Some(200),
+                    start_block_height: Some(1000),
+                },
+            )]),
+        )]);
+
+        let allowlist = vec![AllowlistEntry {
+            account_id: "morgs.near".parse().unwrap(),
+            v1_ack: true,
+            migrated: true,
+            failed: false,
+        }];
+
+        let redis_client = RedisClient::default();
+        let executors_handler = ExecutorsHandler::default();
+
+        migrate_pending_accounts(
+            &indexer_registry,
+            &allowlist,
+            &redis_client,
+            &executors_handler,
+        )
+        .await
+        .unwrap();
+    }
+
+    #[tokio::test]
+    async fn ignores_indexers_not_in_registry() {
+        let indexer_registry = HashMap::from([]);
+
+        let allowlist = vec![AllowlistEntry {
+            account_id: "morgs.near".parse().unwrap(),
+            v1_ack: true,
+            migrated: false,
+            failed: false,
+        }];
+
+        let redis_client = RedisClient::default();
+        let executors_handler = ExecutorsHandler::default();
+
+        migrate_pending_accounts(
+            &indexer_registry,
+            &allowlist,
+            &redis_client,
+            &executors_handler,
+        )
+        .await
+        .unwrap();
+    }
+
+    #[tokio::test]
+    async fn migrates_indexers_to_control_plane() {
+        let indexer_registry = HashMap::from([(
+            "morgs.near".parse().unwrap(),
+            HashMap::from([(
+                "test".to_string(),
+                IndexerConfig {
+                    account_id: "morgs.near".parse().unwrap(),
+                    function_name: "test".to_string(),
+                    code: String::new(),
+                    schema: Some(String::new()),
+                    filter: IndexerRule {
+                        id: None,
+                        name: None,
+                        indexer_rule_kind: IndexerRuleKind::Action,
+                        matching_rule: MatchingRule::ActionAny {
+                            affected_account_id: "queryapi.dataplatform.near".to_string(),
+                            status: Status::Any,
+                        },
+                    },
+                    created_at_block_height: 101,
+                    updated_at_block_height: Some(200),
+                    start_block_height: Some(1000),
+                },
+            )]),
+        )]);
+
+        let allowlist = vec![AllowlistEntry {
+            account_id: "morgs.near".parse().unwrap(),
+            v1_ack: true,
+            migrated: false,
+            failed: false,
+        }];
+
+        let mut redis_client = RedisClient::default();
+        redis_client
+            .expect_srem::<&str, String>()
+            .with(
+                predicate::eq("streams"),
+                predicate::eq(String::from("morgs.near/test:historical:stream")),
+            )
+            .returning(|_, _| Ok(Some(())))
+            .once();
+        redis_client
+            .expect_srem::<&str, String>()
+            .with(
+                predicate::eq("streams"),
+                predicate::eq(String::from("morgs.near/test:real_time:stream")),
+            )
+            .returning(|_, _| Ok(Some(())))
+            .once();
+        redis_client
+            .expect_rename::<String, String>()
+            .with(
+                predicate::eq(String::from("morgs.near/test:historical:stream")),
+                predicate::eq(String::from("morgs.near/test:block_stream")),
+            )
+            .returning(|_, _| Ok(()))
+            .once();
+        redis_client
+            .expect_xread::<String, i32>()
+            .with(
+                predicate::eq(String::from("morgs.near/test:real_time:stream")),
+                predicate::eq(0),
+                predicate::eq(100),
+            )
+            .returning(|_, _, _| {
+                Ok(vec![::redis::streams::StreamId {
+                    id: String::from("1-0"),
+                    map: HashMap::from([(
+                        String::from("block_height"),
+                        ::redis::Value::Data(b"123".to_vec()),
+                    )]),
+                }])
+            })
+            .once();
+        redis_client
+            .expect_xread::<String, i32>()
+            .with(
+                predicate::eq(String::from("morgs.near/test:real_time:stream")),
+                predicate::eq(0),
+                predicate::eq(100),
+            )
+            .returning(|_, _, _| Ok(vec![]))
+            .once();
+        redis_client
+            .expect_xadd::<String, String>()
+            .with(
+                predicate::eq(String::from("morgs.near/test:block_stream")),
+                predicate::eq([(String::from("block_height"), String::from("123"))]),
+            )
+            .returning(|_, _| Ok(()))
+            .once();
+        redis_client
+            .expect_xdel::<String, String>()
+            .with(
+                predicate::eq(String::from("morgs.near/test:real_time:stream")),
+                predicate::eq(String::from("1-0")),
+            )
+            .returning(|_, _| Ok(()))
+            .once();
+        redis_client
+            .expect_atomic_update::<&str, String, String>()
+            .returning(|_, _| Ok(()));
+
+        let mut executors_handler = ExecutorsHandler::default();
+        executors_handler
+            .expect_stop()
+            .with(predicate::eq(String::from(
+                "morgs.near/test:historical:stream",
+            )))
+            .returning(|_| Ok(()))
+            .once();
+        executors_handler
+            .expect_stop()
+            .with(predicate::eq(String::from(
+                "morgs.near/test:real_time:stream",
+            )))
+            .returning(|_| Ok(()))
+            .once();
+
+        migrate_pending_accounts(
+            &indexer_registry,
+            &allowlist,
+            &redis_client,
+            &executors_handler,
+        )
+        .await
+        .unwrap();
+    }
+}
diff --git a/coordinator/src/redis.rs b/coordinator/src/redis.rs
index f0a2826eb..c2df6d826 100644
--- a/coordinator/src/redis.rs
+++ b/coordinator/src/redis.rs
@@ -3,7 +3,9 @@
 use std::fmt::Debug;
 
 use anyhow::Context;
-use redis::{aio::ConnectionManager, FromRedisValue, ToRedisArgs};
+use redis::{
+    aio::ConnectionManager, streams, AsyncCommands, FromRedisValue, RedisResult, ToRedisArgs,
+};
 
 #[cfg(test)]
 pub use MockRedisClientImpl as RedisClient;
@@ -12,17 +14,24 @@ pub use RedisClientImpl as RedisClient;
 
 pub struct RedisClientImpl {
     connection: ConnectionManager,
+    url: String,
 }
 
 #[cfg_attr(test, mockall::automock)]
 impl RedisClientImpl {
+    pub const STREAMS_SET: &str = "streams";
+    pub const ALLOWLIST: &str = "allowlist";
+
     pub async fn connect(redis_url: &str) -> anyhow::Result<Self> {
         let connection = redis::Client::open(redis_url)?
             .get_connection_manager()
             .await
             .context("Unable to connect to Redis")?;
 
-        Ok(Self { connection })
+        Ok(Self {
+            connection,
+            url: redis_url.to_string(),
+        })
     }
 
     pub async fn get<T, U>(&self, key: T) -> anyhow::Result<U>
@@ -40,4 +49,104 @@ impl RedisClientImpl {
 
         Ok(value)
     }
+
+    pub async fn rename<K, V>(&self, old_key: K, new_key: V) -> anyhow::Result<()>
+    where
+        K: ToRedisArgs + Debug + Send + Sync + 'static,
+        V: ToRedisArgs + Debug + Send + Sync + 'static,
+    {
+        tracing::debug!("RENAME: {:?} -> {:?}", old_key, new_key);
+
+        self.connection.clone().rename(old_key, new_key).await?;
+
+        Ok(())
+    }
+
+    pub async fn srem<T, U>(&self, key: T, value: U) -> anyhow::Result<Option<()>>
+    where
+        T: ToRedisArgs + Debug + Send + Sync + 'static,
+        U: ToRedisArgs + Debug + Send + Sync + 'static,
+    {
+        tracing::debug!("SREM: {:?}={:?}", key, value);
+
+        match self.connection.clone().srem(key, value).await {
+            Ok(1) => Ok(Some(())),
+            Ok(_) => Ok(None),
+            Err(e) => Err(anyhow::format_err!(e)),
+        }
+    }
+
+    pub async fn xread<K, V>(
+        &self,
+        key: K,
+        start_id: V,
+        count: usize,
+    ) -> anyhow::Result<Vec<streams::StreamId>>
+    where
+        K: ToRedisArgs + Debug + Send + Sync + 'static,
+        V: ToRedisArgs + Debug + Send + Sync + 'static,
+    {
+        tracing::debug!("XREAD: {:?} {:?} {:?}", key, start_id, count);
+
+        let mut results: streams::StreamReadReply = self
+            .connection
+            .clone()
+            .xread_options(
+                &[key],
+                &[start_id],
+                &streams::StreamReadOptions::default().count(count),
+            )
+            .await?;
+
+        if results.keys.is_empty() {
+            return Ok([].to_vec());
+        }
+
+        Ok(results.keys.remove(0).ids)
+    }
+
+    pub async fn xadd<K, U>(&self, key: K, fields: &[(String, U)]) -> anyhow::Result<()>
+    where
+        K: ToRedisArgs + Debug + Send + Sync + 'static,
+        U: ToRedisArgs + Debug + Send + Sync + 'static,
+    {
+        tracing::debug!("XADD: {:?} {:?} {:?}", key, "*", fields);
+
+        self.connection.clone().xadd(key, "*", fields).await?;
+
+        Ok(())
+    }
+
+    pub async fn xdel<K, I>(&self, key: K, id: I) -> anyhow::Result<()>
+    where
+        K: ToRedisArgs + Debug + Send + Sync + 'static,
+        I: ToRedisArgs + Debug + Send + Sync + 'static,
+    {
+        tracing::debug!("XDEL: {:?} {:?}", key, id);
+
+        self.connection.clone().xdel(key, &[id]).await?;
+
+        Ok(())
+    }
+
+    // `redis::transaction`s currently don't work with async connections, so we have to create a _new_
+    // blocking connection to atomically update a value.
+    pub fn atomic_update<K, O, N, F>(&self, key: K, update_fn: F) -> anyhow::Result<()>
+    where
+        K: ToRedisArgs + Copy + 'static,
+        O: FromRedisValue + 'static,
+        N: ToRedisArgs + 'static,
+        F: Fn(O) -> RedisResult<N> + 'static,
+    {
+        let mut conn = redis::Client::open(self.url.clone())?.get_connection()?;
+
+        redis::transaction(&mut conn, &[key], |conn, pipe| {
+            let old_value = redis::cmd("GET").arg(key).query(conn)?;
+            let new_value = update_fn(old_value)?;
+
+            pipe.cmd("SET").arg(key).arg(new_value).query(conn)
+        })?;
+
+        Ok(())
+    }
 }
diff --git a/coordinator/src/registry.rs b/coordinator/src/registry.rs
index e1a659692..db13c77af 100644
--- a/coordinator/src/registry.rs
+++ b/coordinator/src/registry.rs
@@ -34,6 +34,14 @@ impl IndexerConfig {
     pub fn get_redis_stream(&self) -> String {
         format!("{}:block_stream", self.get_full_name())
     }
+
+    pub fn get_historical_redis_stream(&self) -> String {
+        format!("{}:historical:stream", self.get_full_name())
+    }
+
+    pub fn get_real_time_redis_stream(&self) -> String {
+        format!("{}:real_time:stream", self.get_full_name())
+    }
 }
 
 #[cfg(test)]
diff --git a/indexer/queryapi_coordinator/src/indexer_registry.rs b/indexer/queryapi_coordinator/src/indexer_registry.rs
index b3c695fb4..3df9cdd0b 100644
--- a/indexer/queryapi_coordinator/src/indexer_registry.rs
+++ b/indexer/queryapi_coordinator/src/indexer_registry.rs
@@ -89,15 +89,17 @@ pub(crate) fn build_registry_from_json(raw_registry: Value) -> IndexerRegistry {
 pub(crate) async fn index_registry_changes(
     current_block_height: BlockHeight,
     context: &QueryApiContext<'_>,
+    denylist: &crate::Denylist,
 ) -> anyhow::Result<()> {
     index_and_process_remove_calls(context).await;
 
-    index_and_process_register_calls(current_block_height, context).await
+    index_and_process_register_calls(current_block_height, context, denylist).await
 }
 
 async fn index_and_process_register_calls(
     current_block_height: BlockHeight,
     context: &QueryApiContext<'_>,
+    denylist: &crate::Denylist,
 ) -> anyhow::Result<()> {
     let registry_method_name = "register_indexer_function";
     let registry_calls_rule =
@@ -119,6 +121,26 @@ async fn index_and_process_register_calls(
         match new_indexer_function {
             None => continue,
             Some(mut new_indexer_function) => {
+                let account_in_deny_list = denylist
+                    .iter()
+                    .find(|entry| entry.account_id == new_indexer_function.account_id);
+
+                if let Some(account_in_deny_list) = account_in_deny_list {
+                    tracing::info!(
+                        "Ignoring registered indexer {} from deny list",
+                        new_indexer_function.get_full_name()
+                    );
+
+                    if !account_in_deny_list.v1_ack {
+                        crate::acknowledge_account_in_denylist(
+                            new_indexer_function.account_id,
+                            context.redis_url,
+                        )?;
+                    }
+
+                    continue;
+                }
+
                 let mut indexer_registry_lock = context.indexer_registry.lock().await;
                 let fns = indexer_registry_lock
                     .entry(new_indexer_function.account_id.clone())
diff --git a/indexer/queryapi_coordinator/src/main.rs b/indexer/queryapi_coordinator/src/main.rs
index d78c96066..96511f544 100644
--- a/indexer/queryapi_coordinator/src/main.rs
+++ b/indexer/queryapi_coordinator/src/main.rs
@@ -1,11 +1,13 @@
 use std::collections::HashMap;
 
+use anyhow::Context;
 use futures::stream::{self, StreamExt};
 use near_jsonrpc_client::JsonRpcClient;
+use storage::redis::{ErrorKind, RedisError};
 use tokio::sync::Mutex;
 
 use indexer_rules_engine::types::indexer_rule_match::{ChainId, IndexerRuleMatch};
-use near_lake_framework::near_indexer_primitives::types::BlockHeight;
+use near_lake_framework::near_indexer_primitives::types::{AccountId, BlockHeight};
 use near_lake_framework::near_indexer_primitives::StreamerMessage;
 use utils::serialize_to_camel_case_json_string;
@@ -38,8 +40,17 @@ pub(crate) struct QueryApiContext<'a> {
     pub redis_connection_manager: &'a ConnectionManager,
     pub indexer_registry: &'a SharedIndexerRegistry,
     pub streamers: &'a Streamers,
+    pub redis_url: &'a str,
 }
 
+#[derive(serde::Deserialize, serde::Serialize, Debug)]
+struct DenylistEntry {
+    account_id: AccountId,
+    v1_ack: bool,
+}
+
+type Denylist = Vec<DenylistEntry>;
+
 #[tokio::main]
 async fn main() -> anyhow::Result<()> {
     opts::init_tracing();
@@ -57,6 +68,9 @@ async fn main() -> anyhow::Result<()> {
     tracing::info!(target: INDEXER, "Connecting to redis...");
     let redis_connection_manager = storage::connect(&opts.redis_connection_string).await?;
 
+    let denylist = fetch_denylist(&redis_connection_manager).await?;
+    tracing::info!("Using denylist: {:#?}", denylist);
+
     let json_rpc_client = JsonRpcClient::connect(opts.rpc_url());
 
     // fetch raw indexer functions for use in indexer
@@ -71,6 +85,10 @@ async fn main() -> anyhow::Result<()> {
     )
     .await;
     let indexer_functions = indexer_registry::build_registry_from_json(indexer_functions);
+    let indexer_functions =
+        filter_registry_by_denylist(indexer_functions, &denylist, &opts.redis_connection_string)
+            .await;
+
     let indexer_registry: SharedIndexerRegistry =
         std::sync::Arc::new(Mutex::new(indexer_functions));
@@ -97,6 +115,7 @@ async fn main() -> anyhow::Result<()> {
                 s3_client: &s3_client,
                 indexer_registry: &indexer_registry,
                 streamers: &streamers,
+                redis_url: &opts.redis_connection_string,
             };
 
             handle_streamer_message(context)
@@ -118,14 +137,85 @@
     }
 }
 
+async fn fetch_denylist(redis_connection_manager: &ConnectionManager) -> anyhow::Result<Denylist> {
+    let raw_denylist: String =
+        storage::get(redis_connection_manager, storage::DENYLIST_KEY).await?;
+    let denylist: Denylist =
+        serde_json::from_str(&raw_denylist).context("Failed to parse denylist")?;
+
+    Ok(denylist)
+}
+
+fn acknowledge_account_in_denylist(account_id: AccountId, redis_url: &str) -> anyhow::Result<()> {
+    storage::atomic_update(
+        redis_url,
+        &[storage::DENYLIST_KEY],
+        move |raw_denylist: String| {
+            let mut denylist: Denylist = serde_json::from_str(&raw_denylist).map_err(|_| {
+                RedisError::from((ErrorKind::TypeError, "failed to deserialize denylist"))
+            })?;
+
+            let entry = denylist
+                .iter_mut()
+                .find(|entry| entry.account_id == account_id)
+                .unwrap();
+
+            entry.v1_ack = true;
+
+            serde_json::to_string(&denylist).map_err(|_| {
+                RedisError::from((ErrorKind::TypeError, "failed to serialize denylist"))
+            })
+        },
+    )
+}
+
+async fn filter_registry_by_denylist(
+    indexer_registry: IndexerRegistry,
+    denylist: &Denylist,
+    redis_url: &str,
+) -> IndexerRegistry {
+    let mut filtered_registry = HashMap::new();
+
+    for (account_id, indexer) in indexer_registry.into_iter() {
+        let account_in_deny_list = denylist.iter().find(|entry| entry.account_id == account_id);
+
+        match account_in_deny_list {
+            Some(account_in_deny_list) => {
+                tracing::info!(
+                    target: INDEXER,
+                    "Ignoring {account_id} from denylist",
+                );
+
+                if !account_in_deny_list.v1_ack {
+                    acknowledge_account_in_denylist(account_id, redis_url).unwrap();
+                }
+
+                continue;
+            }
+            None => {
+                filtered_registry.insert(account_id, indexer);
+            }
+        }
+    }
+
+    filtered_registry
+}
+
 async fn handle_streamer_message(context: QueryApiContext<'_>) -> anyhow::Result<u64> {
-    let indexer_functions = {
-        let lock = context.indexer_registry.lock().await;
+    let denylist = fetch_denylist(context.redis_connection_manager).await?;
+
+    let indexer_functions: Vec<IndexerFunction> = {
+        let mut indexer_registry = context.indexer_registry.lock().await;
+
+        *indexer_registry =
+            filter_registry_by_denylist(indexer_registry.clone(), &denylist, context.redis_url)
+                .await;
 
-        lock.values()
-            .flat_map(|fns| fns.values())
-            .cloned()
-            .collect::<Vec<IndexerFunction>>()
+        indexer_registry
+            .clone()
+            .into_values()
+            .flat_map(|fns| fns.into_values())
+            .collect()
     };
 
     let mut indexer_function_filter_matches_futures = stream::iter(indexer_functions.iter())
@@ -150,7 +240,7 @@
     )
     .await?;
 
-    indexer_registry::index_registry_changes(block_height, &context).await?;
+    indexer_registry::index_registry_changes(block_height, &context, &denylist).await?;
 
     while let Some(indexer_function_with_matches) =
         indexer_function_filter_matches_futures.next().await
diff --git a/indexer/storage/src/lib.rs b/indexer/storage/src/lib.rs
index 3130f964b..13b7cf72e 100644
--- a/indexer/storage/src/lib.rs
+++ b/indexer/storage/src/lib.rs
@@ -1,9 +1,14 @@
-pub use redis::{self, aio::ConnectionManager, FromRedisValue, ToRedisArgs};
+pub use redis::{
+    self, aio::ConnectionManager, ErrorKind, FromRedisValue, RedisError, RedisResult, ToRedisArgs,
+};
 
 const STORAGE: &str = "storage_alertexer";
 
 pub const LAKE_BUCKET_PREFIX: &str = "near-lake-data-";
 pub const STREAMS_SET_KEY: &str = "streams";
+/// Corresponds to the `allowlist` key in Redis. Accounts within this list will be executed on the
+/// V2 architecture, hence allow, and will be ignored on the V1 (this) architecture, hence deny.
+pub const DENYLIST_KEY: &str = "allowlist";
 
 pub async fn get_redis_client(redis_connection_str: &str) -> redis::Client {
     redis::Client::open(redis_connection_str).expect("can create redis client")
@@ -198,3 +203,24 @@ pub async fn get_last_indexed_block(
         .query_async(&mut redis_connection_manager.clone())
         .await?)
 }
+
+// `redis::transaction`s currently don't work with async connections, so we have to create a _new_
+// blocking connection to atomically update a value.
+pub fn atomic_update<K, O, N, F>(redis_url: &str, key: K, update_fn: F) -> anyhow::Result<()>
+where
+    K: ToRedisArgs + Copy + 'static,
+    O: FromRedisValue + 'static,
+    N: ToRedisArgs + 'static,
+    F: Fn(O) -> RedisResult<N> + 'static,
+{
+    let mut conn = redis::Client::open(redis_url)?.get_connection()?;
+
+    redis::transaction(&mut conn, &[key], |conn, pipe| {
+        let old_value = redis::cmd("GET").arg(key).query(conn)?;
+        let new_value = update_fn(old_value)?;
+
+        pipe.cmd("SET").arg(key).arg(new_value).query(conn)
+    })?;
+
+    Ok(())
+}