From 0f2df830ad4b14349f57f5cea9f1d561b580a25c Mon Sep 17 00:00:00 2001 From: Morgan McCauley Date: Wed, 2 Aug 2023 08:35:02 +1200 Subject: [PATCH] Revert "Revert "DPLT-1074 Queue real-time messages on Redis Streams" (#160)" This reverts commit 18d0f9f5d20e33de1fc645d9c32911016b850c74. --- .../queryapi_coordinator/src/indexer_types.rs | 6 ++ indexer/queryapi_coordinator/src/main.rs | 16 +++++- indexer/storage/src/lib.rs | 56 +++++++++++++++++++ 3 files changed, 77 insertions(+), 1 deletion(-) diff --git a/indexer/queryapi_coordinator/src/indexer_types.rs b/indexer/queryapi_coordinator/src/indexer_types.rs index 4a4993ce5..f3880320b 100644 --- a/indexer/queryapi_coordinator/src/indexer_types.rs +++ b/indexer/queryapi_coordinator/src/indexer_types.rs @@ -40,3 +40,9 @@ pub struct IndexerFunction { pub provisioned: bool, pub indexer_rule: IndexerRule, } + +impl IndexerFunction { + pub fn get_full_name(&self) -> String { + format!("{}/{}", self.account_id, self.function_name) + } +} diff --git a/indexer/queryapi_coordinator/src/main.rs b/indexer/queryapi_coordinator/src/main.rs index e447f3316..6cd7398af 100644 --- a/indexer/queryapi_coordinator/src/main.rs +++ b/indexer/queryapi_coordinator/src/main.rs @@ -10,7 +10,7 @@ use near_lake_framework::near_indexer_primitives::{types, StreamerMessage}; use crate::indexer_types::IndexerFunction; use indexer_types::{IndexerQueueMessage, IndexerRegistry}; use opts::{Opts, Parser}; -use storage::ConnectionManager; +use storage::{self, ConnectionManager}; pub(crate) mod cache; mod historical_block_processing; @@ -195,6 +195,20 @@ async fn handle_streamer_message( if !indexer_function.provisioned { set_provisioned_flag(&mut indexer_registry_locked, &indexer_function); } + + storage::set( + context.redis_connection_manager, + &format!("{}:storage", indexer_function.get_full_name()), + serde_json::to_string(indexer_function)?, + ) + .await?; + + storage::add_to_registered_stream( + context.redis_connection_manager, + &format!("{}:stream", indexer_function.get_full_name()), + &[("block_height", block_height)], + ) + .await? } stream::iter(indexer_function_messages.into_iter()) diff --git a/indexer/storage/src/lib.rs b/indexer/storage/src/lib.rs index 59e8c8b04..332d1c789 100644 --- a/indexer/storage/src/lib.rs +++ b/indexer/storage/src/lib.rs @@ -2,6 +2,8 @@ pub use redis::{self, aio::ConnectionManager, FromRedisValue, ToRedisArgs}; const STORAGE: &str = "storage_alertexer"; +const STREAMS_SET_KEY: &str = "streams"; + pub async fn get_redis_client(redis_connection_str: &str) -> redis::Client { redis::Client::open(redis_connection_str).expect("can create redis client") } @@ -50,6 +52,60 @@ pub async fn get( tracing::debug!(target: STORAGE, "GET: {:?}: {:?}", &key, &value,); Ok(value) } + +async fn sadd( + redis_connection_manager: &ConnectionManager, + key: impl ToRedisArgs + std::fmt::Debug, + value: impl ToRedisArgs + std::fmt::Debug, +) -> anyhow::Result<()> { + tracing::debug!(target: STORAGE, "SADD: {:?}: {:?}", key, value); + + redis::cmd("SADD") + .arg(key) + .arg(value) + .query_async(&mut redis_connection_manager.clone()) + .await?; + + Ok(()) +} + +async fn xadd( + redis_connection_manager: &ConnectionManager, + stream_key: &str, + fields: &[(&str, impl ToRedisArgs + std::fmt::Debug)], +) -> anyhow::Result<()> { + tracing::debug!(target: STORAGE, "XADD: {}, {:?}", stream_key, fields); + + // TODO: Remove stream cap when we finally start processing it + redis::cmd("XTRIM") + .arg("MAXLEN") + .arg(100) + .query_async(&mut redis_connection_manager.clone()) + .await?; + + let mut cmd = redis::cmd("XADD"); + cmd.arg(stream_key).arg("*"); + + for (field, value) in fields { + cmd.arg(*field).arg(value); + } + + cmd.query_async(&mut redis_connection_manager.clone()) + .await?; + + Ok(()) +} + +pub async fn add_to_registered_stream( + redis_connection_manager: &ConnectionManager, + key: &str, + fields: &[(&str, impl ToRedisArgs + std::fmt::Debug)], +) -> anyhow::Result<()> { + sadd(redis_connection_manager, STREAMS_SET_KEY, key).await?; + xadd(redis_connection_manager, key, fields).await?; + + Ok(()) +} /// Sets the key `receipt_id: &str` with value `transaction_hash: &str` to the Redis storage. /// Increments the counter `receipts_{transaction_hash}` by one. /// The counter holds how many Receipts related to the Transaction are in watching list