Skip to content

Commit

Permalink
feat: Cache Streamer Message from Block Streamer
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley committed Feb 26, 2024
1 parent 9079a57 commit e894d17
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 3 deletions.
4 changes: 4 additions & 0 deletions block-streamer/src/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,10 @@ async fn process_near_lake_blocks(
);

if !matches.is_empty() {
redis_client
.cache_streamer_message(&streamer_message)
.await?;

redis_client
.publish_block(indexer, redis_stream.clone(), block_height)
.await?;
Expand Down
1 change: 1 addition & 0 deletions block-streamer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ mod redis;
mod rules;
mod s3_client;
mod server;
mod utils;

#[cfg(test)]
mod test_utils;
Expand Down
36 changes: 34 additions & 2 deletions block-streamer/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
use std::fmt::Debug;

use anyhow::Context;
use redis::{aio::ConnectionManager, RedisError, ToRedisArgs};
use redis::{aio::ConnectionManager, AsyncCommands, RedisError, ToRedisArgs};

use crate::indexer_config::IndexerConfig;
use crate::metrics;
use crate::utils;

#[cfg(test)]
pub use MockRedisClientImpl as RedisClient;
Expand All @@ -19,6 +20,8 @@ pub struct RedisClientImpl {

#[cfg_attr(test, mockall::automock)]
impl RedisClientImpl {
const STREAMER_MESSAGE_PREFIX: &'static str = "streamer_message:";

pub async fn connect(redis_url: &str) -> Result<Self, RedisError> {
let connection = redis::Client::open(redis_url)?
.get_tokio_connection_manager()
Expand Down Expand Up @@ -56,7 +59,17 @@ impl RedisClientImpl {
let mut cmd = redis::cmd("SET");
cmd.arg(key).arg(value);

cmd.query_async(&mut self.connection.clone()).await?;
Ok(())
}

pub async fn set_ex<T, U>(&self, key: T, value: U, expiry: usize) -> Result<(), RedisError>
where
T: ToRedisArgs + Debug + Send + Sync + 'static,
U: ToRedisArgs + Debug + Send + Sync + 'static,
{
tracing::debug!("SET: {:?}, {:?}", key, value);

self.connection.clone().set_ex(key, value, expiry).await?;

Ok(())
}
Expand All @@ -83,6 +96,25 @@ impl RedisClientImpl {
.context("Failed to set last processed block")
}

pub async fn cache_streamer_message(
&self,
streamer_message: &near_lake_framework::near_indexer_primitives::StreamerMessage,
) -> anyhow::Result<()> {
let height = streamer_message.block.header.height;

let mut streamer_message = serde_json::to_value(streamer_message)?;

utils::snake_to_camel(&mut streamer_message);

self.set_ex(
format!("{}{}", Self::STREAMER_MESSAGE_PREFIX, height),
serde_json::to_string(&streamer_message)?,
60,
)
.await
.context("Failed to cache streamer message")
}

pub async fn publish_block(
&self,
indexer: &IndexerConfig,
Expand Down
148 changes: 148 additions & 0 deletions block-streamer/src/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
pub fn snake_to_camel(value: &mut serde_json::value::Value) {
match value {
serde_json::value::Value::Object(map) => {
for key in map.keys().cloned().collect::<Vec<String>>() {
let new_key = key
.split('_')
.enumerate()
.map(|(i, str)| {
if i > 0 {
return str[..1].to_uppercase() + &str[1..];
}
str.to_owned()
})
.collect::<Vec<String>>()
.join("");

if let Some(mut val) = map.remove(&key) {
snake_to_camel(&mut val);
map.insert(new_key, val);
}
}
}
serde_json::value::Value::Array(vec) => {
for val in vec {
snake_to_camel(val);
}
}
_ => {}
}
}

#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;

#[test]
fn flat() {
let mut value = json!({
"first_name": "John",
"last_name": "Doe"
});

snake_to_camel(&mut value);

assert_eq!(
value,
json!({
"firstName": "John",
"lastName": "Doe"
})
);
}

#[test]
fn nested() {
let mut value = json!({
"user_details": {
"first_name": "John",
"last_name": "Doe"
}
});

snake_to_camel(&mut value);

assert_eq!(
value,
json!({
"userDetails": {
"firstName": "John",
"lastName": "Doe"
}
})
);
}

#[test]
fn array() {
let mut value = json!([{
"first_name": "John",
"last_name": "Doe"
}, {
"first_name": "Jane",
"last_name": "Doe"
}]);

snake_to_camel(&mut value);

assert_eq!(
value,
json!([{
"firstName": "John",
"lastName": "Doe"
}, {
"firstName": "Jane",
"lastName": "Doe"
}])
);
}

#[test]
fn nested_and_array() {
let mut value = json!({
"user_details": {
"personal_info": {
"first_name": "John",
"last_name": "Doe"
},
"address": {
"city_name": "Some City",
"country_name": "Some Country"
}
},
"user_education": [{
"school_name": "XYZ High School",
"degree": "High School Diploma"
}, {
"university_name": "ABC University",
"degree": "Bachelor's"
}]
});

snake_to_camel(&mut value);

assert_eq!(
value,
json!({
"userDetails": {
"personalInfo": {
"firstName": "John",
"lastName": "Doe"
},
"address": {
"cityName": "Some City",
"countryName": "Some Country"
}
},
"userEducation": [{
"schoolName": "XYZ High School",
"degree": "High School Diploma"
}, {
"universityName": "ABC University",
"degree": "Bachelor's"
}]
})
);
}
}
2 changes: 1 addition & 1 deletion runner/src/redis-client/redis-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ export default class RedisClient {
SMALLEST_STREAM_ID = '0';
LARGEST_STREAM_ID = '+';
STREAMS_SET_KEY = 'streams';
STREAMER_MESSAGE_HASH_KEY_BASE = 'streamer:message:';
STREAMER_MESSAGE_HASH_KEY_BASE = 'streamer_message:';

constructor (
private readonly client: RedisClientType = createClient({ url: process.env.REDIS_CONNECTION_STRING })
Expand Down

0 comments on commit e894d17

Please sign in to comment.