diff --git a/block-streamer/src/main.rs b/block-streamer/src/main.rs index 38560737b..39d114a55 100644 --- a/block-streamer/src/main.rs +++ b/block-streamer/src/main.rs @@ -18,9 +18,12 @@ async fn main() -> anyhow::Result<()> { .with(tracing_subscriber::EnvFilter::from_default_env()) .init(); + let redis_url = std::env::var("REDIS_URL").expect("REDIS_URL is not set"); + let server_port = std::env::var("SERVER_PORT").expect("SERVER_PORT is not set"); + tracing::info!("Starting Block Streamer Service..."); - let redis_client = std::sync::Arc::new(redis::RedisClient::connect("redis://127.0.0.1").await?); + let redis_client = std::sync::Arc::new(redis::RedisClient::connect(&redis_url).await?); let aws_config = aws_config::from_env().load().await; let s3_config = aws_sdk_s3::Config::from(&aws_config); @@ -29,7 +32,7 @@ async fn main() -> anyhow::Result<()> { let delta_lake_client = std::sync::Arc::new(crate::delta_lake_client::DeltaLakeClient::new(s3_client)); - server::init(redis_client, delta_lake_client, s3_config).await?; + server::init(&server_port, redis_client, delta_lake_client, s3_config).await?; Ok(()) } diff --git a/block-streamer/src/redis.rs b/block-streamer/src/redis.rs index 583742bbc..dfaee7d78 100644 --- a/block-streamer/src/redis.rs +++ b/block-streamer/src/redis.rs @@ -13,8 +13,8 @@ pub struct RedisClientImpl { #[cfg_attr(test, mockall::automock)] impl RedisClientImpl { - pub async fn connect(redis_connection_str: &str) -> Result { - let connection = redis::Client::open(redis_connection_str)? + pub async fn connect(redis_url: &str) -> Result { + let connection = redis::Client::open(redis_url)? .get_tokio_connection_manager() .await?; diff --git a/block-streamer/src/server/mod.rs b/block-streamer/src/server/mod.rs index f5855cbb3..f72dc3c1d 100644 --- a/block-streamer/src/server/mod.rs +++ b/block-streamer/src/server/mod.rs @@ -5,13 +5,12 @@ pub mod blockstreamer { } pub async fn init( + port: &str, redis_client: std::sync::Arc, delta_lake_client: std::sync::Arc, lake_s3_config: aws_sdk_s3::Config, ) -> anyhow::Result<()> { - let addr = "[::1]:10000" - .parse() - .expect("Failed to parse RPC socket address"); + let addr = format!("[::1]:{}", port).parse()?; tracing::info!("Starting RPC server at {}", addr);