Skip to content

Commit

Permalink
Cargo formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
darunrs committed Mar 8, 2024
1 parent c4b27ed commit 34afd4e
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 19 deletions.
11 changes: 7 additions & 4 deletions block-streamer/src/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use registry_types::Rule;
/// The number of blocks to prefetch within `near-lake-framework`. The internal default is 100, but
/// we need this configurable for testing purposes.
const LAKE_PREFETCH_SIZE: usize = 100;
const MAX_STREAM_SIZE_WITH_CACHE: u64 = 100;
const DELTA_LAKE_SKIP_ACCOUNTS: [&str; 4] = ["*", "*.near", "*.kaiching", "*.tg"];

pub struct Task {
Expand Down Expand Up @@ -285,11 +286,13 @@ async fn process_near_lake_blocks(
);

if !matches.is_empty() {
if let Ok(Some(stream_length)) = redis_client.get_stream_length(redis_stream.clone()).await {
if stream_length < 100 {
if let Ok(Some(stream_length)) =
redis_client.get_stream_length(redis_stream.clone()).await
{
if stream_length <= MAX_STREAM_SIZE_WITH_CACHE {
redis_client
.cache_streamer_message(&streamer_message)
.await?;
.cache_streamer_message(&streamer_message)
.await?;
}
}

Expand Down
11 changes: 4 additions & 7 deletions block-streamer/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ impl RedisClientImpl {
let mut cmd = redis::cmd("XLEN");
cmd.arg(&stream_key);

let stream_length = cmd.query_async(&mut self.connection.clone())
let stream_length = cmd
.query_async(&mut self.connection.clone())
.await
.context(format!("XLEN {stream_key:?}"))?;

Expand Down Expand Up @@ -113,12 +114,8 @@ impl RedisClientImpl {
.context("Failed to set last processed block")
}

pub async fn get_stream_length(
&self,
stream: String,
) -> anyhow::Result<Option<u64>> {
self.xlen(stream)
.await
pub async fn get_stream_length(&self, stream: String) -> anyhow::Result<Option<u64>> {
self.xlen(stream).await
}

pub async fn cache_streamer_message(
Expand Down
5 changes: 4 additions & 1 deletion coordinator/src/block_streams/synchronise.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,10 @@ async fn synchronise_block_stream(
let start_block_height =
determine_start_block_height(&stream_status, indexer_config, redis_client).await?;

tracing::info!("Starting new block stream starting at block {}", start_block_height);
tracing::info!(
"Starting new block stream starting at block {}",
start_block_height
);

block_streams_handler
.start(start_block_height, indexer_config)
Expand Down
12 changes: 5 additions & 7 deletions coordinator/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@
use std::fmt::Debug;

use anyhow::Context;
use redis::{
aio::ConnectionManager, AsyncCommands, FromRedisValue, ToRedisArgs,
};
use redis::{aio::ConnectionManager, AsyncCommands, FromRedisValue, ToRedisArgs};

use crate::indexer_config::IndexerConfig;

Expand All @@ -21,7 +19,6 @@ pub struct RedisClientImpl {

#[cfg_attr(test, mockall::automock)]
impl RedisClientImpl {

pub async fn connect(redis_url: &str) -> anyhow::Result<Self> {
let connection = redis::Client::open(redis_url)?
.get_connection_manager()
Expand All @@ -41,7 +38,8 @@ impl RedisClientImpl {
{
let mut cmd = redis::cmd("GET");
cmd.arg(&key);
let value = cmd.query_async(&mut self.connection.clone())
let value = cmd
.query_async(&mut self.connection.clone())
.await
.context(format!("GET: {key:?}"))?;

Expand Down Expand Up @@ -69,13 +67,13 @@ impl RedisClientImpl {
K: ToRedisArgs + Debug + Send + Sync + 'static,
{
tracing::debug!("DEL {key:?}");

let mut cmd = redis::cmd("DEL");
cmd.arg(&key);
cmd.query_async(&mut self.connection.clone())
.await
.context(format!("DEL {key:?}"))?;

Ok(())
}

Expand Down

0 comments on commit 34afd4e

Please sign in to comment.