Skip to content

Commit

Permalink
feat: Start block stream from desired height
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley committed Dec 18, 2023
1 parent bfedf1e commit 8bab9ba
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 4 deletions.
40 changes: 39 additions & 1 deletion coordinator/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion coordinator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,18 @@ edition = "2021"

[dependencies]
anyhow = "1.0.75"
redis = { version = "0.24", features = ["tokio-comp", "connection-manager"] }
tokio = "1.28"
tonic = "0.10.2"
tracing = "0.1.40"
serde_json = "1.0.108"

registry-types = { path = "../registry/types" }
block-streamer = { path = "../block-streamer" }
registry-types = { path = "../registry/types" }

near-jsonrpc-client = "0.6.0"
near-primitives = "0.17.0"
near-jsonrpc-primitives = "0.17.0"

[dev-dependencies]
mockall = "0.11.4"
23 changes: 21 additions & 2 deletions coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ use block_streamer::block_streamer_client::BlockStreamerClient;
use block_streamer::{start_stream_request::Rule, ActionAnyRule, StartStreamRequest, Status};
use registry::IndexerConfig;

mod redis;
mod registry;

async fn start_stream(
block_streamer_client: &mut BlockStreamerClient<Channel>,
redis_client: &redis::RedisClient,
indexer_config: &IndexerConfig,
) -> anyhow::Result<()> {
let rule = match &indexer_config.filter.matching_rule {
Expand All @@ -27,9 +29,25 @@ async fn start_stream(
_ => anyhow::bail!("Encountered unsupported indexer rule"),
};

let start_block_height = if let Some(start_block_height) = indexer_config.start_block_height {
start_block_height
} else if let Ok(last_indexed_block) = redis_client
.get::<String, u64>(format!(
"{}:last_indexed_block",
indexer_config.get_full_name()
))
.await
{
last_indexed_block
} else if let Some(updated_at_block_height) = indexer_config.updated_at_block_height {
updated_at_block_height
} else {
indexer_config.created_at_block_height
};

let _ = block_streamer_client
.start_stream(Request::new(StartStreamRequest {
start_block_height: 106700000,
start_block_height,
account_id: indexer_config.account_id.to_string(),
function_name: indexer_config.function_name.to_string(),
rule: Some(rule),
Expand All @@ -42,13 +60,14 @@ async fn start_stream(
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let json_rpc_client = JsonRpcClient::connect("https://rpc.mainnet.near.org");
let redis_client = redis::RedisClient::connect("redis://127.0.0.1").await?;
let mut block_streamer_client = BlockStreamerClient::connect("http://[::1]:10000").await?;

let registry = registry::fetch_registry(&json_rpc_client).await?;

for indexers in registry.values() {
for indexer_config in indexers.values() {
start_stream(&mut block_streamer_client, indexer_config).await?;
start_stream(&mut block_streamer_client, &redis_client, indexer_config).await?;
}
}

Expand Down

0 comments on commit 8bab9ba

Please sign in to comment.