diff --git a/coordinator/Cargo.lock b/coordinator/Cargo.lock index bd1246aa8..d8b34629e 100644 --- a/coordinator/Cargo.lock +++ b/coordinator/Cargo.lock @@ -727,7 +727,7 @@ dependencies = [ "mockall", "near-lake-framework", "prost 0.12.3", - "redis", + "redis 0.21.7", "registry-types", "serde", "serde_json", @@ -986,13 +986,16 @@ version = "0.1.0" dependencies = [ "anyhow", "block-streamer", + "mockall", "near-jsonrpc-client", "near-jsonrpc-primitives", "near-primitives", + "redis 0.24.0", "registry-types", "serde_json", "tokio", "tonic 0.10.2", + "tracing", ] [[package]] @@ -2954,6 +2957,30 @@ dependencies = [ "url", ] +[[package]] +name = "redis" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c580d9cbbe1d1b479e8d67cf9daf6a62c957e6846048408b80b43ac3f6af84cd" +dependencies = [ + "arc-swap", + "async-trait", + "bytes", + "combine", + "futures", + "futures-util", + "itoa", + "percent-encoding", + "pin-project-lite", + "ryu", + "sha1_smol", + "socket2", + "tokio", + "tokio-retry", + "tokio-util 0.7.10", + "url", +] + [[package]] name = "redox_syscall" version = "0.4.1" @@ -3728,6 +3755,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-retry" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f" +dependencies = [ + "pin-project", + "rand 0.8.5", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.24.1" diff --git a/coordinator/Cargo.toml b/coordinator/Cargo.toml index 1ba7ff799..deb5e2fe6 100644 --- a/coordinator/Cargo.toml +++ b/coordinator/Cargo.toml @@ -5,13 +5,18 @@ edition = "2021" [dependencies] anyhow = "1.0.75" +redis = { version = "0.24", features = ["tokio-comp", "connection-manager"] } tokio = "1.28" tonic = "0.10.2" +tracing = "0.1.40" serde_json = "1.0.108" -registry-types = { path = "../registry/types" } block-streamer = { path = "../block-streamer" } +registry-types = { path = "../registry/types" } near-jsonrpc-client = "0.6.0" near-primitives = "0.17.0" near-jsonrpc-primitives = "0.17.0" + +[dev-dependencies] +mockall = "0.11.4" diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 617f77238..f041db2e3 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -6,10 +6,12 @@ use block_streamer::block_streamer_client::BlockStreamerClient; use block_streamer::{start_stream_request::Rule, ActionAnyRule, StartStreamRequest, Status}; use registry::IndexerConfig; +mod redis; mod registry; async fn start_stream( block_streamer_client: &mut BlockStreamerClient, + redis_client: &redis::RedisClient, indexer_config: &IndexerConfig, ) -> anyhow::Result<()> { let rule = match &indexer_config.filter.matching_rule { @@ -27,9 +29,25 @@ async fn start_stream( _ => anyhow::bail!("Encountered unsupported indexer rule"), }; + let start_block_height = if let Some(start_block_height) = indexer_config.start_block_height { + start_block_height + } else if let Ok(last_indexed_block) = redis_client + .get::(format!( + "{}:last_indexed_block", + indexer_config.get_full_name() + )) + .await + { + last_indexed_block + } else if let Some(updated_at_block_height) = indexer_config.updated_at_block_height { + updated_at_block_height + } else { + indexer_config.created_at_block_height + }; + let _ = block_streamer_client .start_stream(Request::new(StartStreamRequest { - start_block_height: 106700000, + start_block_height, account_id: indexer_config.account_id.to_string(), function_name: indexer_config.function_name.to_string(), rule: Some(rule), @@ -42,13 +60,14 @@ async fn start_stream( #[tokio::main] async fn main() -> anyhow::Result<()> { let json_rpc_client = JsonRpcClient::connect("https://rpc.mainnet.near.org"); + let redis_client = redis::RedisClient::connect("redis://127.0.0.1").await?; let mut block_streamer_client = BlockStreamerClient::connect("http://[::1]:10000").await?; let registry = registry::fetch_registry(&json_rpc_client).await?; for indexers in registry.values() { for indexer_config in indexers.values() { - start_stream(&mut block_streamer_client, indexer_config).await?; + start_stream(&mut block_streamer_client, &redis_client, indexer_config).await?; } } diff --git a/coordinator/src/redis.rs b/coordinator/src/redis.rs new file mode 100644 index 000000000..fd40adb1f --- /dev/null +++ b/coordinator/src/redis.rs @@ -0,0 +1,36 @@ +use std::fmt::Debug; + +use redis::{aio::ConnectionManager, FromRedisValue, RedisError, ToRedisArgs}; + +#[cfg(test)] +pub use MockRedisClientImpl as RedisClient; +#[cfg(not(test))] +pub use RedisClientImpl as RedisClient; + +pub struct RedisClientImpl { + connection: ConnectionManager, +} + +#[cfg_attr(test, mockall::automock)] +impl RedisClientImpl { + pub async fn connect(redis_connection_str: &str) -> Result { + let connection = redis::Client::open(redis_connection_str)? + .get_connection_manager() + .await?; + + Ok(Self { connection }) + } + + pub async fn get(&self, key: T) -> Result + where + T: ToRedisArgs + Debug + Send + Sync + 'static, + U: FromRedisValue + Send + Sync + 'static, + { + tracing::debug!("GET: {:?}", key); + + redis::cmd("GET") + .arg(key) + .query_async(&mut self.connection.clone()) + .await + } +}