Skip to content

Commit

Permalink
Add limit for caching block and fix sync block stream log
Browse files Browse the repository at this point in the history
  • Loading branch information
darunrs committed Mar 8, 2024
1 parent 24f85bc commit c4b27ed
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 5 deletions.
10 changes: 7 additions & 3 deletions block-streamer/src/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,9 +285,13 @@ async fn process_near_lake_blocks(
);

if !matches.is_empty() {
redis_client
.cache_streamer_message(&streamer_message)
.await?;
if let Ok(Some(stream_length)) = redis_client.get_stream_length(redis_stream.clone()).await {
if stream_length < 100 {
redis_client
.cache_streamer_message(&streamer_message)
.await?;
}
}

redis_client
.publish_block(indexer, redis_stream.clone(), block_height)
Expand Down
24 changes: 24 additions & 0 deletions block-streamer/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,22 @@ impl RedisClientImpl {
Ok(())
}

pub async fn xlen<T>(&self, stream_key: T) -> anyhow::Result<Option<u64>>
where
T: ToRedisArgs + Debug + Send + Sync + 'static,
{
tracing::debug!("XLEN: {:?}", stream_key);

let mut cmd = redis::cmd("XLEN");
cmd.arg(&stream_key);

let stream_length = cmd.query_async(&mut self.connection.clone())
.await
.context(format!("XLEN {stream_key:?}"))?;

Ok(stream_length)
}

pub async fn set<T, U>(&self, key: T, value: U) -> Result<(), RedisError>
where
T: ToRedisArgs + Debug + Send + Sync + 'static,
Expand Down Expand Up @@ -97,6 +113,14 @@ impl RedisClientImpl {
.context("Failed to set last processed block")
}

pub async fn get_stream_length(
&self,
stream: String,
) -> anyhow::Result<Option<u64>> {
self.xlen(stream)
.await
}

pub async fn cache_streamer_message(
&self,
streamer_message: &near_lake_framework::near_indexer_primitives::StreamerMessage,
Expand Down
4 changes: 2 additions & 2 deletions coordinator/src/block_streams/synchronise.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ async fn synchronise_block_stream(
let start_block_height =
determine_start_block_height(&stream_status, indexer_config, redis_client).await?;

tracing::info!("Starting new block stream starting at block {}", start_block_height);

block_streams_handler
.start(start_block_height, indexer_config)
.await?;
Expand Down Expand Up @@ -164,8 +166,6 @@ async fn determine_start_block_height(
return get_continuation_block_height(indexer_config, redis_client).await;
}

tracing::info!(start_block = ?indexer_config.start_block, "Stating new block stream");

match indexer_config.start_block {
StartBlock::Latest => Ok(indexer_config.get_registry_version()),
StartBlock::Height(height) => Ok(height),
Expand Down

0 comments on commit c4b27ed

Please sign in to comment.