Skip to content

Commit

Permalink
refactor: Update redis struct names
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley committed Jun 25, 2024
1 parent 6a75362 commit 9bd813f
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 34 deletions.
18 changes: 9 additions & 9 deletions block-streamer/src/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl BlockStream {
pub fn start(
&mut self,
start_block_height: near_indexer_primitives::types::BlockHeight,
redis: std::sync::Arc<crate::redis::RedisWrapper>,
redis: std::sync::Arc<crate::redis::RedisClient>,
delta_lake_client: std::sync::Arc<crate::delta_lake_client::DeltaLakeClient>,
lake_s3_client: crate::lake_s3_client::SharedLakeS3Client,
) -> anyhow::Result<()> {
Expand Down Expand Up @@ -130,7 +130,7 @@ impl BlockStream {
pub(crate) async fn start_block_stream(
start_block_height: near_indexer_primitives::types::BlockHeight,
indexer: &IndexerConfig,
redis: std::sync::Arc<crate::redis::RedisWrapper>,
redis: std::sync::Arc<crate::redis::RedisClient>,
delta_lake_client: std::sync::Arc<crate::delta_lake_client::DeltaLakeClient>,
lake_s3_client: crate::lake_s3_client::SharedLakeS3Client,
chain_id: &ChainId,
Expand Down Expand Up @@ -176,7 +176,7 @@ pub(crate) async fn start_block_stream(
async fn process_delta_lake_blocks(
start_block_height: near_indexer_primitives::types::BlockHeight,
delta_lake_client: std::sync::Arc<crate::delta_lake_client::DeltaLakeClient>,
redis: std::sync::Arc<crate::redis::RedisWrapper>,
redis: std::sync::Arc<crate::redis::RedisClient>,
indexer: &IndexerConfig,
redis_stream: String,
) -> anyhow::Result<u64> {
Expand Down Expand Up @@ -254,7 +254,7 @@ async fn process_near_lake_blocks(
start_block_height: near_indexer_primitives::types::BlockHeight,
lake_s3_client: crate::lake_s3_client::SharedLakeS3Client,
lake_prefetch_size: usize,
redis: std::sync::Arc<crate::redis::RedisWrapper>,
redis: std::sync::Arc<crate::redis::RedisClient>,
indexer: &IndexerConfig,
redis_stream: String,
chain_id: &ChainId,
Expand Down Expand Up @@ -352,7 +352,7 @@ mod tests {
.expect_list_matching_block_heights()
.returning(|_, _| Ok(vec![107503702, 107503703]));

let mut mock_redis_wrapper = crate::redis::RedisWrapper::default();
let mut mock_redis_wrapper = crate::redis::RedisClient::default();
mock_redis_wrapper
.expect_publish_block()
.with(
Expand Down Expand Up @@ -426,7 +426,7 @@ mod tests {
})
});

let mut mock_redis_client = crate::redis::RedisWrapper::default();
let mut mock_redis_client = crate::redis::RedisClient::default();
mock_redis_client
.expect_publish_block()
.with(
Expand Down Expand Up @@ -498,7 +498,7 @@ mod tests {
.expect_list_matching_block_heights()
.never();

let mut mock_redis_client = crate::redis::RedisWrapper::default();
let mut mock_redis_client = crate::redis::RedisClient::default();
mock_redis_client.expect_publish_block().never();
mock_redis_client.expect_set_last_processed_block().never();

Expand Down Expand Up @@ -543,7 +543,7 @@ mod tests {
.expect_list_matching_block_heights()
.never();

let mut mock_redis_client = crate::redis::RedisWrapper::default();
let mut mock_redis_client = crate::redis::RedisClient::default();
mock_redis_client.expect_publish_block().never();
mock_redis_client.expect_set_last_processed_block().never();

Expand Down Expand Up @@ -588,7 +588,7 @@ mod tests {
.expect_list_matching_block_heights()
.never();

let mut mock_redis_client = crate::redis::RedisWrapper::default();
let mut mock_redis_client = crate::redis::RedisClient::default();
mock_redis_client.expect_publish_block().never();
mock_redis_client.expect_set_last_processed_block().never();

Expand Down
2 changes: 1 addition & 1 deletion block-streamer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ async fn main() -> anyhow::Result<()> {
"Starting Block Streamer"
);

let redis = std::sync::Arc::new(redis::RedisWrapper::connect(&redis_url).await?);
let redis = std::sync::Arc::new(redis::RedisClient::connect(&redis_url).await?);

let aws_config = aws_config::from_env().load().await;
let s3_config = aws_sdk_s3::Config::from(&aws_config);
Expand Down
40 changes: 20 additions & 20 deletions block-streamer/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@ use crate::metrics;
use crate::utils;

#[cfg(test)]
pub use MockRedisClientImpl as RedisClient;
use MockRedisCommandsImpl as RedisCommands;
#[cfg(not(test))]
pub use RedisClientImpl as RedisClient;
use RedisCommandsImpl as RedisCommands;

pub struct RedisClientImpl {
struct RedisCommandsImpl {
connection: ConnectionManager,
}

#[cfg_attr(test, mockall::automock)]
impl RedisClientImpl {
impl RedisCommandsImpl {
pub async fn connect(redis_url: &str) -> Result<Self, RedisError> {
let connection = redis::Client::open(redis_url)?
.get_tokio_connection_manager()
Expand Down Expand Up @@ -92,22 +92,22 @@ impl RedisClientImpl {
}

#[cfg(test)]
pub use MockRedisWrapperImpl as RedisWrapper;
pub use MockRedisClientImpl as RedisClient;
#[cfg(not(test))]
pub use RedisWrapperImpl as RedisWrapper;
pub use RedisClientImpl as RedisClient;

pub struct RedisWrapperImpl {
client: RedisClient,
pub struct RedisClientImpl {
commands: RedisCommands,
}

#[cfg_attr(test, mockall::automock)]
impl RedisWrapperImpl {
impl RedisClientImpl {
const STREAMER_MESSAGE_PREFIX: &'static str = "streamer_message:";

pub async fn connect(redis_url: &str) -> Result<Self, RedisError> {
let client = RedisClient::connect(redis_url).await?;
let commands = RedisCommands::connect(redis_url).await?;

Ok(Self { client })
Ok(Self { commands })
}

pub async fn set_last_processed_block(
Expand All @@ -127,14 +127,14 @@ impl RedisWrapperImpl {
.context("Failed to convert block height (u64) to metrics type (i64)")?,
);

self.client
self.commands
.set(indexer_config.last_processed_block_key(), height)
.await
.context("Failed to set last processed block")
}

pub async fn get_stream_length(&self, stream: String) -> anyhow::Result<Option<u64>> {
self.client.xlen(stream).await
self.commands.xlen(stream).await
}

pub async fn cache_streamer_message(
Expand All @@ -147,7 +147,7 @@ impl RedisWrapperImpl {

utils::snake_to_camel(&mut streamer_message);

self.client
self.commands
.set_ex(
format!("{}{}", Self::STREAMER_MESSAGE_PREFIX, height),
serde_json::to_string(&streamer_message)?,
Expand Down Expand Up @@ -183,7 +183,7 @@ impl RedisWrapperImpl {
.with_label_values(&[&indexer.get_full_name()])
.inc();

self.client
self.commands
.xadd(
stream.clone(),
&[(String::from("block_height"), block_height)],
Expand All @@ -202,23 +202,23 @@ mod test {

#[tokio::test]
async fn limits_block_stream_length() {
let mut mock_redis_client = RedisClient::default();
mock_redis_client
let mut mock_redis_commands = RedisCommands::default();
mock_redis_commands
.expect_xadd::<String, u64>()
.with(predicate::eq("stream".to_string()), predicate::always())
.returning(|_, _| Ok(()))
.once();
let mut stream_len = 10;
mock_redis_client
mock_redis_commands
.expect_xlen::<String>()
.with(predicate::eq("stream".to_string()))
.returning(move |_| {
stream_len -= 1;
Ok(Some(stream_len))
});

let redis = RedisWrapperImpl {
client: mock_redis_client,
let redis = RedisClientImpl {
commands: mock_redis_commands,
};

let indexer_config = crate::indexer_config::IndexerConfig {
Expand Down
6 changes: 3 additions & 3 deletions block-streamer/src/server/block_streamer_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::server::blockstreamer;
use blockstreamer::*;

pub struct BlockStreamerService {
redis: std::sync::Arc<crate::redis::RedisWrapper>,
redis: std::sync::Arc<crate::redis::RedisClient>,
delta_lake_client: std::sync::Arc<crate::delta_lake_client::DeltaLakeClient>,
lake_s3_client: crate::lake_s3_client::SharedLakeS3Client,
chain_id: ChainId,
Expand All @@ -22,7 +22,7 @@ pub struct BlockStreamerService {

impl BlockStreamerService {
pub fn new(
redis: std::sync::Arc<crate::redis::RedisWrapper>,
redis: std::sync::Arc<crate::redis::RedisClient>,
delta_lake_client: std::sync::Arc<crate::delta_lake_client::DeltaLakeClient>,
lake_s3_client: crate::lake_s3_client::SharedLakeS3Client,
) -> Self {
Expand Down Expand Up @@ -206,7 +206,7 @@ mod tests {
.expect_list_matching_block_heights()
.returning(|_, _| Ok(vec![]));

let mock_redis_wrapper = crate::redis::RedisWrapper::default();
let mock_redis_wrapper = crate::redis::RedisClient::default();

let mut mock_lake_s3_client = crate::lake_s3_client::SharedLakeS3Client::default();
mock_lake_s3_client
Expand Down
2 changes: 1 addition & 1 deletion block-streamer/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ pub mod blockstreamer {

pub async fn init(
port: &str,
redis: std::sync::Arc<crate::redis::RedisWrapper>,
redis: std::sync::Arc<crate::redis::RedisClient>,
delta_lake_client: std::sync::Arc<crate::delta_lake_client::DeltaLakeClient>,
lake_s3_client: crate::lake_s3_client::SharedLakeS3Client,
) -> anyhow::Result<()> {
Expand Down

0 comments on commit 9bd813f

Please sign in to comment.