Skip to content

Commit

Permalink
Revert "Revert "DPLT-1074 Queue real-time messages on Redis Streams" (#…
Browse files Browse the repository at this point in the history
…160)"

This reverts commit 18d0f9f.
  • Loading branch information
morgsmccauley authored Aug 1, 2023
1 parent 18d0f9f commit 0f2df83
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 1 deletion.
6 changes: 6 additions & 0 deletions indexer/queryapi_coordinator/src/indexer_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,9 @@ pub struct IndexerFunction {
pub provisioned: bool,
pub indexer_rule: IndexerRule,
}

impl IndexerFunction {
pub fn get_full_name(&self) -> String {
format!("{}/{}", self.account_id, self.function_name)
}
}
16 changes: 15 additions & 1 deletion indexer/queryapi_coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use near_lake_framework::near_indexer_primitives::{types, StreamerMessage};
use crate::indexer_types::IndexerFunction;
use indexer_types::{IndexerQueueMessage, IndexerRegistry};
use opts::{Opts, Parser};
use storage::ConnectionManager;
use storage::{self, ConnectionManager};

pub(crate) mod cache;
mod historical_block_processing;
Expand Down Expand Up @@ -195,6 +195,20 @@ async fn handle_streamer_message(
if !indexer_function.provisioned {
set_provisioned_flag(&mut indexer_registry_locked, &indexer_function);
}

storage::set(
context.redis_connection_manager,
&format!("{}:storage", indexer_function.get_full_name()),
serde_json::to_string(indexer_function)?,
)
.await?;

storage::add_to_registered_stream(
context.redis_connection_manager,
&format!("{}:stream", indexer_function.get_full_name()),
&[("block_height", block_height)],
)
.await?
}

stream::iter(indexer_function_messages.into_iter())
Expand Down
56 changes: 56 additions & 0 deletions indexer/storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ pub use redis::{self, aio::ConnectionManager, FromRedisValue, ToRedisArgs};

const STORAGE: &str = "storage_alertexer";

const STREAMS_SET_KEY: &str = "streams";

pub async fn get_redis_client(redis_connection_str: &str) -> redis::Client {
redis::Client::open(redis_connection_str).expect("can create redis client")
}
Expand Down Expand Up @@ -50,6 +52,60 @@ pub async fn get<V: FromRedisValue + std::fmt::Debug>(
tracing::debug!(target: STORAGE, "GET: {:?}: {:?}", &key, &value,);
Ok(value)
}

async fn sadd(
redis_connection_manager: &ConnectionManager,
key: impl ToRedisArgs + std::fmt::Debug,
value: impl ToRedisArgs + std::fmt::Debug,
) -> anyhow::Result<()> {
tracing::debug!(target: STORAGE, "SADD: {:?}: {:?}", key, value);

redis::cmd("SADD")
.arg(key)
.arg(value)
.query_async(&mut redis_connection_manager.clone())
.await?;

Ok(())
}

async fn xadd(
redis_connection_manager: &ConnectionManager,
stream_key: &str,
fields: &[(&str, impl ToRedisArgs + std::fmt::Debug)],
) -> anyhow::Result<()> {
tracing::debug!(target: STORAGE, "XADD: {}, {:?}", stream_key, fields);

// TODO: Remove stream cap when we finally start processing it
redis::cmd("XTRIM")
.arg("MAXLEN")
.arg(100)
.query_async(&mut redis_connection_manager.clone())
.await?;

let mut cmd = redis::cmd("XADD");
cmd.arg(stream_key).arg("*");

for (field, value) in fields {
cmd.arg(*field).arg(value);
}

cmd.query_async(&mut redis_connection_manager.clone())
.await?;

Ok(())
}

pub async fn add_to_registered_stream(
redis_connection_manager: &ConnectionManager,
key: &str,
fields: &[(&str, impl ToRedisArgs + std::fmt::Debug)],
) -> anyhow::Result<()> {
sadd(redis_connection_manager, STREAMS_SET_KEY, key).await?;
xadd(redis_connection_manager, key, fields).await?;

Ok(())
}
/// Sets the key `receipt_id: &str` with value `transaction_hash: &str` to the Redis storage.
/// Increments the counter `receipts_{transaction_hash}` by one.
/// The counter holds how many Receipts related to the Transaction are in watching list
Expand Down

0 comments on commit 0f2df83

Please sign in to comment.