feat: Limit Redis Stream length #834

Merged 4 commits on Jun 25, 2024
2 changes: 1 addition & 1 deletion block-streamer/Cargo.toml
@@ -29,7 +29,7 @@ serde_json = "1.0.55"
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
tracing-stackdriver = "0.10.0"
tokio = { version = "1.28.0", features = ["full"]}
tokio = { version = "1.28.0", features = ["full", "test-util"]}
tokio-util = "0.7.10"
tokio-stream = "0.1.14"
tonic = "0.10.2"
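The new `test-util` feature is what provides `tokio::time::pause()`, which the test added in `redis.rs` below relies on to fast-forward the one-second back-off sleeps instead of waiting in real time. A minimal sketch (not part of this PR) of the behaviour it enables:

```rust
// Sketch only: with the "test-util" feature, tokio's clock can be paused and
// auto-advances whenever the runtime is otherwise idle, so long sleeps in
// tests complete immediately while elapsed() still reports the virtual time.
#[tokio::test]
async fn virtual_time_sketch() {
    tokio::time::pause();

    let start = tokio::time::Instant::now();
    tokio::time::sleep(std::time::Duration::from_secs(60)).await;

    // No real 60 seconds passed, but the virtual clock did advance.
    assert!(start.elapsed() >= std::time::Duration::from_secs(60));
}
```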
53 changes: 26 additions & 27 deletions block-streamer/src/block_stream.rs
@@ -12,6 +12,7 @@ use registry_types::Rule;
const LAKE_PREFETCH_SIZE: usize = 100;
const MAX_STREAM_SIZE_WITH_CACHE: u64 = 100;
const DELTA_LAKE_SKIP_ACCOUNTS: [&str; 4] = ["*", "*.near", "*.kaiching", "*.tg"];
const MAX_STREAM_SIZE: u64 = 100;

pub struct Task {
handle: JoinHandle<anyhow::Result<()>>,
@@ -45,7 +46,7 @@ impl BlockStream {
pub fn start(
&mut self,
start_block_height: near_indexer_primitives::types::BlockHeight,
redis_client: std::sync::Arc<crate::redis::RedisClient>,
redis: std::sync::Arc<crate::redis::RedisClient>,
delta_lake_client: std::sync::Arc<crate::delta_lake_client::DeltaLakeClient>,
lake_s3_client: crate::lake_s3_client::SharedLakeS3Client,
) -> anyhow::Result<()> {
@@ -74,7 +75,7 @@ impl BlockStream {
result = start_block_stream(
start_block_height,
&indexer_config,
redis_client,
redis,
delta_lake_client,
lake_s3_client,
&chain_id,
@@ -129,7 +130,7 @@ impl BlockStream {
pub(crate) async fn start_block_stream(
start_block_height: near_indexer_primitives::types::BlockHeight,
indexer: &IndexerConfig,
redis_client: std::sync::Arc<crate::redis::RedisClient>,
redis: std::sync::Arc<crate::redis::RedisClient>,
delta_lake_client: std::sync::Arc<crate::delta_lake_client::DeltaLakeClient>,
lake_s3_client: crate::lake_s3_client::SharedLakeS3Client,
chain_id: &ChainId,
@@ -145,7 +146,7 @@ pub(crate) async fn start_block_stream(
let last_indexed_delta_lake_block = process_delta_lake_blocks(
start_block_height,
delta_lake_client,
redis_client.clone(),
redis.clone(),
indexer,
redis_stream.clone(),
)
@@ -156,7 +157,7 @@ pub(crate) async fn start_block_stream(
last_indexed_delta_lake_block,
lake_s3_client,
lake_prefetch_size,
redis_client,
redis,
indexer,
redis_stream,
chain_id,
@@ -175,7 +176,7 @@ pub(crate) async fn start_block_stream(
async fn process_delta_lake_blocks(
start_block_height: near_indexer_primitives::types::BlockHeight,
delta_lake_client: std::sync::Arc<crate::delta_lake_client::DeltaLakeClient>,
redis_client: std::sync::Arc<crate::redis::RedisClient>,
redis: std::sync::Arc<crate::redis::RedisClient>,
indexer: &IndexerConfig,
redis_stream: String,
) -> anyhow::Result<u64> {
@@ -230,10 +231,10 @@ async fn process_delta_lake_blocks(

for block_height in &blocks_from_index {
let block_height = block_height.to_owned();
redis_client
.publish_block(indexer, redis_stream.clone(), block_height)
redis
.publish_block(indexer, redis_stream.clone(), block_height, MAX_STREAM_SIZE)
.await?;
redis_client
redis
.set_last_processed_block(indexer, block_height)
.await?;
}
@@ -253,7 +254,7 @@ async fn process_near_lake_blocks(
start_block_height: near_indexer_primitives::types::BlockHeight,
lake_s3_client: crate::lake_s3_client::SharedLakeS3Client,
lake_prefetch_size: usize,
redis_client: std::sync::Arc<crate::redis::RedisClient>,
redis: std::sync::Arc<crate::redis::RedisClient>,
indexer: &IndexerConfig,
redis_stream: String,
chain_id: &ChainId,
@@ -278,7 +279,7 @@ async fn process_near_lake_blocks(
let block_height = streamer_message.block.header.height;
last_indexed_block = block_height;

redis_client
redis
.set_last_processed_block(indexer, block_height)
.await?;

@@ -289,18 +290,14 @@ async fn process_near_lake_blocks(
);

if !matches.is_empty() {
if let Ok(Some(stream_length)) =
redis_client.get_stream_length(redis_stream.clone()).await
{
if let Ok(Some(stream_length)) = redis.get_stream_length(redis_stream.clone()).await {
if stream_length <= MAX_STREAM_SIZE_WITH_CACHE {
redis_client
.cache_streamer_message(&streamer_message)
.await?;
redis.cache_streamer_message(&streamer_message).await?;
}
}

redis_client
.publish_block(indexer, redis_stream.clone(), block_height)
redis
.publish_block(indexer, redis_stream.clone(), block_height, MAX_STREAM_SIZE)
.await?;
}
}
@@ -355,29 +352,30 @@ mod tests {
.expect_list_matching_block_heights()
.returning(|_, _| Ok(vec![107503702, 107503703]));

let mut mock_redis_client = crate::redis::RedisClient::default();
mock_redis_client
let mut mock_redis = crate::redis::RedisClient::default();
mock_redis
.expect_publish_block()
.with(
predicate::always(),
predicate::eq("stream key".to_string()),
predicate::in_iter([107503702, 107503703, 107503705]),
predicate::always(),
)
.returning(|_, _, _| Ok(()))
.returning(|_, _, _, _| Ok(()))
.times(3);
mock_redis_client
mock_redis
.expect_set_last_processed_block()
.with(
predicate::always(),
predicate::in_iter([107503702, 107503703, 107503704, 107503705]),
)
.returning(|_, _| Ok(()))
.times(4);
mock_redis_client
mock_redis
.expect_cache_streamer_message()
.with(predicate::always())
.returning(|_| Ok(()));
mock_redis_client
mock_redis
.expect_get_stream_length()
.with(predicate::eq("stream key".to_string()))
.returning(|_| Ok(Some(10)));
@@ -397,7 +395,7 @@ mod tests {
start_block_stream(
91940840,
&indexer_config,
std::sync::Arc::new(mock_redis_client),
std::sync::Arc::new(mock_redis),
std::sync::Arc::new(mock_delta_lake_client),
mock_lake_s3_client,
&ChainId::Mainnet,
@@ -435,8 +433,9 @@ mod tests {
predicate::always(),
predicate::eq("stream key".to_string()),
predicate::in_iter([107503705]),
predicate::always(),
)
.returning(|_, _, _| Ok(()))
.returning(|_, _, _, _| Ok(()))
.times(1);
mock_redis_client
.expect_set_last_processed_block()
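Because `publish_block` now takes a fourth `max_size` argument, every mockall expectation on it in the tests above has to grow in lockstep: one extra predicate in `with(...)` and one extra `_` in the `returning` closure. A hedged, self-contained illustration of that pattern (the trait and names here are illustrative, not from this PR):

```rust
use mockall::{automock, predicate};

// Illustrative trait standing in for the publish_block signature after this change.
#[automock]
trait Publisher {
    fn publish_block(&self, stream: String, block_height: u64, max_size: u64) -> anyhow::Result<()>;
}

fn expect_publishes(mock: &mut MockPublisher) {
    mock.expect_publish_block()
        .with(
            predicate::eq("stream key".to_string()),
            predicate::in_iter([107503702u64, 107503703]),
            predicate::always(), // the newly added max_size argument
        )
        .returning(|_, _, _| Ok(())) // one `_` per argument of the new signature
        .times(2);
}
```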
4 changes: 2 additions & 2 deletions block-streamer/src/main.rs
@@ -45,7 +45,7 @@ async fn main() -> anyhow::Result<()> {
"Starting Block Streamer"
);

let redis_client = std::sync::Arc::new(redis::RedisClient::connect(&redis_url).await?);
let redis = std::sync::Arc::new(redis::RedisClient::connect(&redis_url).await?);

let aws_config = aws_config::from_env().load().await;
let s3_config = aws_sdk_s3::Config::from(&aws_config);
@@ -58,7 +58,7 @@ async fn main() -> anyhow::Result<()> {

tokio::spawn(metrics::init_server(metrics_port).expect("Failed to start metrics server"));

server::init(&grpc_port, redis_client, delta_lake_client, lake_s3_client).await?;
server::init(&grpc_port, redis, delta_lake_client, lake_s3_client).await?;

Ok(())
}
128 changes: 107 additions & 21 deletions block-streamer/src/redis.rs
@@ -10,18 +10,16 @@ use crate::metrics;
use crate::utils;

#[cfg(test)]
pub use MockRedisClientImpl as RedisClient;
use MockRedisCommandsImpl as RedisCommands;
#[cfg(not(test))]
pub use RedisClientImpl as RedisClient;
use RedisCommandsImpl as RedisCommands;

pub struct RedisClientImpl {
struct RedisCommandsImpl {
connection: ConnectionManager,
}

#[cfg_attr(test, mockall::automock)]
impl RedisClientImpl {
const STREAMER_MESSAGE_PREFIX: &'static str = "streamer_message:";

impl RedisCommandsImpl {
pub async fn connect(redis_url: &str) -> Result<Self, RedisError> {
let connection = redis::Client::open(redis_url)?
.get_tokio_connection_manager()
@@ -91,6 +89,26 @@ impl RedisClientImpl {

Ok(())
}
}

#[cfg(test)]
pub use MockRedisClientImpl as RedisClient;
#[cfg(not(test))]
pub use RedisClientImpl as RedisClient;

pub struct RedisClientImpl {
commands: RedisCommands,
}

#[cfg_attr(test, mockall::automock)]
impl RedisClientImpl {
const STREAMER_MESSAGE_PREFIX: &'static str = "streamer_message:";

pub async fn connect(redis_url: &str) -> Result<Self, RedisError> {
let commands = RedisCommands::connect(redis_url).await?;

Ok(Self { commands })
}

pub async fn set_last_processed_block(
&self,
@@ -109,13 +127,14 @@ impl RedisClientImpl {
.context("Failed to convert block height (u64) to metrics type (i64)")?,
);

self.set(indexer_config.last_processed_block_key(), height)
self.commands
.set(indexer_config.last_processed_block_key(), height)
.await
.context("Failed to set last processed block")
}

pub async fn get_stream_length(&self, stream: String) -> anyhow::Result<Option<u64>> {
self.xlen(stream).await
self.commands.xlen(stream).await
}

pub async fn cache_streamer_message(
@@ -128,30 +147,97 @@ impl RedisClientImpl {

utils::snake_to_camel(&mut streamer_message);

self.set_ex(
format!("{}{}", Self::STREAMER_MESSAGE_PREFIX, height),
serde_json::to_string(&streamer_message)?,
60,
)
.await
.context("Failed to cache streamer message")
self.commands
.set_ex(
format!("{}{}", Self::STREAMER_MESSAGE_PREFIX, height),
serde_json::to_string(&streamer_message)?,
60,
)
.await
.context("Failed to cache streamer message")
}

pub async fn publish_block(
&self,
indexer: &IndexerConfig,
stream: String,
block_height: u64,
max_size: u64,
) -> anyhow::Result<()> {
loop {
let stream_length = self.get_stream_length(stream.clone()).await?;

if stream_length.is_none() {
break;
}

if stream_length.unwrap() < max_size {
break;
}

println!("Waiting for stream to be consumed");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}

metrics::PUBLISHED_BLOCKS_COUNT
.with_label_values(&[&indexer.get_full_name()])
.inc();

self.xadd(
stream.clone(),
&[(String::from("block_height"), block_height)],
)
.await
.context("Failed to add block to Redis Stream")
self.commands
.xadd(
stream.clone(),
&[(String::from("block_height"), block_height)],
)
.await
.context("Failed to add block to Redis Stream")
}
}

#[cfg(test)]
mod test {
use super::*;

use mockall::predicate;
use near_lake_framework::near_indexer_primitives;

#[tokio::test]
async fn limits_block_stream_length() {
let mut mock_redis_commands = RedisCommands::default();
mock_redis_commands
.expect_xadd::<String, u64>()
.with(predicate::eq("stream".to_string()), predicate::always())
.returning(|_, _| Ok(()))
.once();
let mut stream_len = 10;
mock_redis_commands
.expect_xlen::<String>()
.with(predicate::eq("stream".to_string()))
.returning(move |_| {
stream_len -= 1;
Ok(Some(stream_len))
});

let redis = RedisClientImpl {
commands: mock_redis_commands,
};

let indexer_config = crate::indexer_config::IndexerConfig {
account_id: near_indexer_primitives::types::AccountId::try_from(
"morgs.near".to_string(),
)
.unwrap(),
function_name: "test".to_string(),
rule: registry_types::Rule::ActionAny {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: registry_types::Status::Success,
},
};

tokio::time::pause();

redis
.publish_block(&indexer_config, "stream".to_string(), 0, 1)
.await
.unwrap();
}
}
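Taken together, `publish_block` now applies back-pressure: before adding to the stream it polls the stream length and sleeps a second at a time until a consumer has drained it below `max_size` (or the stream does not exist yet). A self-contained sketch of the same idea (toy types only, not the PR's code), with an `AtomicU64` standing in for `XLEN`:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;

// Mirrors the loop in publish_block: poll the length, sleep 1s while at or
// over capacity, then append once there is room.
async fn publish_with_backpressure(len: Arc<AtomicU64>, max_size: u64, block_height: u64) {
    while len.load(Ordering::SeqCst) >= max_size {
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
    len.fetch_add(1, Ordering::SeqCst); // stands in for XADD
    println!("published block {block_height}");
}

#[tokio::main]
async fn main() {
    let len = Arc::new(AtomicU64::new(5)); // stream already at the cap
    let consumer_len = len.clone();

    // Toy consumer: drains the stream after a short delay, unblocking the publisher.
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(300)).await;
        consumer_len.store(0, Ordering::SeqCst);
    });

    publish_with_backpressure(len, 5, 107503702).await;
}
```

In the unit test above, `max_size` is 1 and the mocked `xlen` counts down from 9, so `publish_block` keeps sleeping (instantly, under `tokio::time::pause()`) until the length reaches 0, at which point the single `xadd` fires.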