Skip to content

Commit

Permalink
Merge branch 'main' into dplt-1123_new_index_folder_structure
Browse files Browse the repository at this point in the history
  • Loading branch information
gabehamilton authored Aug 3, 2023
2 parents a829582 + 332b4a5 commit 5d9d1d4
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 2 deletions.
6 changes: 6 additions & 0 deletions indexer/queryapi_coordinator/src/indexer_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,9 @@ pub struct IndexerFunction {
pub provisioned: bool,
pub indexer_rule: IndexerRule,
}

impl IndexerFunction {
pub fn get_full_name(&self) -> String {
format!("{}/{}", self.account_id, self.function_name)
}
}
22 changes: 20 additions & 2 deletions indexer/queryapi_coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use near_lake_framework::near_indexer_primitives::{types, StreamerMessage};
use crate::indexer_types::IndexerFunction;
use indexer_types::{IndexerQueueMessage, IndexerRegistry};
use opts::{Opts, Parser};
use storage::ConnectionManager;
use storage::{self, ConnectionManager};

pub(crate) mod cache;
mod historical_block_processing;
Expand Down Expand Up @@ -111,7 +111,11 @@ async fn main() -> anyhow::Result<()> {
})
.buffer_unordered(1usize);

while let Some(_handle_message) = handlers.next().await {}
while let Some(handle_message) = handlers.next().await {
if let Err(err) = handle_message {
tracing::error!(target: INDEXER, "{:#?}", err);
}
}
drop(handlers); // close the channel so the sender will stop

// propagate errors from the sender
Expand Down Expand Up @@ -195,6 +199,20 @@ async fn handle_streamer_message(
if !indexer_function.provisioned {
set_provisioned_flag(&mut indexer_registry_locked, &indexer_function);
}

storage::set(
context.redis_connection_manager,
&format!("{}:storage", indexer_function.get_full_name()),
serde_json::to_string(indexer_function)?,
)
.await?;

storage::add_to_registered_stream(
context.redis_connection_manager,
&format!("{}:stream", indexer_function.get_full_name()),
&[("block_height", block_height)],
)
.await?;
}

stream::iter(indexer_function_messages.into_iter())
Expand Down
57 changes: 57 additions & 0 deletions indexer/storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ pub use redis::{self, aio::ConnectionManager, FromRedisValue, ToRedisArgs};

const STORAGE: &str = "storage_alertexer";

const STREAMS_SET_KEY: &str = "streams";

pub async fn get_redis_client(redis_connection_str: &str) -> redis::Client {
redis::Client::open(redis_connection_str).expect("can create redis client")
}
Expand Down Expand Up @@ -50,6 +52,61 @@ pub async fn get<V: FromRedisValue + std::fmt::Debug>(
tracing::debug!(target: STORAGE, "GET: {:?}: {:?}", &key, &value,);
Ok(value)
}

async fn sadd(
redis_connection_manager: &ConnectionManager,
key: impl ToRedisArgs + std::fmt::Debug,
value: impl ToRedisArgs + std::fmt::Debug,
) -> anyhow::Result<()> {
tracing::debug!(target: STORAGE, "SADD: {:?}: {:?}", key, value);

redis::cmd("SADD")
.arg(key)
.arg(value)
.query_async(&mut redis_connection_manager.clone())
.await?;

Ok(())
}

async fn xadd(
redis_connection_manager: &ConnectionManager,
stream_key: &str,
fields: &[(&str, impl ToRedisArgs + std::fmt::Debug)],
) -> anyhow::Result<()> {
tracing::debug!(target: STORAGE, "XADD: {}, {:?}", stream_key, fields);

// TODO: Remove stream cap when we finally start processing it
redis::cmd("XTRIM")
.arg(stream_key)
.arg("MAXLEN")
.arg(100)
.query_async(&mut redis_connection_manager.clone())
.await?;

let mut cmd = redis::cmd("XADD");
cmd.arg(stream_key).arg("*");

for (field, value) in fields {
cmd.arg(*field).arg(value);
}

cmd.query_async(&mut redis_connection_manager.clone())
.await?;

Ok(())
}

pub async fn add_to_registered_stream(
redis_connection_manager: &ConnectionManager,
key: &str,
fields: &[(&str, impl ToRedisArgs + std::fmt::Debug)],
) -> anyhow::Result<()> {
sadd(redis_connection_manager, STREAMS_SET_KEY, key).await?;
xadd(redis_connection_manager, key, fields).await?;

Ok(())
}
/// Sets the key `receipt_id: &str` with value `transaction_hash: &str` to the Redis storage.
/// Increments the counter `receipts_{transaction_hash}` by one.
/// The counter holds how many Receipts related to the Transaction are in watching list
Expand Down

0 comments on commit 5d9d1d4

Please sign in to comment.