From a4d8c3402322400ff03916b0c22aa082052be280 Mon Sep 17 00:00:00 2001 From: Morgan McCauley Date: Wed, 17 Jan 2024 09:36:43 +1300 Subject: [PATCH] refactor: Configure `coordinator`/`block-streamer` via environment (#503) - refactor: Configure `coordinator` via environment - refactor: Configure `block-streamer` via environment --- block-streamer/src/main.rs | 7 +++++-- block-streamer/src/redis.rs | 4 ++-- block-streamer/src/server/mod.rs | 5 ++--- coordinator/src/block_streams_handler.rs | 4 ++-- coordinator/src/executors_handler.rs | 4 ++-- coordinator/src/main.rs | 18 ++++++++++++++---- coordinator/src/redis.rs | 4 ++-- coordinator/src/registry.rs | 13 +++++++------ 8 files changed, 36 insertions(+), 23 deletions(-) diff --git a/block-streamer/src/main.rs b/block-streamer/src/main.rs index 38560737b..39d114a55 100644 --- a/block-streamer/src/main.rs +++ b/block-streamer/src/main.rs @@ -18,9 +18,12 @@ async fn main() -> anyhow::Result<()> { .with(tracing_subscriber::EnvFilter::from_default_env()) .init(); + let redis_url = std::env::var("REDIS_URL").expect("REDIS_URL is not set"); + let server_port = std::env::var("SERVER_PORT").expect("SERVER_PORT is not set"); + tracing::info!("Starting Block Streamer Service..."); - let redis_client = std::sync::Arc::new(redis::RedisClient::connect("redis://127.0.0.1").await?); + let redis_client = std::sync::Arc::new(redis::RedisClient::connect(&redis_url).await?); let aws_config = aws_config::from_env().load().await; let s3_config = aws_sdk_s3::Config::from(&aws_config); @@ -29,7 +32,7 @@ async fn main() -> anyhow::Result<()> { let delta_lake_client = std::sync::Arc::new(crate::delta_lake_client::DeltaLakeClient::new(s3_client)); - server::init(redis_client, delta_lake_client, s3_config).await?; + server::init(&server_port, redis_client, delta_lake_client, s3_config).await?; Ok(()) } diff --git a/block-streamer/src/redis.rs b/block-streamer/src/redis.rs index 583742bbc..dfaee7d78 100644 --- a/block-streamer/src/redis.rs +++ b/block-streamer/src/redis.rs @@ -13,8 +13,8 @@ pub struct RedisClientImpl { #[cfg_attr(test, mockall::automock)] impl RedisClientImpl { - pub async fn connect(redis_connection_str: &str) -> Result { - let connection = redis::Client::open(redis_connection_str)? + pub async fn connect(redis_url: &str) -> Result { + let connection = redis::Client::open(redis_url)? .get_tokio_connection_manager() .await?; diff --git a/block-streamer/src/server/mod.rs b/block-streamer/src/server/mod.rs index f5855cbb3..f72dc3c1d 100644 --- a/block-streamer/src/server/mod.rs +++ b/block-streamer/src/server/mod.rs @@ -5,13 +5,12 @@ pub mod blockstreamer { } pub async fn init( + port: &str, redis_client: std::sync::Arc, delta_lake_client: std::sync::Arc, lake_s3_config: aws_sdk_s3::Config, ) -> anyhow::Result<()> { - let addr = "[::1]:10000" - .parse() - .expect("Failed to parse RPC socket address"); + let addr = format!("[::1]:{}", port).parse()?; tracing::info!("Starting RPC server at {}", addr); diff --git a/coordinator/src/block_streams_handler.rs b/coordinator/src/block_streams_handler.rs index a62e9b20d..93f2cd45f 100644 --- a/coordinator/src/block_streams_handler.rs +++ b/coordinator/src/block_streams_handler.rs @@ -19,8 +19,8 @@ pub struct BlockStreamsHandlerImpl { #[cfg_attr(test, mockall::automock)] impl BlockStreamsHandlerImpl { - pub async fn connect() -> anyhow::Result { - let client = BlockStreamerClient::connect("http://[::1]:10000") + pub async fn connect(block_streamer_url: String) -> anyhow::Result { + let client = BlockStreamerClient::connect(block_streamer_url) .await .context("Unable to connect to Block Streamer")?; diff --git a/coordinator/src/executors_handler.rs b/coordinator/src/executors_handler.rs index 44405c29b..d15a4e28d 100644 --- a/coordinator/src/executors_handler.rs +++ b/coordinator/src/executors_handler.rs @@ -16,8 +16,8 @@ pub struct ExecutorsHandlerImpl { #[cfg_attr(test, mockall::automock)] impl ExecutorsHandlerImpl { - pub async fn connect() -> anyhow::Result { - let client = RunnerClient::connect("http://localhost:50007") + pub async fn connect(runner_url: String) -> anyhow::Result { + let client = RunnerClient::connect(runner_url) .await .context("Unable to connect to Runner")?; diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 67c6c60f4..4192db35c 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -17,10 +17,20 @@ async fn main() -> anyhow::Result<()> { .with(tracing_subscriber::EnvFilter::from_default_env()) .init(); - let registry = Registry::connect("https://rpc.mainnet.near.org"); - let redis_client = RedisClient::connect("redis://127.0.0.1").await?; - let mut block_streams_handler = BlockStreamsHandler::connect().await?; - let mut executors_handler = ExecutorsHandler::connect().await?; + let rpc_url = std::env::var("RPC_URL").expect("RPC_URL is not set"); + let registry_contract_id = std::env::var("REGISTRY_CONTRACT_ID") + .expect("REGISTRY_CONTRACT_ID is not set") + .parse() + .expect("REGISTRY_CONTRACT_ID is not a valid account ID"); + let redis_url = std::env::var("REDIS_URL").expect("REDIS_URL is not set"); + let block_streamer_url = + std::env::var("BLOCK_STREAMER_URL").expect("BLOCK_STREAMER_URL is not set"); + let runner_url = std::env::var("RUNNER_URL").expect("RUNNER_URL is not set"); + + let registry = Registry::connect(registry_contract_id, &rpc_url); + let redis_client = RedisClient::connect(&redis_url).await?; + let mut block_streams_handler = BlockStreamsHandler::connect(block_streamer_url).await?; + let mut executors_handler = ExecutorsHandler::connect(runner_url).await?; loop { let indexer_registry = registry.fetch().await?; diff --git a/coordinator/src/redis.rs b/coordinator/src/redis.rs index 4b37fa42b..daaf8a481 100644 --- a/coordinator/src/redis.rs +++ b/coordinator/src/redis.rs @@ -14,8 +14,8 @@ pub struct RedisClientImpl { #[cfg_attr(test, mockall::automock)] impl RedisClientImpl { - pub async fn connect(redis_connection_str: &str) -> Result { - let connection = redis::Client::open(redis_connection_str)? + pub async fn connect(redis_url: &str) -> Result { + let connection = redis::Client::open(redis_url)? .get_connection_manager() .await?; diff --git a/coordinator/src/registry.rs b/coordinator/src/registry.rs index 8f3205665..7936a8524 100644 --- a/coordinator/src/registry.rs +++ b/coordinator/src/registry.rs @@ -39,14 +39,18 @@ pub use RegistryImpl as Registry; pub struct RegistryImpl { json_rpc_client: JsonRpcClient, + registry_contract_id: AccountId, } #[cfg_attr(test, mockall::automock)] impl RegistryImpl { - pub fn connect(rpc_url: &str) -> Self { + pub fn connect(registry_contract_id: AccountId, rpc_url: &str) -> Self { let json_rpc_client = JsonRpcClient::connect(rpc_url); - Self { json_rpc_client } + Self { + registry_contract_id, + json_rpc_client, + } } fn enrich_indexer_registry( @@ -87,10 +91,7 @@ impl RegistryImpl { block_reference: BlockReference::Finality(Finality::Final), request: QueryRequest::CallFunction { method_name: "list_indexer_functions".to_string(), - account_id: "dev-queryapi.dataplatform.near" - .to_string() - .try_into() - .unwrap(), + account_id: self.registry_contract_id.clone(), args: FunctionArgs::from("{}".as_bytes().to_vec()), }, })