From 6038fe6ad7c5991fb2612b9886a2ae5fb4dbbf43 Mon Sep 17 00:00:00 2001
From: Darun Seethammagari
Date: Fri, 8 Mar 2024 14:21:02 -0800
Subject: [PATCH] fix: Cap Block Streamer Caching and Merge Redis Impl (#599)

Redis memory usage is pegged at 100% because Block Streamer caches every
streamer message while backfilling indexers that use the wildcard (`*`)
contract filter. This change caps that caching: Block Streamer now checks
the stream length (XLEN) before caching and skips the cache once the
stream holds more than MAX_STREAM_SIZE_WITH_CACHE (100) entries, while
still publishing every block height to the stream.

In addition, the Redis implementations of Coordinator and Block Streamer
have diverged, and Coordinator is currently unable to make successful
calls to Redis. While not a targeted fix, the hope is that merging the
two implementations resolves that as well, since Block Streamer is able
to make calls to Redis successfully.

Before release, some manual intervention is necessary in dev and prod to
update indexer versions to the current registry version, to prevent
unwanted behavior from Coordinator. I will update dev first, release the
change, and then update prod before a prod release.
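The capping rule reduces to the short sketch below. This is an
illustration only: `StubRedis` and its methods are hypothetical
stand-ins for the `RedisClientImpl` methods touched in this patch
(`get_stream_length`, `cache_streamer_message`, `publish_block`); only
the constant and the length check mirror the real code in
block_stream.rs.

    // Cache streamer messages only while the Redis stream is short;
    // block heights are always published so the stream itself keeps growing.
    const MAX_STREAM_SIZE_WITH_CACHE: u64 = 100;

    struct StubRedis {
        stream_length: u64, // what XLEN would report for the stream
    }

    impl StubRedis {
        fn get_stream_length(&self) -> Option<u64> {
            Some(self.stream_length)
        }
        fn cache_streamer_message(&self) {
            println!("cached streamer message");
        }
        fn publish_block(&self) {
            println!("published block height");
        }
    }

    fn main() {
        // A short stream (10) gets cached; a long one (200) skips the cache.
        for redis in [StubRedis { stream_length: 10 }, StubRedis { stream_length: 200 }] {
            if let Some(stream_length) = redis.get_stream_length() {
                if stream_length <= MAX_STREAM_SIZE_WITH_CACHE {
                    redis.cache_streamer_message();
                }
            }
            redis.publish_block();
        }
    }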
---
 block-streamer/src/block_stream.rs           |  87 +++++++++-
 block-streamer/src/redis.rs                  |  21 +++
 coordinator/src/block_streams/synchronise.rs |   7 +-
 coordinator/src/indexer_config.rs            |   8 -
 coordinator/src/redis.rs                     | 163 ++-----------------
 5 files changed, 125 insertions(+), 161 deletions(-)

diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs
index 0c3f81acd..6d79a11a2 100644
--- a/block-streamer/src/block_stream.rs
+++ b/block-streamer/src/block_stream.rs
@@ -10,6 +10,7 @@ use registry_types::Rule;
 /// The number of blocks to prefetch within `near-lake-framework`. The internal default is 100, but
 /// we need this configurable for testing purposes.
 const LAKE_PREFETCH_SIZE: usize = 100;
+const MAX_STREAM_SIZE_WITH_CACHE: u64 = 100;
 const DELTA_LAKE_SKIP_ACCOUNTS: [&str; 4] = ["*", "*.near", "*.kaiching", "*.tg"];
 
 pub struct Task {
@@ -285,9 +286,15 @@ async fn process_near_lake_blocks(
         );
 
         if !matches.is_empty() {
-            redis_client
-                .cache_streamer_message(&streamer_message)
-                .await?;
+            if let Ok(Some(stream_length)) =
+                redis_client.get_stream_length(redis_stream.clone()).await
+            {
+                if stream_length <= MAX_STREAM_SIZE_WITH_CACHE {
+                    redis_client
+                        .cache_streamer_message(&streamer_message)
+                        .await?;
+                }
+            }
 
             redis_client
                 .publish_block(indexer, redis_stream.clone(), block_height)
@@ -346,6 +353,10 @@ mod tests {
             .expect_cache_streamer_message()
             .with(predicate::always())
             .returning(|_| Ok(()));
+        mock_redis_client
+            .expect_get_stream_length()
+            .with(predicate::eq("stream key".to_string()))
+            .returning(|_| Ok(Some(10)));
 
         let indexer_config = crate::indexer_config::IndexerConfig {
             account_id: near_indexer_primitives::types::AccountId::try_from(
@@ -375,6 +386,76 @@ mod tests {
         .unwrap();
     }
 
+    #[tokio::test]
+    async fn skips_caching_of_lake_block_over_stream_size_limit() {
+        let mut mock_delta_lake_client = crate::delta_lake_client::DeltaLakeClient::default();
+        mock_delta_lake_client
+            .expect_get_latest_block_metadata()
+            .returning(|| {
+                Ok(crate::delta_lake_client::LatestBlockMetadata {
+                    last_indexed_block: "107503700".to_string(),
+                    processed_at_utc: "".to_string(),
+                    first_indexed_block: "".to_string(),
+                    last_indexed_block_date: "".to_string(),
+                    first_indexed_block_date: "".to_string(),
+                })
+            });
+
+        let mut mock_redis_client = crate::redis::RedisClient::default();
+        mock_redis_client
+            .expect_publish_block()
+            .with(
+                predicate::always(),
+                predicate::eq("stream key".to_string()),
+                predicate::in_iter([107503705]),
+            )
+            .returning(|_, _, _| Ok(()))
+            .times(1);
+        mock_redis_client
+            .expect_set_last_processed_block()
+            .with(
+                predicate::always(),
+                predicate::in_iter([107503704, 107503705]),
+            )
+            .returning(|_, _| Ok(()))
+            .times(2);
+        mock_redis_client
+            .expect_cache_streamer_message()
+            .with(predicate::always())
+            .never();
+        mock_redis_client
+            .expect_get_stream_length()
+            .with(predicate::eq("stream key".to_string()))
+            .returning(|_| Ok(Some(200)));
+
+        let indexer_config = crate::indexer_config::IndexerConfig {
+            account_id: near_indexer_primitives::types::AccountId::try_from(
+                "morgs.near".to_string(),
+            )
+            .unwrap(),
+            function_name: "test".to_string(),
+            rule: registry_types::Rule::ActionAny {
+                affected_account_id: "queryapi.dataplatform.near".to_string(),
+                status: registry_types::Status::Success,
+            },
+        };
+
+        let lake_s3_config = crate::test_utils::create_mock_lake_s3_config(&[107503704, 107503705]);
+
+        start_block_stream(
+            107503704,
+            &indexer_config,
+            std::sync::Arc::new(mock_redis_client),
+            std::sync::Arc::new(mock_delta_lake_client),
+            lake_s3_config,
+            &ChainId::Mainnet,
+            1,
+            "stream key".to_string(),
+        )
+        .await
+        .unwrap();
+    }
+
     #[tokio::test]
     async fn skips_delta_lake_for_star_filter() {
         let mut mock_delta_lake_client = crate::delta_lake_client::DeltaLakeClient::default();
diff --git a/block-streamer/src/redis.rs b/block-streamer/src/redis.rs
index 0813d2238..254e0c9db 100644
--- a/block-streamer/src/redis.rs
+++ b/block-streamer/src/redis.rs
@@ -49,6 +49,23 @@ impl RedisClientImpl {
         Ok(())
     }
 
+    pub async fn xlen<T>(&self, stream_key: T) -> anyhow::Result<Option<u64>>
+    where
+        T: ToRedisArgs + Debug + Send + Sync + 'static,
+    {
+        tracing::debug!("XLEN: {:?}", stream_key);
+
+        let mut cmd = redis::cmd("XLEN");
+        cmd.arg(&stream_key);
+
+        let stream_length = cmd
+            .query_async(&mut self.connection.clone())
+            .await
+            .context(format!("XLEN {stream_key:?}"))?;
+
+        Ok(stream_length)
+    }
+
     pub async fn set<T, U>(&self, key: T, value: U) -> Result<(), RedisError>
     where
         T: ToRedisArgs + Debug + Send + Sync + 'static,
@@ -97,6 +114,10 @@ impl RedisClientImpl {
             .context("Failed to set last processed block")
     }
 
+    pub async fn get_stream_length(&self, stream: String) -> anyhow::Result<Option<u64>> {
+        self.xlen(stream).await
+    }
+
     pub async fn cache_streamer_message(
         &self,
         streamer_message: &near_lake_framework::near_indexer_primitives::StreamerMessage,
diff --git a/coordinator/src/block_streams/synchronise.rs b/coordinator/src/block_streams/synchronise.rs
index 29996d64a..c0f21c588 100644
--- a/coordinator/src/block_streams/synchronise.rs
+++ b/coordinator/src/block_streams/synchronise.rs
@@ -95,6 +95,11 @@ async fn synchronise_block_stream(
     let start_block_height =
         determine_start_block_height(&stream_status, indexer_config, redis_client).await?;
 
+    tracing::info!(
+        "Starting new block stream at block {}",
+        start_block_height
+    );
+
     block_streams_handler
         .start(start_block_height, indexer_config)
         .await?;
@@ -164,8 +169,6 @@ async fn determine_start_block_height(
         return get_continuation_block_height(indexer_config, redis_client).await;
     }
 
-    tracing::info!(start_block = ?indexer_config.start_block, "Stating new block stream");
-
     match indexer_config.start_block {
         StartBlock::Latest => Ok(indexer_config.get_registry_version()),
         StartBlock::Height(height) => Ok(height),
diff --git a/coordinator/src/indexer_config.rs b/coordinator/src/indexer_config.rs
index e7b049281..1b2b95e21 100644
--- a/coordinator/src/indexer_config.rs
+++ b/coordinator/src/indexer_config.rs
@@ -22,14 +22,6 @@ impl IndexerConfig {
         format!("{}:block_stream", self.get_full_name())
     }
 
-    pub fn get_historical_redis_stream_key(&self) -> String {
-        format!("{}:historical:stream", self.get_full_name())
-    }
-
-    pub fn get_real_time_redis_stream_key(&self) -> String {
-        format!("{}:real_time:stream", self.get_full_name())
-    }
-
     pub fn get_last_published_block_key(&self) -> String {
         format!("{}:last_published_block", self.get_full_name())
     }
diff --git a/coordinator/src/redis.rs b/coordinator/src/redis.rs
index a5c26d28a..4f3b15b6e 100644
--- a/coordinator/src/redis.rs
+++ b/coordinator/src/redis.rs
@@ -3,9 +3,7 @@
 use std::fmt::Debug;
 
 use anyhow::Context;
-use redis::{
-    aio::ConnectionManager, streams, AsyncCommands, FromRedisValue, RedisResult, ToRedisArgs,
-};
+use redis::{aio::ConnectionManager, AsyncCommands, FromRedisValue, ToRedisArgs};
 
 use crate::indexer_config::IndexerConfig;
 
@@ -21,9 +19,6 @@ pub struct RedisClientImpl {
 
 #[cfg_attr(test, mockall::automock)]
 impl RedisClientImpl {
-    pub const STREAMS_SET: &str = "streams";
-    pub const ALLOWLIST: &str = "allowlist";
-
     pub async fn connect(redis_url: &str) -> anyhow::Result<Self> {
         let connection = redis::Client::open(redis_url)?
             .get_connection_manager()
@@ -41,171 +36,43 @@ pub async fn get<T, U>(&self, key: T) -> anyhow::Result<Option<U>>
         T: ToRedisArgs + Debug + Send + Sync + 'static,
         U: FromRedisValue + Debug + 'static,
     {
-        let value: Option<U> = self
-            .connection
-            .clone()
-            .get(&key)
+        let mut cmd = redis::cmd("GET");
+        cmd.arg(&key);
+        let value = cmd
+            .query_async(&mut self.connection.clone())
             .await
             .context(format!("GET: {key:?}"))?;
 
-        tracing::debug!("GET: {:?}={:?}", key, value);
+        tracing::debug!("GET: {:?}={:?}", key, &value);
 
         Ok(value)
     }
 
-    pub async fn set<K, V>(&self, key: K, value: V) -> anyhow::Result<()>
-    where
-        K: ToRedisArgs + Debug + Send + Sync + 'static,
-        V: ToRedisArgs + Debug + Send + Sync + 'static,
-    {
-        tracing::debug!("SET: {key:?} {value:?}");
-
-        self.connection
-            .clone()
-            .set(&key, &value)
-            .await
-            .context(format!("SET: {key:?} {value:?}"))?;
-        Ok(())
-    }
-
-    pub async fn rename<K, V>(&self, old_key: K, new_key: V) -> anyhow::Result<()>
+    pub async fn set<K, V>(&self, key: K, value: V) -> anyhow::Result<()>
     where
         K: ToRedisArgs + Debug + Send + Sync + 'static,
         V: ToRedisArgs + Debug + Send + Sync + 'static,
     {
-        tracing::debug!("RENAME: {:?} -> {:?}", old_key, new_key);
+        tracing::debug!("SET: {:?}, {:?}", key, value);
 
-        self.connection
-            .clone()
-            .rename(&old_key, &new_key)
-            .await
-            .context(format!("RENAME: {old_key:?} {new_key:?}"))?;
+        let mut cmd = redis::cmd("SET");
+        cmd.arg(key).arg(value);
+        cmd.query_async::<_, ()>(&mut self.connection.clone()).await?;
 
         Ok(())
     }
 
-    pub async fn srem<T, U>(&self, key: T, value: U) -> anyhow::Result<Option<()>>
-    where
-        T: ToRedisArgs + Debug + Send + Sync + 'static,
-        U: ToRedisArgs + Debug + Send + Sync + 'static,
-    {
-        tracing::debug!("SREM: {:?}={:?}", key, value);
-
-        match self.connection.clone().srem(&key, &value).await {
-            Ok(1) => Ok(Some(())),
-            Ok(_) => Ok(None),
-            Err(e) => Err(anyhow::format_err!(e)),
-        }
-        .context(format!("SREM: {key:?} {value:?}"))
-    }
-
-    pub async fn xread<K, V>(
-        &self,
-        key: K,
-        start_id: V,
-        count: usize,
-    ) -> anyhow::Result<Vec<streams::StreamId>>
-    where
-        K: ToRedisArgs + Debug + Send + Sync + 'static,
-        V: ToRedisArgs + Debug + Send + Sync + 'static,
-    {
-        tracing::debug!("XREAD: {:?} {:?} {:?}", key, start_id, count);
-
-        let mut results: streams::StreamReadReply = self
-            .connection
-            .clone()
-            .xread_options(
-                &[&key],
-                &[&start_id],
-                &streams::StreamReadOptions::default().count(count),
-            )
-            .await
-            .context(format!("XREAD {key:?} {start_id:?} {count:?}"))?;
-
-        if results.keys.is_empty() {
-            return Ok([].to_vec());
-        }
-
-        Ok(results.keys.remove(0).ids)
-    }
-
-    pub async fn xadd<K, U>(&self, key: K, fields: &[(String, U)]) -> anyhow::Result<()>
-    where
-        K: ToRedisArgs + Debug + Send + Sync + 'static,
-        U: ToRedisArgs + Debug + Send + Sync + 'static,
-    {
-        tracing::debug!("XADD: {:?} {:?} {:?}", key, "*", fields);
-
-        self.connection
-            .clone()
-            .xadd(&key, "*", fields)
-            .await
-            .context(format!("XADD {key:?} {fields:?}"))?;
-
-        Ok(())
-    }
-
-    pub async fn xdel<K, I>(&self, key: K, id: I) -> anyhow::Result<()>
-    where
-        K: ToRedisArgs + Debug + Send + Sync + 'static,
-        I: ToRedisArgs + Debug + Send + Sync + 'static,
-    {
-        tracing::debug!("XDEL: {:?} {:?}", key, id);
-
-        self.connection
-            .clone()
-            .xdel(&key, &[&id])
-            .await
-            .context(format!("XDEL {key:?} {id:?}"))?;
-
-        Ok(())
-    }
-
-    pub async fn exists<K>(&self, key: K) -> anyhow::Result<bool>
-    where
-        K: ToRedisArgs + Debug + Send + Sync + 'static,
-    {
-        tracing::debug!("EXISTS {key:?}");
-
-        self.connection
-            .clone()
-            .exists(&key)
-            .await
-            .map_err(|e| anyhow::format_err!(e))
-            .context(format!("EXISTS {key:?}"))
-    }
-
     pub async fn del<K>(&self, key: K) -> anyhow::Result<()>
     where
         K: ToRedisArgs + Debug + Send + Sync + 'static,
     {
         tracing::debug!("DEL {key:?}");
 
-        self.connection
-            .clone()
-            .del(&key)
+        let mut cmd = redis::cmd("DEL");
+        cmd.arg(&key);
+        cmd.query_async::<_, ()>(&mut self.connection.clone())
             .await
-            .map_err(|e| anyhow::format_err!(e))
-            .context(format!("DEL {key:?}"))
-    }
-
-    // `redis::transaction`s currently don't work with async connections, so we have to create a _new_
-    // blocking connection to atmoically update a value.
-    pub fn atomic_update<K, O, N, F>(&self, key: K, update_fn: F) -> anyhow::Result<()>
-    where
-        K: ToRedisArgs + Copy + 'static,
-        O: FromRedisValue + 'static,
-        N: ToRedisArgs + 'static,
-        F: Fn(O) -> RedisResult<N> + 'static,
-    {
-        let mut conn = redis::Client::open(self.url.clone())?.get_connection()?;
-
-        redis::transaction(&mut conn, &[key], |conn, pipe| {
-            let old_value = redis::cmd("GET").arg(key).query(conn)?;
-            let new_value = update_fn(old_value)?;
-
-            pipe.cmd("SET").arg(key).arg(new_value).query(conn)
-        })?;
+            .context(format!("DEL {key:?}"))?;
 
         Ok(())
     }
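
Note, not part of the diff: after this patch both services issue Redis
commands the same way, building them explicitly with `redis::cmd(...)`
and running them over a cloned `ConnectionManager`, rather than through
the higher-level `AsyncCommands` helpers. A minimal standalone sketch of
that shared pattern, assuming a local Redis at redis://127.0.0.1/ and a
Cargo setup with `tokio`, `anyhow`, and the `redis` crate built with the
"tokio-comp" and "connection-manager" features (the key and stream names
below are illustrative):

    use redis::aio::ConnectionManager;

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        // Same connection setup as RedisClientImpl::connect.
        let mut connection: ConnectionManager = redis::Client::open("redis://127.0.0.1/")?
            .get_connection_manager()
            .await?;

        // SET and GET via explicit commands, mirroring the rewritten methods.
        redis::cmd("SET")
            .arg("example_key")
            .arg("example_value")
            .query_async::<_, ()>(&mut connection)
            .await?;
        let value: Option<String> = redis::cmd("GET")
            .arg("example_key")
            .query_async(&mut connection)
            .await?;
        println!("GET example_key = {value:?}");

        // XLEN underpins get_stream_length; Redis reports 0 for a missing stream.
        let stream_length: Option<u64> = redis::cmd("XLEN")
            .arg("example_stream")
            .query_async(&mut connection)
            .await?;
        println!("XLEN example_stream = {stream_length:?}");

        Ok(())
    }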