Skip to content

Commit

Permalink
Address some more PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
darunrs committed Oct 5, 2023
1 parent 498ff1f commit cc018d0
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 14 deletions.
6 changes: 1 addition & 5 deletions indexer/queryapi_coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,7 @@ async fn handle_streamer_message(
// Cache streamer message block and shards for use in real time processing
storage::set(
context.redis_connection_manager,
format!(
"{}{}",
storage::STREAMER_MESSAGE_HASH_KEY_BASE,
block_height
),
generate_real_time_streamer_message_key(block_height),
&serialize_to_camel_case_json_string(&context.streamer_message)?,
Some(60),
)
Expand Down
17 changes: 8 additions & 9 deletions indexer/storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ const STORAGE: &str = "storage_alertexer";

pub const LAKE_BUCKET_PREFIX: &str = "near-lake-data-";
pub const STREAMS_SET_KEY: &str = "streams";
pub const STREAMER_MESSAGE_HASH_KEY_BASE: &str = "streamer:message:";

pub async fn get_redis_client(redis_connection_str: &str) -> redis::Client {
redis::Client::open(redis_connection_str).expect("can create redis client")
Expand All @@ -14,6 +13,10 @@ pub fn generate_real_time_stream_key(prefix: &str) -> String {
format!("{}:real_time:stream", prefix)
}

pub fn generate_real_time_streamer_message_key(block_height: u64) -> String {
format!("streamer:message:{}", block_height)
}

pub fn generate_real_time_storage_key(prefix: &str) -> String {
format!("{}:real_time:stream:storage", prefix)
}
Expand Down Expand Up @@ -49,23 +52,19 @@ pub async fn set(
redis_connection_manager: &ConnectionManager,
key: impl ToRedisArgs + std::fmt::Debug,
value: impl ToRedisArgs + std::fmt::Debug,
expiration: Option<usize>,
expiration_seconds: Option<usize>,
) -> anyhow::Result<()> {
let mut cmd = redis::cmd("SET");
cmd.arg(&key).arg(&value);

// Add expiration arguments if present
let exp_to_print: String;
if let Some(expiration) = expiration {
cmd.arg("EX").arg(expiration);
exp_to_print = format!("EX {}", expiration);
} else {
exp_to_print = "".to_string();
if let Some(expiration_seconds) = expiration_seconds {
cmd.arg("EX").arg(expiration_seconds);
}

cmd.query_async(&mut redis_connection_manager.clone())
.await?;
tracing::debug!(target: STORAGE, "SET: {:?}: {:?} {:?}", key, value, exp_to_print);
tracing::debug!(target: STORAGE, "SET: {:?}: {:?} Ex: {:?}", key, value, expiration_seconds);
Ok(())
}

Expand Down

0 comments on commit cc018d0

Please sign in to comment.