Skip to content

Commit

Permalink
feat: Write last_indexed_block per block stream
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley committed Dec 18, 2023
1 parent 17a3316 commit b96c6e3
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 6 deletions.
5 changes: 2 additions & 3 deletions block-streamer/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion block-streamer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ wildmatch = "2.1.1"

registry-types = { path = "../registry/types" }

near-lake-framework = "0.7.4"
near-lake-framework = { git = "https://github.com/near/near-lake-framework-rs", branch = "aws-logging" }

[build-dependencies]
tonic-build = "0.10"
Expand Down
22 changes: 21 additions & 1 deletion block-streamer/src/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,21 @@ pub(crate) async fn start_block_stream(
);

for block in &blocks_from_index {
let block = block.to_owned();
redis_client
.xadd(
crate::redis::generate_historical_stream_key(&indexer.get_full_name()),
&[("block_height".to_string(), block.to_owned())],
&[("block_height".to_string(), block)],
)
.await
.context("Failed to add block to Redis Stream")?;
redis_client
.set(
format!("{}:last_indexed_block", indexer.get_full_name()),
block,
)
.await
.context("Failed to set last_indexed_block")?;
}

let mut last_indexed_block =
Expand Down Expand Up @@ -194,6 +202,14 @@ pub(crate) async fn start_block_stream(
let block_height = streamer_message.block.header.height;
last_indexed_block = block_height;

redis_client
.set(
format!("{}:last_indexed_block", indexer.get_full_name()),
last_indexed_block,
)
.await
.context("Failed to set last_indexed_block")?;

let matches = crate::rules::reduce_indexer_rule_matches(
&indexer.indexer_rule,
&streamer_message,
Expand Down Expand Up @@ -252,6 +268,10 @@ mod tests {
.expect_xadd::<String, u64>()
.returning(|_, _| Ok(()))
.times(expected_matching_block_height_count);
mock_redis_client
.expect_set::<String, u64>()
.returning(|_, _| Ok(()))
.times(4);

let indexer_config = crate::indexer_config::IndexerConfig {
account_id: near_indexer_primitives::types::AccountId::try_from(
Expand Down
16 changes: 15 additions & 1 deletion block-streamer/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ pub struct RedisClientImpl {
#[cfg_attr(test, mockall::automock)]
impl RedisClientImpl {
pub async fn connect(redis_connection_str: &str) -> Result<Self, RedisError> {
println!("called this");
let connection = redis::Client::open(redis_connection_str)?
.get_tokio_connection_manager()
.await?;
Expand All @@ -44,4 +43,19 @@ impl RedisClientImpl {

Ok(())
}

pub async fn set<T, U>(&self, key: T, value: U) -> Result<(), RedisError>
where
T: ToRedisArgs + Debug + Send + Sync + 'static,
U: ToRedisArgs + Debug + Send + Sync + 'static,
{
tracing::debug!("SET: {:?}, {:?}", key, value);

let mut cmd = redis::cmd("SET");
cmd.arg(key).arg(value);

cmd.query_async(&mut self.connection.clone()).await?;

Ok(())
}
}

0 comments on commit b96c6e3

Please sign in to comment.