Skip to content

Commit

Permalink
refactor: Configure coordinator/block-streamer via environment (#503
Browse files Browse the repository at this point in the history
)

- refactor: Configure `coordinator` via environment
- refactor: Configure `block-streamer` via environment
  • Loading branch information
morgsmccauley committed Jan 16, 2024
1 parent 13f7094 commit a4d8c34
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 23 deletions.
7 changes: 5 additions & 2 deletions block-streamer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ async fn main() -> anyhow::Result<()> {
.with(tracing_subscriber::EnvFilter::from_default_env())
.init();

let redis_url = std::env::var("REDIS_URL").expect("REDIS_URL is not set");
let server_port = std::env::var("SERVER_PORT").expect("SERVER_PORT is not set");

tracing::info!("Starting Block Streamer Service...");

let redis_client = std::sync::Arc::new(redis::RedisClient::connect("redis://127.0.0.1").await?);
let redis_client = std::sync::Arc::new(redis::RedisClient::connect(&redis_url).await?);

let aws_config = aws_config::from_env().load().await;
let s3_config = aws_sdk_s3::Config::from(&aws_config);
Expand All @@ -29,7 +32,7 @@ async fn main() -> anyhow::Result<()> {
let delta_lake_client =
std::sync::Arc::new(crate::delta_lake_client::DeltaLakeClient::new(s3_client));

server::init(redis_client, delta_lake_client, s3_config).await?;
server::init(&server_port, redis_client, delta_lake_client, s3_config).await?;

Ok(())
}
4 changes: 2 additions & 2 deletions block-streamer/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ pub struct RedisClientImpl {

#[cfg_attr(test, mockall::automock)]
impl RedisClientImpl {
pub async fn connect(redis_connection_str: &str) -> Result<Self, RedisError> {
let connection = redis::Client::open(redis_connection_str)?
pub async fn connect(redis_url: &str) -> Result<Self, RedisError> {
let connection = redis::Client::open(redis_url)?
.get_tokio_connection_manager()
.await?;

Expand Down
5 changes: 2 additions & 3 deletions block-streamer/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@ pub mod blockstreamer {
}

pub async fn init(
port: &str,
redis_client: std::sync::Arc<crate::redis::RedisClient>,
delta_lake_client: std::sync::Arc<crate::delta_lake_client::DeltaLakeClient>,
lake_s3_config: aws_sdk_s3::Config,
) -> anyhow::Result<()> {
let addr = "[::1]:10000"
.parse()
.expect("Failed to parse RPC socket address");
let addr = format!("[::1]:{}", port).parse()?;

tracing::info!("Starting RPC server at {}", addr);

Expand Down
4 changes: 2 additions & 2 deletions coordinator/src/block_streams_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ pub struct BlockStreamsHandlerImpl {

#[cfg_attr(test, mockall::automock)]
impl BlockStreamsHandlerImpl {
pub async fn connect() -> anyhow::Result<Self> {
let client = BlockStreamerClient::connect("http://[::1]:10000")
pub async fn connect(block_streamer_url: String) -> anyhow::Result<Self> {
let client = BlockStreamerClient::connect(block_streamer_url)
.await
.context("Unable to connect to Block Streamer")?;

Expand Down
4 changes: 2 additions & 2 deletions coordinator/src/executors_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ pub struct ExecutorsHandlerImpl {

#[cfg_attr(test, mockall::automock)]
impl ExecutorsHandlerImpl {
pub async fn connect() -> anyhow::Result<Self> {
let client = RunnerClient::connect("http://localhost:50007")
pub async fn connect(runner_url: String) -> anyhow::Result<Self> {
let client = RunnerClient::connect(runner_url)
.await
.context("Unable to connect to Runner")?;

Expand Down
18 changes: 14 additions & 4 deletions coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,20 @@ async fn main() -> anyhow::Result<()> {
.with(tracing_subscriber::EnvFilter::from_default_env())
.init();

let registry = Registry::connect("https://rpc.mainnet.near.org");
let redis_client = RedisClient::connect("redis://127.0.0.1").await?;
let mut block_streams_handler = BlockStreamsHandler::connect().await?;
let mut executors_handler = ExecutorsHandler::connect().await?;
let rpc_url = std::env::var("RPC_URL").expect("RPC_URL is not set");
let registry_contract_id = std::env::var("REGISTRY_CONTRACT_ID")
.expect("REGISTRY_CONTRACT_ID is not set")
.parse()
.expect("REGISTRY_CONTRACT_ID is not a valid account ID");
let redis_url = std::env::var("REDIS_URL").expect("REDIS_URL is not set");
let block_streamer_url =
std::env::var("BLOCK_STREAMER_URL").expect("BLOCK_STREAMER_URL is not set");
let runner_url = std::env::var("RUNNER_URL").expect("RUNNER_URL is not set");

let registry = Registry::connect(registry_contract_id, &rpc_url);
let redis_client = RedisClient::connect(&redis_url).await?;
let mut block_streams_handler = BlockStreamsHandler::connect(block_streamer_url).await?;
let mut executors_handler = ExecutorsHandler::connect(runner_url).await?;

loop {
let indexer_registry = registry.fetch().await?;
Expand Down
4 changes: 2 additions & 2 deletions coordinator/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ pub struct RedisClientImpl {

#[cfg_attr(test, mockall::automock)]
impl RedisClientImpl {
pub async fn connect(redis_connection_str: &str) -> Result<Self, RedisError> {
let connection = redis::Client::open(redis_connection_str)?
pub async fn connect(redis_url: &str) -> Result<Self, RedisError> {
let connection = redis::Client::open(redis_url)?
.get_connection_manager()
.await?;

Expand Down
13 changes: 7 additions & 6 deletions coordinator/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,18 @@ pub use RegistryImpl as Registry;

pub struct RegistryImpl {
json_rpc_client: JsonRpcClient,
registry_contract_id: AccountId,
}

#[cfg_attr(test, mockall::automock)]
impl RegistryImpl {
pub fn connect(rpc_url: &str) -> Self {
pub fn connect(registry_contract_id: AccountId, rpc_url: &str) -> Self {
let json_rpc_client = JsonRpcClient::connect(rpc_url);

Self { json_rpc_client }
Self {
registry_contract_id,
json_rpc_client,
}
}

fn enrich_indexer_registry(
Expand Down Expand Up @@ -87,10 +91,7 @@ impl RegistryImpl {
block_reference: BlockReference::Finality(Finality::Final),
request: QueryRequest::CallFunction {
method_name: "list_indexer_functions".to_string(),
account_id: "dev-queryapi.dataplatform.near"
.to_string()
.try_into()
.unwrap(),
account_id: self.registry_contract_id.clone(),
args: FunctionArgs::from("{}".as_bytes().to_vec()),
},
})
Expand Down

0 comments on commit a4d8c34

Please sign in to comment.