diff --git a/block-streamer/Cargo.toml b/block-streamer/Cargo.toml index 2d529d480..f4becc111 100644 --- a/block-streamer/Cargo.toml +++ b/block-streamer/Cargo.toml @@ -29,7 +29,7 @@ serde_json = "1.0.55" tracing = "0.1.40" tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } tracing-stackdriver = "0.10.0" -tokio = { version = "1.28.0", features = ["full"]} +tokio = { version = "1.28.0", features = ["full", "test-util"]} tokio-util = "0.7.10" tokio-stream = "0.1.14" tonic = "0.10.2" diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index ee403a235..4dbb6b0e7 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -12,6 +12,7 @@ use registry_types::Rule; const LAKE_PREFETCH_SIZE: usize = 100; const MAX_STREAM_SIZE_WITH_CACHE: u64 = 100; const DELTA_LAKE_SKIP_ACCOUNTS: [&str; 4] = ["*", "*.near", "*.kaiching", "*.tg"]; +const MAX_STREAM_SIZE: u64 = 100; pub struct Task { handle: JoinHandle>, @@ -45,7 +46,7 @@ impl BlockStream { pub fn start( &mut self, start_block_height: near_indexer_primitives::types::BlockHeight, - redis_client: std::sync::Arc, + redis: std::sync::Arc, delta_lake_client: std::sync::Arc, lake_s3_client: crate::lake_s3_client::SharedLakeS3Client, ) -> anyhow::Result<()> { @@ -74,7 +75,7 @@ impl BlockStream { result = start_block_stream( start_block_height, &indexer_config, - redis_client, + redis, delta_lake_client, lake_s3_client, &chain_id, @@ -129,7 +130,7 @@ impl BlockStream { pub(crate) async fn start_block_stream( start_block_height: near_indexer_primitives::types::BlockHeight, indexer: &IndexerConfig, - redis_client: std::sync::Arc, + redis: std::sync::Arc, delta_lake_client: std::sync::Arc, lake_s3_client: crate::lake_s3_client::SharedLakeS3Client, chain_id: &ChainId, @@ -145,7 +146,7 @@ pub(crate) async fn start_block_stream( let last_indexed_delta_lake_block = process_delta_lake_blocks( start_block_height, delta_lake_client, - redis_client.clone(), + redis.clone(), indexer, redis_stream.clone(), ) @@ -156,7 +157,7 @@ pub(crate) async fn start_block_stream( last_indexed_delta_lake_block, lake_s3_client, lake_prefetch_size, - redis_client, + redis, indexer, redis_stream, chain_id, @@ -175,7 +176,7 @@ pub(crate) async fn start_block_stream( async fn process_delta_lake_blocks( start_block_height: near_indexer_primitives::types::BlockHeight, delta_lake_client: std::sync::Arc, - redis_client: std::sync::Arc, + redis: std::sync::Arc, indexer: &IndexerConfig, redis_stream: String, ) -> anyhow::Result { @@ -230,10 +231,10 @@ async fn process_delta_lake_blocks( for block_height in &blocks_from_index { let block_height = block_height.to_owned(); - redis_client - .publish_block(indexer, redis_stream.clone(), block_height) + redis + .publish_block(indexer, redis_stream.clone(), block_height, MAX_STREAM_SIZE) .await?; - redis_client + redis .set_last_processed_block(indexer, block_height) .await?; } @@ -253,7 +254,7 @@ async fn process_near_lake_blocks( start_block_height: near_indexer_primitives::types::BlockHeight, lake_s3_client: crate::lake_s3_client::SharedLakeS3Client, lake_prefetch_size: usize, - redis_client: std::sync::Arc, + redis: std::sync::Arc, indexer: &IndexerConfig, redis_stream: String, chain_id: &ChainId, @@ -278,7 +279,7 @@ async fn process_near_lake_blocks( let block_height = streamer_message.block.header.height; last_indexed_block = block_height; - redis_client + redis .set_last_processed_block(indexer, block_height) .await?; @@ -289,18 +290,14 @@ async fn process_near_lake_blocks( ); if !matches.is_empty() { - if let Ok(Some(stream_length)) = - redis_client.get_stream_length(redis_stream.clone()).await - { + if let Ok(Some(stream_length)) = redis.get_stream_length(redis_stream.clone()).await { if stream_length <= MAX_STREAM_SIZE_WITH_CACHE { - redis_client - .cache_streamer_message(&streamer_message) - .await?; + redis.cache_streamer_message(&streamer_message).await?; } } - redis_client - .publish_block(indexer, redis_stream.clone(), block_height) + redis + .publish_block(indexer, redis_stream.clone(), block_height, MAX_STREAM_SIZE) .await?; } } @@ -355,17 +352,18 @@ mod tests { .expect_list_matching_block_heights() .returning(|_, _| Ok(vec![107503702, 107503703])); - let mut mock_redis_client = crate::redis::RedisClient::default(); - mock_redis_client + let mut mock_redis = crate::redis::RedisClient::default(); + mock_redis .expect_publish_block() .with( predicate::always(), predicate::eq("stream key".to_string()), predicate::in_iter([107503702, 107503703, 107503705]), + predicate::always(), ) - .returning(|_, _, _| Ok(())) + .returning(|_, _, _, _| Ok(())) .times(3); - mock_redis_client + mock_redis .expect_set_last_processed_block() .with( predicate::always(), @@ -373,11 +371,11 @@ mod tests { ) .returning(|_, _| Ok(())) .times(4); - mock_redis_client + mock_redis .expect_cache_streamer_message() .with(predicate::always()) .returning(|_| Ok(())); - mock_redis_client + mock_redis .expect_get_stream_length() .with(predicate::eq("stream key".to_string())) .returning(|_| Ok(Some(10))); @@ -397,7 +395,7 @@ mod tests { start_block_stream( 91940840, &indexer_config, - std::sync::Arc::new(mock_redis_client), + std::sync::Arc::new(mock_redis), std::sync::Arc::new(mock_delta_lake_client), mock_lake_s3_client, &ChainId::Mainnet, @@ -435,8 +433,9 @@ mod tests { predicate::always(), predicate::eq("stream key".to_string()), predicate::in_iter([107503705]), + predicate::always(), ) - .returning(|_, _, _| Ok(())) + .returning(|_, _, _, _| Ok(())) .times(1); mock_redis_client .expect_set_last_processed_block() diff --git a/block-streamer/src/main.rs b/block-streamer/src/main.rs index f14c65706..7776f77d9 100644 --- a/block-streamer/src/main.rs +++ b/block-streamer/src/main.rs @@ -45,7 +45,7 @@ async fn main() -> anyhow::Result<()> { "Starting Block Streamer" ); - let redis_client = std::sync::Arc::new(redis::RedisClient::connect(&redis_url).await?); + let redis = std::sync::Arc::new(redis::RedisClient::connect(&redis_url).await?); let aws_config = aws_config::from_env().load().await; let s3_config = aws_sdk_s3::Config::from(&aws_config); @@ -58,7 +58,7 @@ async fn main() -> anyhow::Result<()> { tokio::spawn(metrics::init_server(metrics_port).expect("Failed to start metrics server")); - server::init(&grpc_port, redis_client, delta_lake_client, lake_s3_client).await?; + server::init(&grpc_port, redis, delta_lake_client, lake_s3_client).await?; Ok(()) } diff --git a/block-streamer/src/redis.rs b/block-streamer/src/redis.rs index 254e0c9db..03401a10a 100644 --- a/block-streamer/src/redis.rs +++ b/block-streamer/src/redis.rs @@ -10,18 +10,16 @@ use crate::metrics; use crate::utils; #[cfg(test)] -pub use MockRedisClientImpl as RedisClient; +use MockRedisCommandsImpl as RedisCommands; #[cfg(not(test))] -pub use RedisClientImpl as RedisClient; +use RedisCommandsImpl as RedisCommands; -pub struct RedisClientImpl { +struct RedisCommandsImpl { connection: ConnectionManager, } #[cfg_attr(test, mockall::automock)] -impl RedisClientImpl { - const STREAMER_MESSAGE_PREFIX: &'static str = "streamer_message:"; - +impl RedisCommandsImpl { pub async fn connect(redis_url: &str) -> Result { let connection = redis::Client::open(redis_url)? .get_tokio_connection_manager() @@ -91,6 +89,26 @@ impl RedisClientImpl { Ok(()) } +} + +#[cfg(test)] +pub use MockRedisClientImpl as RedisClient; +#[cfg(not(test))] +pub use RedisClientImpl as RedisClient; + +pub struct RedisClientImpl { + commands: RedisCommands, +} + +#[cfg_attr(test, mockall::automock)] +impl RedisClientImpl { + const STREAMER_MESSAGE_PREFIX: &'static str = "streamer_message:"; + + pub async fn connect(redis_url: &str) -> Result { + let commands = RedisCommands::connect(redis_url).await?; + + Ok(Self { commands }) + } pub async fn set_last_processed_block( &self, @@ -109,13 +127,14 @@ impl RedisClientImpl { .context("Failed to convert block height (u64) to metrics type (i64)")?, ); - self.set(indexer_config.last_processed_block_key(), height) + self.commands + .set(indexer_config.last_processed_block_key(), height) .await .context("Failed to set last processed block") } pub async fn get_stream_length(&self, stream: String) -> anyhow::Result> { - self.xlen(stream).await + self.commands.xlen(stream).await } pub async fn cache_streamer_message( @@ -128,13 +147,14 @@ impl RedisClientImpl { utils::snake_to_camel(&mut streamer_message); - self.set_ex( - format!("{}{}", Self::STREAMER_MESSAGE_PREFIX, height), - serde_json::to_string(&streamer_message)?, - 60, - ) - .await - .context("Failed to cache streamer message") + self.commands + .set_ex( + format!("{}{}", Self::STREAMER_MESSAGE_PREFIX, height), + serde_json::to_string(&streamer_message)?, + 60, + ) + .await + .context("Failed to cache streamer message") } pub async fn publish_block( @@ -142,16 +162,82 @@ impl RedisClientImpl { indexer: &IndexerConfig, stream: String, block_height: u64, + max_size: u64, ) -> anyhow::Result<()> { + loop { + let stream_length = self.get_stream_length(stream.clone()).await?; + + if stream_length.is_none() { + break; + } + + if stream_length.unwrap() < max_size { + break; + } + + println!("Waiting for stream to be consumed"); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + metrics::PUBLISHED_BLOCKS_COUNT .with_label_values(&[&indexer.get_full_name()]) .inc(); - self.xadd( - stream.clone(), - &[(String::from("block_height"), block_height)], - ) - .await - .context("Failed to add block to Redis Stream") + self.commands + .xadd( + stream.clone(), + &[(String::from("block_height"), block_height)], + ) + .await + .context("Failed to add block to Redis Stream") + } +} + +#[cfg(test)] +mod test { + use super::*; + + use mockall::predicate; + use near_lake_framework::near_indexer_primitives; + + #[tokio::test] + async fn limits_block_stream_length() { + let mut mock_redis_commands = RedisCommands::default(); + mock_redis_commands + .expect_xadd::() + .with(predicate::eq("stream".to_string()), predicate::always()) + .returning(|_, _| Ok(())) + .once(); + let mut stream_len = 10; + mock_redis_commands + .expect_xlen::() + .with(predicate::eq("stream".to_string())) + .returning(move |_| { + stream_len -= 1; + Ok(Some(stream_len)) + }); + + let redis = RedisClientImpl { + commands: mock_redis_commands, + }; + + let indexer_config = crate::indexer_config::IndexerConfig { + account_id: near_indexer_primitives::types::AccountId::try_from( + "morgs.near".to_string(), + ) + .unwrap(), + function_name: "test".to_string(), + rule: registry_types::Rule::ActionAny { + affected_account_id: "queryapi.dataplatform.near".to_string(), + status: registry_types::Status::Success, + }, + }; + + tokio::time::pause(); + + redis + .publish_block(&indexer_config, "stream".to_string(), 0, 1) + .await + .unwrap(); } } diff --git a/block-streamer/src/server/block_streamer_service.rs b/block-streamer/src/server/block_streamer_service.rs index 7c2a6d45a..4c7d2c69d 100644 --- a/block-streamer/src/server/block_streamer_service.rs +++ b/block-streamer/src/server/block_streamer_service.rs @@ -13,7 +13,7 @@ use crate::server::blockstreamer; use blockstreamer::*; pub struct BlockStreamerService { - redis_client: std::sync::Arc, + redis: std::sync::Arc, delta_lake_client: std::sync::Arc, lake_s3_client: crate::lake_s3_client::SharedLakeS3Client, chain_id: ChainId, @@ -22,12 +22,12 @@ pub struct BlockStreamerService { impl BlockStreamerService { pub fn new( - redis_client: std::sync::Arc, + redis: std::sync::Arc, delta_lake_client: std::sync::Arc, lake_s3_client: crate::lake_s3_client::SharedLakeS3Client, ) -> Self { Self { - redis_client, + redis, delta_lake_client, lake_s3_client, chain_id: ChainId::Mainnet, @@ -113,7 +113,7 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic block_stream .start( request.start_block_height, - self.redis_client.clone(), + self.redis.clone(), self.delta_lake_client.clone(), self.lake_s3_client.clone(), ) @@ -206,10 +206,7 @@ mod tests { .expect_list_matching_block_heights() .returning(|_, _| Ok(vec![])); - let mut mock_redis_client = crate::redis::RedisClient::default(); - mock_redis_client - .expect_xadd::() - .returning(|_, _| Ok(())); + let mock_redis = crate::redis::RedisClient::default(); let mut mock_lake_s3_client = crate::lake_s3_client::SharedLakeS3Client::default(); mock_lake_s3_client @@ -217,7 +214,7 @@ mod tests { .returning(crate::lake_s3_client::SharedLakeS3Client::default); BlockStreamerService::new( - std::sync::Arc::new(mock_redis_client), + std::sync::Arc::new(mock_redis), std::sync::Arc::new(mock_delta_lake_client), mock_lake_s3_client, ) diff --git a/block-streamer/src/server/mod.rs b/block-streamer/src/server/mod.rs index fcdb77068..0159550d4 100644 --- a/block-streamer/src/server/mod.rs +++ b/block-streamer/src/server/mod.rs @@ -6,7 +6,7 @@ pub mod blockstreamer { pub async fn init( port: &str, - redis_client: std::sync::Arc, + redis: std::sync::Arc, delta_lake_client: std::sync::Arc, lake_s3_client: crate::lake_s3_client::SharedLakeS3Client, ) -> anyhow::Result<()> { @@ -14,11 +14,8 @@ pub async fn init( tracing::info!("Starting gRPC server on {}", addr); - let block_streamer_service = block_streamer_service::BlockStreamerService::new( - redis_client, - delta_lake_client, - lake_s3_client, - ); + let block_streamer_service = + block_streamer_service::BlockStreamerService::new(redis, delta_lake_client, lake_s3_client); let block_streamer_server = blockstreamer::block_streamer_server::BlockStreamerServer::new(block_streamer_service);