Skip to content

Commit

Permalink
refactor: Abstract block stream handling away
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley committed Dec 18, 2023
1 parent f397950 commit 784368b
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 56 deletions.
56 changes: 56 additions & 0 deletions coordinator/src/block_stream_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use tonic::transport::channel::Channel;
use tonic::Request;

use block_streamer::block_streamer_client::BlockStreamerClient;
use block_streamer::{start_stream_request::Rule, ActionAnyRule, StartStreamRequest, Status};

// This will be good for abstracting away the 'transport' layer, but also provide a struct which
// can be mocked, making for easy testing
pub struct BlockStreamHandler {
block_streamer_client: BlockStreamerClient<Channel>,
}

impl BlockStreamHandler {
pub async fn connect() -> anyhow::Result<Self> {
let block_streamer_client = BlockStreamerClient::connect("http://[::1]:10000").await?;

Ok(Self {
block_streamer_client,
})
}

pub async fn start(
&mut self,
start_block_height: u64,
account_id: String,
function_name: String,
rule: registry_types::MatchingRule,
) -> anyhow::Result<()> {
let rule = match &rule {
registry_types::MatchingRule::ActionAny {
affected_account_id,
status,
} => Rule::ActionAnyRule(ActionAnyRule {
affected_account_id: affected_account_id.to_owned(),
status: match status {
registry_types::Status::Success => Status::Success.into(),
registry_types::Status::Fail => Status::Failure.into(),
registry_types::Status::Any => Status::Any.into(),
},
}),
_ => anyhow::bail!("Encountered unsupported indexer rule"),
};

let _ = self
.block_streamer_client
.start_stream(Request::new(StartStreamRequest {
start_block_height,
account_id,
function_name,
rule: Some(rule),
}))
.await?;

Ok(())
}
}
84 changes: 28 additions & 56 deletions coordinator/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,73 +1,45 @@
use near_jsonrpc_client::JsonRpcClient;
use tonic::transport::channel::Channel;
use tonic::Request;

use block_streamer::block_streamer_client::BlockStreamerClient;
use block_streamer::{start_stream_request::Rule, ActionAnyRule, StartStreamRequest, Status};
use registry::IndexerConfig;

mod block_stream_handler;
mod redis;
mod registry;

async fn start_stream(
block_streamer_client: &mut BlockStreamerClient<Channel>,
redis_client: &redis::RedisClient,
indexer_config: &IndexerConfig,
) -> anyhow::Result<()> {
let rule = match &indexer_config.filter.matching_rule {
registry_types::MatchingRule::ActionAny {
affected_account_id,
status,
} => Rule::ActionAnyRule(ActionAnyRule {
affected_account_id: affected_account_id.to_owned(),
status: match status {
registry_types::Status::Success => Status::Success.into(),
registry_types::Status::Fail => Status::Failure.into(),
registry_types::Status::Any => Status::Any.into(),
},
}),
_ => anyhow::bail!("Encountered unsupported indexer rule"),
};

let start_block_height = if let Some(start_block_height) = indexer_config.start_block_height {
start_block_height
} else if let Ok(last_indexed_block) = redis_client
.get::<String, u64>(format!(
"{}:last_indexed_block",
indexer_config.get_full_name()
))
.await
{
last_indexed_block
} else if let Some(updated_at_block_height) = indexer_config.updated_at_block_height {
updated_at_block_height
} else {
indexer_config.created_at_block_height
};

let _ = block_streamer_client
.start_stream(Request::new(StartStreamRequest {
start_block_height,
account_id: indexer_config.account_id.to_string(),
function_name: indexer_config.function_name.to_string(),
rule: Some(rule),
}))
.await?;

Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let json_rpc_client = JsonRpcClient::connect("https://rpc.mainnet.near.org");
let redis_client = redis::RedisClient::connect("redis://127.0.0.1").await?;
let mut block_streamer_client = BlockStreamerClient::connect("http://[::1]:10000").await?;
let mut block_stream_handler = block_stream_handler::BlockStreamHandler::connect().await?;

let registry = registry::fetch_registry(&json_rpc_client).await?;

for indexers in registry.values() {
for indexer_config in indexers.values() {
start_stream(&mut block_streamer_client, &redis_client, indexer_config).await?;
let start_block_height = if let Some(start_block_height) =
indexer_config.start_block_height
{
start_block_height
} else if let Ok(last_indexed_block) = redis_client
.get::<String, u64>(format!(
"{}:last_indexed_block",
indexer_config.get_full_name()
))
.await
{
last_indexed_block
} else if let Some(updated_at_block_height) = indexer_config.updated_at_block_height {
updated_at_block_height
} else {
indexer_config.created_at_block_height
};

block_stream_handler
.start(
start_block_height,
indexer_config.account_id.to_string(),
indexer_config.function_name.clone(),
indexer_config.filter.matching_rule.clone(),
)
.await?;
}
}

Expand Down

0 comments on commit 784368b

Please sign in to comment.