From bfedf1e184a66760301dedee515033cc9d392ad5 Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Mon, 18 Dec 2023 15:30:39 +1300 Subject: [PATCH] feat: Write `last_indexed_block` per block stream --- block-streamer/Cargo.lock | 5 ++--- block-streamer/Cargo.toml | 2 +- block-streamer/src/block_stream.rs | 18 +++++++++++++++++- block-streamer/src/redis.rs | 16 +++++++++++++++- 4 files changed, 35 insertions(+), 6 deletions(-) diff --git a/block-streamer/Cargo.lock b/block-streamer/Cargo.lock index 6198e3dfd..379f48cea 100644 --- a/block-streamer/Cargo.lock +++ b/block-streamer/Cargo.lock @@ -1912,9 +1912,8 @@ dependencies = [ [[package]] name = "near-lake-framework" -version = "0.7.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a4259d516729a7d938e3024064fe53c005673169383daa1db5ea360e0950c9d" +version = "0.0.0" +source = "git+https://github.com/near/near-lake-framework-rs?branch=aws-logging#96f2d949c5cbdabce9b848c1ec93895369106355" dependencies = [ "anyhow", "async-stream", diff --git a/block-streamer/Cargo.toml b/block-streamer/Cargo.toml index 768929cc0..b5cda946c 100644 --- a/block-streamer/Cargo.toml +++ b/block-streamer/Cargo.toml @@ -26,7 +26,7 @@ wildmatch = "2.1.1" registry-types = { path = "../registry/types" } -near-lake-framework = "0.7.4" +near-lake-framework = { git = "https://github.com/near/near-lake-framework-rs", branch = "aws-logging" } [build-dependencies] tonic-build = "0.10" diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index 6de09751e..6328ce7b1 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -155,13 +155,21 @@ pub(crate) async fn start_block_stream( ); for block in &blocks_from_index { + let block = block.to_owned(); redis_client .xadd( crate::redis::generate_historical_stream_key(&indexer.get_full_name()), - &[("block_height".to_string(), block.to_owned())], + &[("block_height".to_string(), block)], ) .await .context("Failed to add block to Redis Stream")?; + redis_client + .set( + format!("{}:last_indexed_block", indexer.get_full_name()), + block, + ) + .await + .context("Failed to set last_indexed_block")?; } let mut last_indexed_block = @@ -194,6 +202,14 @@ pub(crate) async fn start_block_stream( let block_height = streamer_message.block.header.height; last_indexed_block = block_height; + redis_client + .set( + format!("{}:last_indexed_block", indexer.get_full_name()), + last_indexed_block, + ) + .await + .context("Failed to set last_indexed_block")?; + let matches = crate::rules::reduce_indexer_rule_matches( &indexer.indexer_rule, &streamer_message, diff --git a/block-streamer/src/redis.rs b/block-streamer/src/redis.rs index a5ca22e30..4b99b16d3 100644 --- a/block-streamer/src/redis.rs +++ b/block-streamer/src/redis.rs @@ -18,7 +18,6 @@ pub struct RedisClientImpl { #[cfg_attr(test, mockall::automock)] impl RedisClientImpl { pub async fn connect(redis_connection_str: &str) -> Result { - println!("called this"); let connection = redis::Client::open(redis_connection_str)? .get_tokio_connection_manager() .await?; @@ -44,4 +43,19 @@ impl RedisClientImpl { Ok(()) } + + pub async fn set(&self, key: T, value: U) -> Result<(), RedisError> + where + T: ToRedisArgs + Debug + Send + Sync + 'static, + U: ToRedisArgs + Debug + Send + Sync + 'static, + { + tracing::debug!("SET: {:?}, {:?}", key, value); + + let mut cmd = redis::cmd("SET"); + cmd.arg(key).arg(value); + + cmd.query_async(&mut self.connection.clone()).await?; + + Ok(()) + } }