Skip to content

Commit

Permalink
feat: Limit Redis Stream length (#834)
Browse files Browse the repository at this point in the history
This PR introduces back pressure to the Redis Stream in Block Streamer,
ensuring that the stream does not exceed a specified maximum length.
This is achieved by blocking the `redis.publish_block()` call,
intermittently polling the Stream length, and publishing once it falls
below the configured limit.

To aid testing, the current `RedisClient` struct has been split in to
two:
- `RedisCommands` - thin wrapper around redis commands to make mocking
possible.
- `RedisClient` - provides higher-level redis functionality, e.g.
"publishing blocks", utilising the above.

In most cases, `RedisClient` will be used. The split just allows us to
test `RedisWrapper` itself.
  • Loading branch information
morgsmccauley authored Jun 25, 2024
1 parent 5a38775 commit f52dfa3
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 66 deletions.
2 changes: 1 addition & 1 deletion block-streamer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ serde_json = "1.0.55"
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
tracing-stackdriver = "0.10.0"
tokio = { version = "1.28.0", features = ["full"]}
tokio = { version = "1.28.0", features = ["full", "test-util"]}
tokio-util = "0.7.10"
tokio-stream = "0.1.14"
tonic = "0.10.2"
Expand Down
53 changes: 26 additions & 27 deletions block-streamer/src/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use registry_types::Rule;
const LAKE_PREFETCH_SIZE: usize = 100;
const MAX_STREAM_SIZE_WITH_CACHE: u64 = 100;
const DELTA_LAKE_SKIP_ACCOUNTS: [&str; 4] = ["*", "*.near", "*.kaiching", "*.tg"];
const MAX_STREAM_SIZE: u64 = 100;

pub struct Task {
handle: JoinHandle<anyhow::Result<()>>,
Expand Down Expand Up @@ -45,7 +46,7 @@ impl BlockStream {
pub fn start(
&mut self,
start_block_height: near_indexer_primitives::types::BlockHeight,
redis_client: std::sync::Arc<crate::redis::RedisClient>,
redis: std::sync::Arc<crate::redis::RedisClient>,
delta_lake_client: std::sync::Arc<crate::delta_lake_client::DeltaLakeClient>,
lake_s3_client: crate::lake_s3_client::SharedLakeS3Client,
) -> anyhow::Result<()> {
Expand Down Expand Up @@ -74,7 +75,7 @@ impl BlockStream {
result = start_block_stream(
start_block_height,
&indexer_config,
redis_client,
redis,
delta_lake_client,
lake_s3_client,
&chain_id,
Expand Down Expand Up @@ -129,7 +130,7 @@ impl BlockStream {
pub(crate) async fn start_block_stream(
start_block_height: near_indexer_primitives::types::BlockHeight,
indexer: &IndexerConfig,
redis_client: std::sync::Arc<crate::redis::RedisClient>,
redis: std::sync::Arc<crate::redis::RedisClient>,
delta_lake_client: std::sync::Arc<crate::delta_lake_client::DeltaLakeClient>,
lake_s3_client: crate::lake_s3_client::SharedLakeS3Client,
chain_id: &ChainId,
Expand All @@ -145,7 +146,7 @@ pub(crate) async fn start_block_stream(
let last_indexed_delta_lake_block = process_delta_lake_blocks(
start_block_height,
delta_lake_client,
redis_client.clone(),
redis.clone(),
indexer,
redis_stream.clone(),
)
Expand All @@ -156,7 +157,7 @@ pub(crate) async fn start_block_stream(
last_indexed_delta_lake_block,
lake_s3_client,
lake_prefetch_size,
redis_client,
redis,
indexer,
redis_stream,
chain_id,
Expand All @@ -175,7 +176,7 @@ pub(crate) async fn start_block_stream(
async fn process_delta_lake_blocks(
start_block_height: near_indexer_primitives::types::BlockHeight,
delta_lake_client: std::sync::Arc<crate::delta_lake_client::DeltaLakeClient>,
redis_client: std::sync::Arc<crate::redis::RedisClient>,
redis: std::sync::Arc<crate::redis::RedisClient>,
indexer: &IndexerConfig,
redis_stream: String,
) -> anyhow::Result<u64> {
Expand Down Expand Up @@ -230,10 +231,10 @@ async fn process_delta_lake_blocks(

for block_height in &blocks_from_index {
let block_height = block_height.to_owned();
redis_client
.publish_block(indexer, redis_stream.clone(), block_height)
redis
.publish_block(indexer, redis_stream.clone(), block_height, MAX_STREAM_SIZE)
.await?;
redis_client
redis
.set_last_processed_block(indexer, block_height)
.await?;
}
Expand All @@ -253,7 +254,7 @@ async fn process_near_lake_blocks(
start_block_height: near_indexer_primitives::types::BlockHeight,
lake_s3_client: crate::lake_s3_client::SharedLakeS3Client,
lake_prefetch_size: usize,
redis_client: std::sync::Arc<crate::redis::RedisClient>,
redis: std::sync::Arc<crate::redis::RedisClient>,
indexer: &IndexerConfig,
redis_stream: String,
chain_id: &ChainId,
Expand All @@ -278,7 +279,7 @@ async fn process_near_lake_blocks(
let block_height = streamer_message.block.header.height;
last_indexed_block = block_height;

redis_client
redis
.set_last_processed_block(indexer, block_height)
.await?;

Expand All @@ -289,18 +290,14 @@ async fn process_near_lake_blocks(
);

if !matches.is_empty() {
if let Ok(Some(stream_length)) =
redis_client.get_stream_length(redis_stream.clone()).await
{
if let Ok(Some(stream_length)) = redis.get_stream_length(redis_stream.clone()).await {
if stream_length <= MAX_STREAM_SIZE_WITH_CACHE {
redis_client
.cache_streamer_message(&streamer_message)
.await?;
redis.cache_streamer_message(&streamer_message).await?;
}
}

redis_client
.publish_block(indexer, redis_stream.clone(), block_height)
redis
.publish_block(indexer, redis_stream.clone(), block_height, MAX_STREAM_SIZE)
.await?;
}
}
Expand Down Expand Up @@ -355,29 +352,30 @@ mod tests {
.expect_list_matching_block_heights()
.returning(|_, _| Ok(vec![107503702, 107503703]));

let mut mock_redis_client = crate::redis::RedisClient::default();
mock_redis_client
let mut mock_redis = crate::redis::RedisClient::default();
mock_redis
.expect_publish_block()
.with(
predicate::always(),
predicate::eq("stream key".to_string()),
predicate::in_iter([107503702, 107503703, 107503705]),
predicate::always(),
)
.returning(|_, _, _| Ok(()))
.returning(|_, _, _, _| Ok(()))
.times(3);
mock_redis_client
mock_redis
.expect_set_last_processed_block()
.with(
predicate::always(),
predicate::in_iter([107503702, 107503703, 107503704, 107503705]),
)
.returning(|_, _| Ok(()))
.times(4);
mock_redis_client
mock_redis
.expect_cache_streamer_message()
.with(predicate::always())
.returning(|_| Ok(()));
mock_redis_client
mock_redis
.expect_get_stream_length()
.with(predicate::eq("stream key".to_string()))
.returning(|_| Ok(Some(10)));
Expand All @@ -397,7 +395,7 @@ mod tests {
start_block_stream(
91940840,
&indexer_config,
std::sync::Arc::new(mock_redis_client),
std::sync::Arc::new(mock_redis),
std::sync::Arc::new(mock_delta_lake_client),
mock_lake_s3_client,
&ChainId::Mainnet,
Expand Down Expand Up @@ -435,8 +433,9 @@ mod tests {
predicate::always(),
predicate::eq("stream key".to_string()),
predicate::in_iter([107503705]),
predicate::always(),
)
.returning(|_, _, _| Ok(()))
.returning(|_, _, _, _| Ok(()))
.times(1);
mock_redis_client
.expect_set_last_processed_block()
Expand Down
4 changes: 2 additions & 2 deletions block-streamer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ async fn main() -> anyhow::Result<()> {
"Starting Block Streamer"
);

let redis_client = std::sync::Arc::new(redis::RedisClient::connect(&redis_url).await?);
let redis = std::sync::Arc::new(redis::RedisClient::connect(&redis_url).await?);

let aws_config = aws_config::from_env().load().await;
let s3_config = aws_sdk_s3::Config::from(&aws_config);
Expand All @@ -58,7 +58,7 @@ async fn main() -> anyhow::Result<()> {

tokio::spawn(metrics::init_server(metrics_port).expect("Failed to start metrics server"));

server::init(&grpc_port, redis_client, delta_lake_client, lake_s3_client).await?;
server::init(&grpc_port, redis, delta_lake_client, lake_s3_client).await?;

Ok(())
}
128 changes: 107 additions & 21 deletions block-streamer/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,16 @@ use crate::metrics;
use crate::utils;

#[cfg(test)]
pub use MockRedisClientImpl as RedisClient;
use MockRedisCommandsImpl as RedisCommands;
#[cfg(not(test))]
pub use RedisClientImpl as RedisClient;
use RedisCommandsImpl as RedisCommands;

pub struct RedisClientImpl {
struct RedisCommandsImpl {
connection: ConnectionManager,
}

#[cfg_attr(test, mockall::automock)]
impl RedisClientImpl {
const STREAMER_MESSAGE_PREFIX: &'static str = "streamer_message:";

impl RedisCommandsImpl {
pub async fn connect(redis_url: &str) -> Result<Self, RedisError> {
let connection = redis::Client::open(redis_url)?
.get_tokio_connection_manager()
Expand Down Expand Up @@ -91,6 +89,26 @@ impl RedisClientImpl {

Ok(())
}
}

#[cfg(test)]
pub use MockRedisClientImpl as RedisClient;
#[cfg(not(test))]
pub use RedisClientImpl as RedisClient;

pub struct RedisClientImpl {
commands: RedisCommands,
}

#[cfg_attr(test, mockall::automock)]
impl RedisClientImpl {
const STREAMER_MESSAGE_PREFIX: &'static str = "streamer_message:";

pub async fn connect(redis_url: &str) -> Result<Self, RedisError> {
let commands = RedisCommands::connect(redis_url).await?;

Ok(Self { commands })
}

pub async fn set_last_processed_block(
&self,
Expand All @@ -109,13 +127,14 @@ impl RedisClientImpl {
.context("Failed to convert block height (u64) to metrics type (i64)")?,
);

self.set(indexer_config.last_processed_block_key(), height)
self.commands
.set(indexer_config.last_processed_block_key(), height)
.await
.context("Failed to set last processed block")
}

pub async fn get_stream_length(&self, stream: String) -> anyhow::Result<Option<u64>> {
self.xlen(stream).await
self.commands.xlen(stream).await
}

pub async fn cache_streamer_message(
Expand All @@ -128,30 +147,97 @@ impl RedisClientImpl {

utils::snake_to_camel(&mut streamer_message);

self.set_ex(
format!("{}{}", Self::STREAMER_MESSAGE_PREFIX, height),
serde_json::to_string(&streamer_message)?,
60,
)
.await
.context("Failed to cache streamer message")
self.commands
.set_ex(
format!("{}{}", Self::STREAMER_MESSAGE_PREFIX, height),
serde_json::to_string(&streamer_message)?,
60,
)
.await
.context("Failed to cache streamer message")
}

pub async fn publish_block(
&self,
indexer: &IndexerConfig,
stream: String,
block_height: u64,
max_size: u64,
) -> anyhow::Result<()> {
loop {
let stream_length = self.get_stream_length(stream.clone()).await?;

if stream_length.is_none() {
break;
}

if stream_length.unwrap() < max_size {
break;
}

println!("Waiting for stream to be consumed");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}

metrics::PUBLISHED_BLOCKS_COUNT
.with_label_values(&[&indexer.get_full_name()])
.inc();

self.xadd(
stream.clone(),
&[(String::from("block_height"), block_height)],
)
.await
.context("Failed to add block to Redis Stream")
self.commands
.xadd(
stream.clone(),
&[(String::from("block_height"), block_height)],
)
.await
.context("Failed to add block to Redis Stream")
}
}

#[cfg(test)]
mod test {
use super::*;

use mockall::predicate;
use near_lake_framework::near_indexer_primitives;

#[tokio::test]
async fn limits_block_stream_length() {
let mut mock_redis_commands = RedisCommands::default();
mock_redis_commands
.expect_xadd::<String, u64>()
.with(predicate::eq("stream".to_string()), predicate::always())
.returning(|_, _| Ok(()))
.once();
let mut stream_len = 10;
mock_redis_commands
.expect_xlen::<String>()
.with(predicate::eq("stream".to_string()))
.returning(move |_| {
stream_len -= 1;
Ok(Some(stream_len))
});

let redis = RedisClientImpl {
commands: mock_redis_commands,
};

let indexer_config = crate::indexer_config::IndexerConfig {
account_id: near_indexer_primitives::types::AccountId::try_from(
"morgs.near".to_string(),
)
.unwrap(),
function_name: "test".to_string(),
rule: registry_types::Rule::ActionAny {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: registry_types::Status::Success,
},
};

tokio::time::pause();

redis
.publish_block(&indexer_config, "stream".to_string(), 0, 1)
.await
.unwrap();
}
}
Loading

0 comments on commit f52dfa3

Please sign in to comment.