diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index 0c3f81acd..6d79a11a2 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -10,6 +10,7 @@ use registry_types::Rule; /// The number of blocks to prefetch within `near-lake-framework`. The internal default is 100, but /// we need this configurable for testing purposes. const LAKE_PREFETCH_SIZE: usize = 100; +const MAX_STREAM_SIZE_WITH_CACHE: u64 = 100; const DELTA_LAKE_SKIP_ACCOUNTS: [&str; 4] = ["*", "*.near", "*.kaiching", "*.tg"]; pub struct Task { @@ -285,9 +286,15 @@ async fn process_near_lake_blocks( ); if !matches.is_empty() { - redis_client - .cache_streamer_message(&streamer_message) - .await?; + if let Ok(Some(stream_length)) = + redis_client.get_stream_length(redis_stream.clone()).await + { + if stream_length <= MAX_STREAM_SIZE_WITH_CACHE { + redis_client + .cache_streamer_message(&streamer_message) + .await?; + } + } redis_client .publish_block(indexer, redis_stream.clone(), block_height) @@ -346,6 +353,10 @@ mod tests { .expect_cache_streamer_message() .with(predicate::always()) .returning(|_| Ok(())); + mock_redis_client + .expect_get_stream_length() + .with(predicate::eq("stream key".to_string())) + .returning(|_| Ok(Some(10))); let indexer_config = crate::indexer_config::IndexerConfig { account_id: near_indexer_primitives::types::AccountId::try_from( @@ -375,6 +386,76 @@ mod tests { .unwrap(); } + #[tokio::test] + async fn skips_caching_of_lake_block_over_stream_size_limit() { + let mut mock_delta_lake_client = crate::delta_lake_client::DeltaLakeClient::default(); + mock_delta_lake_client + .expect_get_latest_block_metadata() + .returning(|| { + Ok(crate::delta_lake_client::LatestBlockMetadata { + last_indexed_block: "107503700".to_string(), + processed_at_utc: "".to_string(), + first_indexed_block: "".to_string(), + last_indexed_block_date: "".to_string(), + first_indexed_block_date: "".to_string(), + }) + }); + + let mut mock_redis_client = crate::redis::RedisClient::default(); + mock_redis_client + .expect_publish_block() + .with( + predicate::always(), + predicate::eq("stream key".to_string()), + predicate::in_iter([107503705]), + ) + .returning(|_, _, _| Ok(())) + .times(1); + mock_redis_client + .expect_set_last_processed_block() + .with( + predicate::always(), + predicate::in_iter([107503704, 107503705]), + ) + .returning(|_, _| Ok(())) + .times(2); + mock_redis_client + .expect_cache_streamer_message() + .with(predicate::always()) + .never(); + mock_redis_client + .expect_get_stream_length() + .with(predicate::eq("stream key".to_string())) + .returning(|_| Ok(Some(200))); + + let indexer_config = crate::indexer_config::IndexerConfig { + account_id: near_indexer_primitives::types::AccountId::try_from( + "morgs.near".to_string(), + ) + .unwrap(), + function_name: "test".to_string(), + rule: registry_types::Rule::ActionAny { + affected_account_id: "queryapi.dataplatform.near".to_string(), + status: registry_types::Status::Success, + }, + }; + + let lake_s3_config = crate::test_utils::create_mock_lake_s3_config(&[107503704, 107503705]); + + start_block_stream( + 107503704, + &indexer_config, + std::sync::Arc::new(mock_redis_client), + std::sync::Arc::new(mock_delta_lake_client), + lake_s3_config, + &ChainId::Mainnet, + 1, + "stream key".to_string(), + ) + .await + .unwrap(); + } + #[tokio::test] async fn skips_delta_lake_for_star_filter() { let mut mock_delta_lake_client = crate::delta_lake_client::DeltaLakeClient::default(); diff --git a/block-streamer/src/redis.rs b/block-streamer/src/redis.rs index 0813d2238..254e0c9db 100644 --- a/block-streamer/src/redis.rs +++ b/block-streamer/src/redis.rs @@ -49,6 +49,23 @@ impl RedisClientImpl { Ok(()) } + pub async fn xlen(&self, stream_key: T) -> anyhow::Result> + where + T: ToRedisArgs + Debug + Send + Sync + 'static, + { + tracing::debug!("XLEN: {:?}", stream_key); + + let mut cmd = redis::cmd("XLEN"); + cmd.arg(&stream_key); + + let stream_length = cmd + .query_async(&mut self.connection.clone()) + .await + .context(format!("XLEN {stream_key:?}"))?; + + Ok(stream_length) + } + pub async fn set(&self, key: T, value: U) -> Result<(), RedisError> where T: ToRedisArgs + Debug + Send + Sync + 'static, @@ -97,6 +114,10 @@ impl RedisClientImpl { .context("Failed to set last processed block") } + pub async fn get_stream_length(&self, stream: String) -> anyhow::Result> { + self.xlen(stream).await + } + pub async fn cache_streamer_message( &self, streamer_message: &near_lake_framework::near_indexer_primitives::StreamerMessage, diff --git a/coordinator/src/block_streams/synchronise.rs b/coordinator/src/block_streams/synchronise.rs index 29996d64a..c0f21c588 100644 --- a/coordinator/src/block_streams/synchronise.rs +++ b/coordinator/src/block_streams/synchronise.rs @@ -95,6 +95,11 @@ async fn synchronise_block_stream( let start_block_height = determine_start_block_height(&stream_status, indexer_config, redis_client).await?; + tracing::info!( + "Starting new block stream starting at block {}", + start_block_height + ); + block_streams_handler .start(start_block_height, indexer_config) .await?; @@ -164,8 +169,6 @@ async fn determine_start_block_height( return get_continuation_block_height(indexer_config, redis_client).await; } - tracing::info!(start_block = ?indexer_config.start_block, "Stating new block stream"); - match indexer_config.start_block { StartBlock::Latest => Ok(indexer_config.get_registry_version()), StartBlock::Height(height) => Ok(height), diff --git a/coordinator/src/indexer_config.rs b/coordinator/src/indexer_config.rs index e7b049281..1b2b95e21 100644 --- a/coordinator/src/indexer_config.rs +++ b/coordinator/src/indexer_config.rs @@ -22,14 +22,6 @@ impl IndexerConfig { format!("{}:block_stream", self.get_full_name()) } - pub fn get_historical_redis_stream_key(&self) -> String { - format!("{}:historical:stream", self.get_full_name()) - } - - pub fn get_real_time_redis_stream_key(&self) -> String { - format!("{}:real_time:stream", self.get_full_name()) - } - pub fn get_last_published_block_key(&self) -> String { format!("{}:last_published_block", self.get_full_name()) } diff --git a/coordinator/src/redis.rs b/coordinator/src/redis.rs index a5c26d28a..4f3b15b6e 100644 --- a/coordinator/src/redis.rs +++ b/coordinator/src/redis.rs @@ -3,9 +3,7 @@ use std::fmt::Debug; use anyhow::Context; -use redis::{ - aio::ConnectionManager, streams, AsyncCommands, FromRedisValue, RedisResult, ToRedisArgs, -}; +use redis::{aio::ConnectionManager, AsyncCommands, FromRedisValue, ToRedisArgs}; use crate::indexer_config::IndexerConfig; @@ -21,9 +19,6 @@ pub struct RedisClientImpl { #[cfg_attr(test, mockall::automock)] impl RedisClientImpl { - pub const STREAMS_SET: &str = "streams"; - pub const ALLOWLIST: &str = "allowlist"; - pub async fn connect(redis_url: &str) -> anyhow::Result { let connection = redis::Client::open(redis_url)? .get_connection_manager() @@ -41,171 +36,43 @@ impl RedisClientImpl { T: ToRedisArgs + Debug + Send + Sync + 'static, U: FromRedisValue + Debug + 'static, { - let value: Option = self - .connection - .clone() - .get(&key) + let mut cmd = redis::cmd("GET"); + cmd.arg(&key); + let value = cmd + .query_async(&mut self.connection.clone()) .await .context(format!("GET: {key:?}"))?; - tracing::debug!("GET: {:?}={:?}", key, value); + tracing::debug!("GET: {:?}={:?}", key, &value); Ok(value) } - pub async fn set(&self, key: K, value: V) -> anyhow::Result<()> - where - K: ToRedisArgs + Debug + Send + Sync + 'static, - V: ToRedisArgs + Debug + Send + Sync + 'static, - { - tracing::debug!("SET: {key:?} {value:?}"); - - self.connection - .clone() - .set(&key, &value) - .await - .context(format!("SET: {key:?} {value:?}"))?; - Ok(()) - } - - pub async fn rename(&self, old_key: K, new_key: V) -> anyhow::Result<()> + pub async fn set(&self, key: K, value: V) -> anyhow::Result<()> where K: ToRedisArgs + Debug + Send + Sync + 'static, V: ToRedisArgs + Debug + Send + Sync + 'static, { - tracing::debug!("RENAME: {:?} -> {:?}", old_key, new_key); + tracing::debug!("SET: {:?}, {:?}", key, value); - self.connection - .clone() - .rename(&old_key, &new_key) - .await - .context(format!("RENAME: {old_key:?} {new_key:?}"))?; + let mut cmd = redis::cmd("SET"); + cmd.arg(key).arg(value); + cmd.query_async(&mut self.connection.clone()).await?; Ok(()) } - pub async fn srem(&self, key: T, value: U) -> anyhow::Result> - where - T: ToRedisArgs + Debug + Send + Sync + 'static, - U: ToRedisArgs + Debug + Send + Sync + 'static, - { - tracing::debug!("SREM: {:?}={:?}", key, value); - - match self.connection.clone().srem(&key, &value).await { - Ok(1) => Ok(Some(())), - Ok(_) => Ok(None), - Err(e) => Err(anyhow::format_err!(e)), - } - .context(format!("SREM: {key:?} {value:?}")) - } - - pub async fn xread( - &self, - key: K, - start_id: V, - count: usize, - ) -> anyhow::Result> - where - K: ToRedisArgs + Debug + Send + Sync + 'static, - V: ToRedisArgs + Debug + Send + Sync + 'static, - { - tracing::debug!("XREAD: {:?} {:?} {:?}", key, start_id, count); - - let mut results: streams::StreamReadReply = self - .connection - .clone() - .xread_options( - &[&key], - &[&start_id], - &streams::StreamReadOptions::default().count(count), - ) - .await - .context(format!("XREAD {key:?} {start_id:?} {count:?}"))?; - - if results.keys.is_empty() { - return Ok([].to_vec()); - } - - Ok(results.keys.remove(0).ids) - } - - pub async fn xadd(&self, key: K, fields: &[(String, U)]) -> anyhow::Result<()> - where - K: ToRedisArgs + Debug + Send + Sync + 'static, - U: ToRedisArgs + Debug + Send + Sync + 'static, - { - tracing::debug!("XADD: {:?} {:?} {:?}", key, "*", fields); - - self.connection - .clone() - .xadd(&key, "*", fields) - .await - .context(format!("XADD {key:?} {fields:?}"))?; - - Ok(()) - } - - pub async fn xdel(&self, key: K, id: I) -> anyhow::Result<()> - where - K: ToRedisArgs + Debug + Send + Sync + 'static, - I: ToRedisArgs + Debug + Send + Sync + 'static, - { - tracing::debug!("XDEL: {:?} {:?}", key, id); - - self.connection - .clone() - .xdel(&key, &[&id]) - .await - .context(format!("XDEL {key:?} {id:?}"))?; - - Ok(()) - } - - pub async fn exists(&self, key: K) -> anyhow::Result - where - K: ToRedisArgs + Debug + Send + Sync + 'static, - { - tracing::debug!("EXISTS {key:?}"); - - self.connection - .clone() - .exists(&key) - .await - .map_err(|e| anyhow::format_err!(e)) - .context(format!("EXISTS {key:?}")) - } - pub async fn del(&self, key: K) -> anyhow::Result<()> where K: ToRedisArgs + Debug + Send + Sync + 'static, { tracing::debug!("DEL {key:?}"); - self.connection - .clone() - .del(&key) + let mut cmd = redis::cmd("DEL"); + cmd.arg(&key); + cmd.query_async(&mut self.connection.clone()) .await - .map_err(|e| anyhow::format_err!(e)) - .context(format!("DEL {key:?}")) - } - - // `redis::transaction`s currently don't work with async connections, so we have to create a _new_ - // blocking connection to atmoically update a value. - pub fn atomic_update(&self, key: K, update_fn: F) -> anyhow::Result<()> - where - K: ToRedisArgs + Copy + 'static, - O: FromRedisValue + 'static, - N: ToRedisArgs + 'static, - F: Fn(O) -> RedisResult + 'static, - { - let mut conn = redis::Client::open(self.url.clone())?.get_connection()?; - - redis::transaction(&mut conn, &[key], |conn, pipe| { - let old_value = redis::cmd("GET").arg(key).query(conn)?; - let new_value = update_fn(old_value)?; - - pipe.cmd("SET").arg(key).arg(new_value).query(conn) - })?; + .context(format!("DEL {key:?}"))?; Ok(()) }