diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index bc4605c73..8990d781d 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -285,6 +285,10 @@ async fn process_near_lake_blocks( ); if !matches.is_empty() { + redis_client + .cache_streamer_message(&streamer_message) + .await?; + redis_client .publish_block(indexer, redis_stream.clone(), block_height) .await?; diff --git a/block-streamer/src/main.rs b/block-streamer/src/main.rs index 1ca9beca8..9ee08ad74 100644 --- a/block-streamer/src/main.rs +++ b/block-streamer/src/main.rs @@ -8,6 +8,7 @@ mod redis; mod rules; mod s3_client; mod server; +mod utils; #[cfg(test)] mod test_utils; diff --git a/block-streamer/src/redis.rs b/block-streamer/src/redis.rs index 0cd193644..77b8861f4 100644 --- a/block-streamer/src/redis.rs +++ b/block-streamer/src/redis.rs @@ -3,10 +3,11 @@ use std::fmt::Debug; use anyhow::Context; -use redis::{aio::ConnectionManager, RedisError, ToRedisArgs}; +use redis::{aio::ConnectionManager, AsyncCommands, RedisError, ToRedisArgs}; use crate::indexer_config::IndexerConfig; use crate::metrics; +use crate::utils; #[cfg(test)] pub use MockRedisClientImpl as RedisClient; @@ -19,6 +20,8 @@ pub struct RedisClientImpl { #[cfg_attr(test, mockall::automock)] impl RedisClientImpl { + const STREAMER_MESSAGE_PREFIX: &'static str = "streamer_message:"; + pub async fn connect(redis_url: &str) -> Result { let connection = redis::Client::open(redis_url)? .get_tokio_connection_manager() @@ -56,7 +59,17 @@ impl RedisClientImpl { let mut cmd = redis::cmd("SET"); cmd.arg(key).arg(value); - cmd.query_async(&mut self.connection.clone()).await?; + Ok(()) + } + + pub async fn set_ex(&self, key: T, value: U, expiry: usize) -> Result<(), RedisError> + where + T: ToRedisArgs + Debug + Send + Sync + 'static, + U: ToRedisArgs + Debug + Send + Sync + 'static, + { + tracing::debug!("SET: {:?}, {:?}", key, value); + + self.connection.clone().set_ex(key, value, expiry).await?; Ok(()) } @@ -83,6 +96,25 @@ impl RedisClientImpl { .context("Failed to set last processed block") } + pub async fn cache_streamer_message( + &self, + streamer_message: &near_lake_framework::near_indexer_primitives::StreamerMessage, + ) -> anyhow::Result<()> { + let height = streamer_message.block.header.height; + + let mut streamer_message = serde_json::to_value(streamer_message)?; + + utils::snake_to_camel(&mut streamer_message); + + self.set_ex( + format!("{}{}", Self::STREAMER_MESSAGE_PREFIX, height), + serde_json::to_string(&streamer_message)?, + 60, + ) + .await + .context("Failed to cache streamer message") + } + pub async fn publish_block( &self, indexer: &IndexerConfig, diff --git a/block-streamer/src/utils.rs b/block-streamer/src/utils.rs new file mode 100644 index 000000000..5622226ca --- /dev/null +++ b/block-streamer/src/utils.rs @@ -0,0 +1,148 @@ +pub fn snake_to_camel(value: &mut serde_json::value::Value) { + match value { + serde_json::value::Value::Object(map) => { + for key in map.keys().cloned().collect::>() { + let new_key = key + .split('_') + .enumerate() + .map(|(i, str)| { + if i > 0 { + return str[..1].to_uppercase() + &str[1..]; + } + str.to_owned() + }) + .collect::>() + .join(""); + + if let Some(mut val) = map.remove(&key) { + snake_to_camel(&mut val); + map.insert(new_key, val); + } + } + } + serde_json::value::Value::Array(vec) => { + for val in vec { + snake_to_camel(val); + } + } + _ => {} + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn flat() { + let mut value = json!({ + "first_name": "John", + "last_name": "Doe" + }); + + snake_to_camel(&mut value); + + assert_eq!( + value, + json!({ + "firstName": "John", + "lastName": "Doe" + }) + ); + } + + #[test] + fn nested() { + let mut value = json!({ + "user_details": { + "first_name": "John", + "last_name": "Doe" + } + }); + + snake_to_camel(&mut value); + + assert_eq!( + value, + json!({ + "userDetails": { + "firstName": "John", + "lastName": "Doe" + } + }) + ); + } + + #[test] + fn array() { + let mut value = json!([{ + "first_name": "John", + "last_name": "Doe" + }, { + "first_name": "Jane", + "last_name": "Doe" + }]); + + snake_to_camel(&mut value); + + assert_eq!( + value, + json!([{ + "firstName": "John", + "lastName": "Doe" + }, { + "firstName": "Jane", + "lastName": "Doe" + }]) + ); + } + + #[test] + fn nested_and_array() { + let mut value = json!({ + "user_details": { + "personal_info": { + "first_name": "John", + "last_name": "Doe" + }, + "address": { + "city_name": "Some City", + "country_name": "Some Country" + } + }, + "user_education": [{ + "school_name": "XYZ High School", + "degree": "High School Diploma" + }, { + "university_name": "ABC University", + "degree": "Bachelor's" + }] + }); + + snake_to_camel(&mut value); + + assert_eq!( + value, + json!({ + "userDetails": { + "personalInfo": { + "firstName": "John", + "lastName": "Doe" + }, + "address": { + "cityName": "Some City", + "countryName": "Some Country" + } + }, + "userEducation": [{ + "schoolName": "XYZ High School", + "degree": "High School Diploma" + }, { + "universityName": "ABC University", + "degree": "Bachelor's" + }] + }) + ); + } +} diff --git a/runner/src/redis-client/redis-client.ts b/runner/src/redis-client/redis-client.ts index 41e2a2d3f..a41775de1 100644 --- a/runner/src/redis-client/redis-client.ts +++ b/runner/src/redis-client/redis-client.ts @@ -20,7 +20,7 @@ export default class RedisClient { SMALLEST_STREAM_ID = '0'; LARGEST_STREAM_ID = '+'; STREAMS_SET_KEY = 'streams'; - STREAMER_MESSAGE_HASH_KEY_BASE = 'streamer:message:'; + STREAMER_MESSAGE_HASH_KEY_BASE = 'streamer_message:'; constructor ( private readonly client: RedisClientType = createClient({ url: process.env.REDIS_CONNECTION_STRING })