From 34afd4e2d2c786830b52d9fd7a3a6618ec885d73 Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Thu, 7 Mar 2024 18:20:53 -0800 Subject: [PATCH] Cargo formatting --- block-streamer/src/block_stream.rs | 11 +++++++---- block-streamer/src/redis.rs | 11 ++++------- coordinator/src/block_streams/synchronise.rs | 5 ++++- coordinator/src/redis.rs | 12 +++++------- 4 files changed, 20 insertions(+), 19 deletions(-) diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index 80d287de3..3283dbf21 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -10,6 +10,7 @@ use registry_types::Rule; /// The number of blocks to prefetch within `near-lake-framework`. The internal default is 100, but /// we need this configurable for testing purposes. const LAKE_PREFETCH_SIZE: usize = 100; +const MAX_STREAM_SIZE_WITH_CACHE: u64 = 100; const DELTA_LAKE_SKIP_ACCOUNTS: [&str; 4] = ["*", "*.near", "*.kaiching", "*.tg"]; pub struct Task { @@ -285,11 +286,13 @@ async fn process_near_lake_blocks( ); if !matches.is_empty() { - if let Ok(Some(stream_length)) = redis_client.get_stream_length(redis_stream.clone()).await { - if stream_length < 100 { + if let Ok(Some(stream_length)) = + redis_client.get_stream_length(redis_stream.clone()).await + { + if stream_length <= MAX_STREAM_SIZE_WITH_CACHE { redis_client - .cache_streamer_message(&streamer_message) - .await?; + .cache_streamer_message(&streamer_message) + .await?; } } diff --git a/block-streamer/src/redis.rs b/block-streamer/src/redis.rs index 7be8ae4ee..254e0c9db 100644 --- a/block-streamer/src/redis.rs +++ b/block-streamer/src/redis.rs @@ -58,7 +58,8 @@ impl RedisClientImpl { let mut cmd = redis::cmd("XLEN"); cmd.arg(&stream_key); - let stream_length = cmd.query_async(&mut self.connection.clone()) + let stream_length = cmd + .query_async(&mut self.connection.clone()) .await .context(format!("XLEN {stream_key:?}"))?; @@ -113,12 +114,8 @@ impl RedisClientImpl { .context("Failed to set last processed block") } - pub async fn get_stream_length( - &self, - stream: String, - ) -> anyhow::Result> { - self.xlen(stream) - .await + pub async fn get_stream_length(&self, stream: String) -> anyhow::Result> { + self.xlen(stream).await } pub async fn cache_streamer_message( diff --git a/coordinator/src/block_streams/synchronise.rs b/coordinator/src/block_streams/synchronise.rs index bb54ce551..c0f21c588 100644 --- a/coordinator/src/block_streams/synchronise.rs +++ b/coordinator/src/block_streams/synchronise.rs @@ -95,7 +95,10 @@ async fn synchronise_block_stream( let start_block_height = determine_start_block_height(&stream_status, indexer_config, redis_client).await?; - tracing::info!("Starting new block stream starting at block {}", start_block_height); + tracing::info!( + "Starting new block stream starting at block {}", + start_block_height + ); block_streams_handler .start(start_block_height, indexer_config) diff --git a/coordinator/src/redis.rs b/coordinator/src/redis.rs index 79501512b..4f3b15b6e 100644 --- a/coordinator/src/redis.rs +++ b/coordinator/src/redis.rs @@ -3,9 +3,7 @@ use std::fmt::Debug; use anyhow::Context; -use redis::{ - aio::ConnectionManager, AsyncCommands, FromRedisValue, ToRedisArgs, -}; +use redis::{aio::ConnectionManager, AsyncCommands, FromRedisValue, ToRedisArgs}; use crate::indexer_config::IndexerConfig; @@ -21,7 +19,6 @@ pub struct RedisClientImpl { #[cfg_attr(test, mockall::automock)] impl RedisClientImpl { - pub async fn connect(redis_url: &str) -> anyhow::Result { let connection = redis::Client::open(redis_url)? .get_connection_manager() @@ -41,7 +38,8 @@ impl RedisClientImpl { { let mut cmd = redis::cmd("GET"); cmd.arg(&key); - let value = cmd.query_async(&mut self.connection.clone()) + let value = cmd + .query_async(&mut self.connection.clone()) .await .context(format!("GET: {key:?}"))?; @@ -69,13 +67,13 @@ impl RedisClientImpl { K: ToRedisArgs + Debug + Send + Sync + 'static, { tracing::debug!("DEL {key:?}"); - + let mut cmd = redis::cmd("DEL"); cmd.arg(&key); cmd.query_async(&mut self.connection.clone()) .await .context(format!("DEL {key:?}"))?; - + Ok(()) }