Store Real Time Streamer Messages in Redis (#241)

The streamer message is used by both the coordinator and the runner, but each currently pulls it from S3, which carries a significant latency cost. To address this, the streamer message is now cached in Redis with a TTL and read by the runner from Redis; only on a cache miss does the runner fall back to pulling from S3.
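As a rough sketch, the runner-side read path this describes could look like the following TypeScript (the helper name getStreamerMessage, the client setup, and the fallback details are illustrative assumptions; only the streamer:message:<block height> key format and the near-lake-data- bucket prefix come from the code in this change):

// Hypothetical sketch of the Redis-first, S3-fallback read path; not the runner's exact code.
import { createClient } from 'redis';
import { S3 } from '@aws-sdk/client-s3';

const redis = createClient({ url: process.env.REDIS_CONNECTION_STRING });
const s3 = new S3();

export async function getStreamerMessage (blockHeight: number): Promise<unknown> {
  if (!redis.isOpen) await redis.connect();

  // Key format written by the coordinator: streamer:message:<block height>.
  const cached = await redis.get(`streamer:message:${blockHeight}`);
  if (cached !== null) {
    console.log(`Streamer message cache hit for block ${blockHeight}`); // hits/misses are logged to tune the TTL
    return JSON.parse(cached);
  }

  console.log(`Streamer message cache miss for block ${blockHeight}`);
  // Fall back to S3. near-lake stores blocks under a zero-padded height prefix;
  // the real fallback would also fetch the shard files, omitted here for brevity.
  const response = await s3.getObject({
    Bucket: `near-lake-data-${process.env.NETWORK ?? 'mainnet'}`,
    Key: `${blockHeight.toString().padStart(12, '0')}/block.json`,
  });
  return JSON.parse(await response.Body!.transformToString());
}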

Pulling from S3 currently takes 200-500ms, which is roughly 80-85% of
the overall execution time of a function in the runner. With the message
cached, a cache hit loads the data in 1-3ms in local testing, which
corresponds to about 3-5% of the execution time, or roughly an 1100%
improvement in latency.

Reducing network-related activity to a much smaller share of execution
time also greatly reduces the variability of a function's execution time.
Cache hits and misses are logged so the TTL can be tuned further to reduce
misses. In addition, processing the block takes around 1-3ms; this
processing now happens before the message is cached, saving that 1-3ms each
time the block is read from cache. That saving will matter for historical
backfill, which is planned to be optimized soon.

Tracking Issue: #262
Parent Issue: #204
darunrs authored Oct 5, 2023
1 parent 9f13148 commit 9678087
Showing 12 changed files with 450 additions and 229 deletions.
4 changes: 2 additions & 2 deletions block-server/handler.js
@@ -1,6 +1,6 @@
'use strict';
-const AWS = require('aws-sdk');
-const S3 = new AWS.S3();
+import { S3 as S3Client } from '@aws-sdk/client-s3';
+const S3 = new S3Client();

const NETWORK = process.env.NETWORK || 'mainnet';

indexer/queryapi_coordinator/src/historical_block_processing.rs
@@ -151,6 +151,7 @@ pub(crate) async fn process_historical_messages(
        redis_connection_manager,
        storage::generate_historical_storage_key(&indexer_function.get_full_name()),
        serde_json::to_string(&indexer_function)?,
        None,
    )
    .await?;
}
13 changes: 12 additions & 1 deletion indexer/queryapi_coordinator/src/main.rs
@@ -6,11 +6,12 @@ use tokio::sync::{Mutex, MutexGuard};
use indexer_rules_engine::types::indexer_rule_match::{ChainId, IndexerRuleMatch};
use near_lake_framework::near_indexer_primitives::types::{AccountId, BlockHeight};
use near_lake_framework::near_indexer_primitives::{types, StreamerMessage};
+use utils::serialize_to_camel_case_json_string;

use crate::indexer_types::IndexerFunction;
use indexer_types::{IndexerQueueMessage, IndexerRegistry};
use opts::{Opts, Parser};
-use storage::{self, ConnectionManager};
+use storage::{self, generate_real_time_streamer_message_key, ConnectionManager};

mod historical_block_processing;
mod indexer_reducer;
@@ -146,6 +147,15 @@ async fn handle_streamer_message(

    let block_height: BlockHeight = context.streamer_message.block.header.height;

    // Cache streamer message block and shards for use in real time processing
    storage::set(
        context.redis_connection_manager,
        generate_real_time_streamer_message_key(block_height),
        &serialize_to_camel_case_json_string(&context.streamer_message)?,
        Some(60),
    )
    .await?;

    let spawned_indexers = indexer_registry::index_registry_changes(
        block_height,
        &mut indexer_registry_locked,
@@ -206,6 +216,7 @@ async fn handle_streamer_message(
            context.redis_connection_manager,
            storage::generate_real_time_storage_key(&indexer_function.get_full_name()),
            serde_json::to_string(indexer_function)?,
            None,
        )
        .await?;
        storage::xadd(
51 changes: 51 additions & 0 deletions indexer/queryapi_coordinator/src/utils.rs
@@ -1,3 +1,5 @@
use serde_json::Value;

pub(crate) async fn stats(redis_connection_manager: storage::ConnectionManager) {
    let interval_secs = 10;
    let mut previous_processed_blocks: u64 =
@@ -49,3 +51,52 @@ pub(crate) async fn stats(redis_connection_manager: storage::ConnectionManager)
        tokio::time::sleep(std::time::Duration::from_secs(interval_secs)).await;
    }
}

pub(crate) fn serialize_to_camel_case_json_string(
    streamer_message: &near_lake_framework::near_indexer_primitives::StreamerMessage,
) -> anyhow::Result<String, serde_json::Error> {
    // Serialize the Message object to a JSON string
    let json_str = serde_json::to_string(&streamer_message)?;

    // Deserialize the JSON string to a Value Object
    let mut message_value: Value = serde_json::from_str(&json_str)?;

    // Convert keys to Camel Case
    to_camel_case_keys(&mut message_value);

    return serde_json::to_string(&message_value);
}

fn to_camel_case_keys(message_value: &mut Value) {
    // Only process if subfield contains objects
    match message_value {
        Value::Object(map) => {
            for key in map.keys().cloned().collect::<Vec<String>>() {
                // Generate Camel Case Key
                let new_key = key
                    .split("_")
                    .enumerate()
                    .map(|(i, str)| {
                        if i > 0 {
                            return str[..1].to_uppercase() + &str[1..];
                        }
                        return str.to_owned();
                    })
                    .collect::<Vec<String>>()
                    .join("");

                // Recursively process inner fields and update map with new key
                if let Some(mut val) = map.remove(&key) {
                    to_camel_case_keys(&mut val);
                    map.insert(new_key, val);
                }
            }
        }
        Value::Array(vec) => {
            for val in vec {
                to_camel_case_keys(val);
            }
        }
        _ => {}
    }
}
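Since the runner consumes this cached JSON (presumably because its @near-lake/primitives types use camelCase field names), it may help to picture the same key rewrite in TypeScript. A hypothetical equivalent of to_camel_case_keys, for illustration only; the real conversion runs once in the coordinator, in the Rust above:

// Hypothetical TypeScript equivalent of to_camel_case_keys, for illustration only.
function toCamelCaseKeys (value: unknown): unknown {
  if (Array.isArray(value)) {
    return value.map(toCamelCaseKeys); // recurse into array elements
  }
  if (value !== null && typeof value === 'object') {
    return Object.fromEntries(
      Object.entries(value as Record<string, unknown>).map(([key, val]) => [
        // snake_case -> camelCase: capitalize every segment after the first
        key
          .split('_')
          .map((part, i) => (i > 0 ? part.charAt(0).toUpperCase() + part.slice(1) : part))
          .join(''),
        toCamelCaseKeys(val),
      ]),
    );
  }
  return value; // primitives pass through unchanged
}

// e.g. toCamelCaseKeys({ block: { header: { prev_hash: 'abc' } } })
//   -> { block: { header: { prevHash: 'abc' } } }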
31 changes: 24 additions & 7 deletions indexer/storage/src/lib.rs
@@ -2,6 +2,7 @@ pub use redis::{self, aio::ConnectionManager, FromRedisValue, ToRedisArgs};

const STORAGE: &str = "storage_alertexer";

pub const LAKE_BUCKET_PREFIX: &str = "near-lake-data-";
pub const STREAMS_SET_KEY: &str = "streams";

pub async fn get_redis_client(redis_connection_str: &str) -> redis::Client {
@@ -12,6 +13,10 @@ pub fn generate_real_time_stream_key(prefix: &str) -> String {
    format!("{}:real_time:stream", prefix)
}

pub fn generate_real_time_streamer_message_key(block_height: u64) -> String {
    format!("streamer:message:{}", block_height)
}

pub fn generate_real_time_storage_key(prefix: &str) -> String {
    format!("{}:real_time:stream:storage", prefix)
}
@@ -47,13 +52,19 @@ pub async fn set(
    redis_connection_manager: &ConnectionManager,
    key: impl ToRedisArgs + std::fmt::Debug,
    value: impl ToRedisArgs + std::fmt::Debug,
+   expiration_seconds: Option<usize>,
) -> anyhow::Result<()> {
-   redis::cmd("SET")
-       .arg(&key)
-       .arg(&value)
-       .query_async(&mut redis_connection_manager.clone())
+   let mut cmd = redis::cmd("SET");
+   cmd.arg(&key).arg(&value);
+
+   // Add expiration arguments if present
+   if let Some(expiration_seconds) = expiration_seconds {
+       cmd.arg("EX").arg(expiration_seconds);
+   }
+
+   cmd.query_async(&mut redis_connection_manager.clone())
        .await?;
-   tracing::debug!(target: STORAGE, "SET: {:?}: {:?}", key, value,);
+   tracing::debug!(target: STORAGE, "SET: {:?}: {:?} Ex: {:?}", key, value, expiration_seconds);
    Ok(())
}

@@ -113,7 +124,7 @@ pub async fn push_receipt_to_watching_list(
    receipt_id: &str,
    cache_value: &[u8],
) -> anyhow::Result<()> {
-   set(redis_connection_manager, receipt_id, cache_value).await?;
+   set(redis_connection_manager, receipt_id, cache_value, None).await?;
    // redis::cmd("INCR")
    //     .arg(format!("receipts_{}", transaction_hash))
    //     .query_async(&mut redis_connection_manager.clone())
@@ -161,7 +172,13 @@ pub async fn update_last_indexed_block(
    redis_connection_manager: &ConnectionManager,
    block_height: u64,
) -> anyhow::Result<()> {
-   set(redis_connection_manager, "last_indexed_block", block_height).await?;
+   set(
+       redis_connection_manager,
+       "last_indexed_block",
+       block_height,
+       None,
+   )
+   .await?;
    redis::cmd("INCR")
        .arg("blocks_processed")
        .query_async(&mut redis_connection_manager.clone())
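With the new expiration_seconds argument, the cached streamer message is an ordinary Redis string written via SET ... EX 60, so it can be inspected from any client. A quick hypothetical check from Node (the key format and 60-second TTL come from the coordinator code above; the block height is made up):

import { createClient } from 'redis';

const client = createClient();
await client.connect();

// The coordinator writes each message under streamer:message:<height> with EX 60.
const key = 'streamer:message:100000000'; // hypothetical block height
const ttl = await client.ttl(key);        // seconds until expiry, or -2 if the key is already gone
const message = await client.get(key);    // camelCase JSON string, or null on a cache miss
console.log(ttl, message?.length);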
2 changes: 1 addition & 1 deletion runner/package.json
@@ -42,8 +42,8 @@
"typescript": "^5.1.6"
},
"dependencies": {
"@aws-sdk/client-s3": "^3.414.0",
"@near-lake/primitives": "^0.1.0",
"aws-sdk": "^2.1402.0",
"express": "^4.18.2",
"node-fetch": "^2.6.11",
"node-sql-parser": "^4.10.0",
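The dependency swap above moves the runner from the monolithic aws-sdk v2 package to the modular v3 client, which changes call sites roughly as follows (a sketch with placeholder bucket and key; not the runner's exact code):

import { S3 } from '@aws-sdk/client-s3';

const s3 = new S3();

// v2 style: const data = await s3.getObject({ Bucket, Key }).promise();
//           const body = data.Body?.toString('utf-8');
// v3 style: no .promise(), and Body is a stream with a transformToString() helper.
const response = await s3.getObject({
  Bucket: 'near-lake-data-mainnet',  // near-lake bucket for mainnet
  Key: '000100000000/block.json',    // hypothetical zero-padded block height
});
const body = await response.Body?.transformToString();
console.log(body?.slice(0, 120));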
