diff --git a/indexer/queryapi_coordinator/src/main.rs b/indexer/queryapi_coordinator/src/main.rs index b62c0fad6..a67f5653a 100644 --- a/indexer/queryapi_coordinator/src/main.rs +++ b/indexer/queryapi_coordinator/src/main.rs @@ -11,7 +11,7 @@ use utils::serialize_to_camel_case_json_string; use crate::indexer_types::IndexerFunction; use indexer_types::{IndexerQueueMessage, IndexerRegistry}; use opts::{Opts, Parser}; -use storage::{self, ConnectionManager}; +use storage::{self, generate_real_time_streamer_message_key, ConnectionManager}; mod historical_block_processing; mod indexer_reducer; @@ -150,11 +150,7 @@ async fn handle_streamer_message( // Cache streamer message block and shards for use in real time processing storage::set( context.redis_connection_manager, - format!( - "{}{}", - storage::STREAMER_MESSAGE_HASH_KEY_BASE, - block_height - ), + generate_real_time_streamer_message_key(block_height), &serialize_to_camel_case_json_string(&context.streamer_message)?, Some(60), ) diff --git a/indexer/storage/src/lib.rs b/indexer/storage/src/lib.rs index 5d0557b11..a2d20b850 100644 --- a/indexer/storage/src/lib.rs +++ b/indexer/storage/src/lib.rs @@ -4,7 +4,6 @@ const STORAGE: &str = "storage_alertexer"; pub const LAKE_BUCKET_PREFIX: &str = "near-lake-data-"; pub const STREAMS_SET_KEY: &str = "streams"; -pub const STREAMER_MESSAGE_HASH_KEY_BASE: &str = "streamer:message:"; pub async fn get_redis_client(redis_connection_str: &str) -> redis::Client { redis::Client::open(redis_connection_str).expect("can create redis client") @@ -14,6 +13,10 @@ pub fn generate_real_time_stream_key(prefix: &str) -> String { format!("{}:real_time:stream", prefix) } +pub fn generate_real_time_streamer_message_key(block_height: u64) -> String { + format!("streamer:message:{}", block_height) +} + pub fn generate_real_time_storage_key(prefix: &str) -> String { format!("{}:real_time:stream:storage", prefix) } @@ -49,23 +52,19 @@ pub async fn set( redis_connection_manager: &ConnectionManager, key: impl ToRedisArgs + std::fmt::Debug, value: impl ToRedisArgs + std::fmt::Debug, - expiration: Option, + expiration_seconds: Option, ) -> anyhow::Result<()> { let mut cmd = redis::cmd("SET"); cmd.arg(&key).arg(&value); // Add expiration arguments if present - let exp_to_print: String; - if let Some(expiration) = expiration { - cmd.arg("EX").arg(expiration); - exp_to_print = format!("EX {}", expiration); - } else { - exp_to_print = "".to_string(); + if let Some(expiration_seconds) = expiration_seconds { + cmd.arg("EX").arg(expiration_seconds); } cmd.query_async(&mut redis_connection_manager.clone()) .await?; - tracing::debug!(target: STORAGE, "SET: {:?}: {:?} {:?}", key, value, exp_to_print); + tracing::debug!(target: STORAGE, "SET: {:?}: {:?} Ex: {:?}", key, value, expiration_seconds); Ok(()) }