fix: Cap Block Streamer Caching and Merge Redis Impl (#599)
Redis memory usage is pegged at 100% due to Block Streamer caching
streamer messages while backfilling `*` contract-filter indexers.

In addition, the Redis implementations of Coordinator and Block Streamer
have diverged, and Coordinator is unable to make successful calls to
Redis. While not a targeted fix, the hope is that merging the
implementations resolves the issue, since Block Streamer is able to call
Redis successfully.

Before release, some manual intervention is necessary in dev and prod to
update indexer versions to the current ones, to prevent unwanted behavior
from Coordinator. I will update dev first, release the change, and then
update prod before a prod release.
darunrs authored Mar 8, 2024
1 parent 6c5b6fb commit 6038fe6
Showing 5 changed files with 125 additions and 161 deletions.
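
The heart of the change is the guard in the first hunk below: before caching a streamer message, Block Streamer now checks the stream's length via XLEN and skips the cache write once the backlog exceeds MAX_STREAM_SIZE_WITH_CACHE. What follows is a minimal, self-contained sketch of that pattern against a raw redis connection; cache_if_stream_short, cache_key, and payload are hypothetical stand-ins, not names from this commit.

use redis::AsyncCommands;

const MAX_STREAM_SIZE_WITH_CACHE: u64 = 100;

/// Cache a payload only while the stream's consumer backlog is small.
/// Sketch only: `cache_key`/`payload` stand in for the serialized
/// `StreamerMessage` entry the block streamer actually caches.
async fn cache_if_stream_short(
    conn: &mut redis::aio::ConnectionManager,
    stream_key: &str,
    cache_key: &str,
    payload: &str,
) -> redis::RedisResult<()> {
    // XLEN reports how many entries are waiting in the stream; a long
    // stream means the consumer is behind and cached messages would
    // accumulate faster than they are read.
    let stream_length: u64 = redis::cmd("XLEN")
        .arg(stream_key)
        .query_async(conn)
        .await?;

    if stream_length <= MAX_STREAM_SIZE_WITH_CACHE {
        let _: () = conn.set(cache_key, payload).await?;
    }

    Ok(())
}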
87 changes: 84 additions & 3 deletions block-streamer/src/block_stream.rs
@@ -10,6 +10,7 @@ use registry_types::Rule;
/// The number of blocks to prefetch within `near-lake-framework`. The internal default is 100, but
/// we need this configurable for testing purposes.
const LAKE_PREFETCH_SIZE: usize = 100;
const MAX_STREAM_SIZE_WITH_CACHE: u64 = 100;
const DELTA_LAKE_SKIP_ACCOUNTS: [&str; 4] = ["*", "*.near", "*.kaiching", "*.tg"];

pub struct Task {
@@ -285,9 +286,15 @@ async fn process_near_lake_blocks(
);

if !matches.is_empty() {
redis_client
.cache_streamer_message(&streamer_message)
.await?;
if let Ok(Some(stream_length)) =
redis_client.get_stream_length(redis_stream.clone()).await
{
if stream_length <= MAX_STREAM_SIZE_WITH_CACHE {
redis_client
.cache_streamer_message(&streamer_message)
.await?;
}
}

redis_client
.publish_block(indexer, redis_stream.clone(), block_height)
@@ -346,6 +353,10 @@ mod tests {
.expect_cache_streamer_message()
.with(predicate::always())
.returning(|_| Ok(()));
mock_redis_client
.expect_get_stream_length()
.with(predicate::eq("stream key".to_string()))
.returning(|_| Ok(Some(10)));

let indexer_config = crate::indexer_config::IndexerConfig {
account_id: near_indexer_primitives::types::AccountId::try_from(
@@ -375,6 +386,76 @@
.unwrap();
}

#[tokio::test]
async fn skips_caching_of_lake_block_over_stream_size_limit() {
let mut mock_delta_lake_client = crate::delta_lake_client::DeltaLakeClient::default();
mock_delta_lake_client
.expect_get_latest_block_metadata()
.returning(|| {
Ok(crate::delta_lake_client::LatestBlockMetadata {
last_indexed_block: "107503700".to_string(),
processed_at_utc: "".to_string(),
first_indexed_block: "".to_string(),
last_indexed_block_date: "".to_string(),
first_indexed_block_date: "".to_string(),
})
});

let mut mock_redis_client = crate::redis::RedisClient::default();
mock_redis_client
.expect_publish_block()
.with(
predicate::always(),
predicate::eq("stream key".to_string()),
predicate::in_iter([107503705]),
)
.returning(|_, _, _| Ok(()))
.times(1);
mock_redis_client
.expect_set_last_processed_block()
.with(
predicate::always(),
predicate::in_iter([107503704, 107503705]),
)
.returning(|_, _| Ok(()))
.times(2);
mock_redis_client
.expect_cache_streamer_message()
.with(predicate::always())
.never();
mock_redis_client
.expect_get_stream_length()
.with(predicate::eq("stream key".to_string()))
.returning(|_| Ok(Some(200)));

let indexer_config = crate::indexer_config::IndexerConfig {
account_id: near_indexer_primitives::types::AccountId::try_from(
"morgs.near".to_string(),
)
.unwrap(),
function_name: "test".to_string(),
rule: registry_types::Rule::ActionAny {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: registry_types::Status::Success,
},
};

let lake_s3_config = crate::test_utils::create_mock_lake_s3_config(&[107503704, 107503705]);

start_block_stream(
107503704,
&indexer_config,
std::sync::Arc::new(mock_redis_client),
std::sync::Arc::new(mock_delta_lake_client),
lake_s3_config,
&ChainId::Mainnet,
1,
"stream key".to_string(),
)
.await
.unwrap();
}

#[tokio::test]
async fn skips_delta_lake_for_star_filter() {
let mut mock_delta_lake_client = crate::delta_lake_client::DeltaLakeClient::default();
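The test above verifies the skip purely through mockall expectations: .never() fails the test if cache_streamer_message is ever invoked, while .times(n) pins exact call counts on the other methods. Here is a standalone illustration of that pattern, using a hypothetical Cache trait rather than the crate's RedisClient:

use mockall::{automock, predicate};

#[automock]
trait Cache {
    fn store(&self, key: String);
}

#[test]
fn never_stores_when_over_limit() {
    let mut mock = MockCache::new();

    // `.never()` panics on any call to `store`, which is how the
    // block-streamer test asserts "skips caching" without state checks.
    mock.expect_store()
        .with(predicate::eq("stream key".to_string()))
        .never();

    // ...exercise code that must not call `store`...
    // Unmet `.times(..)` expectations panic when the mock is dropped.
}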
21 changes: 21 additions & 0 deletions block-streamer/src/redis.rs
@@ -49,6 +49,23 @@ impl RedisClientImpl {
Ok(())
}

pub async fn xlen<T>(&self, stream_key: T) -> anyhow::Result<Option<u64>>
where
T: ToRedisArgs + Debug + Send + Sync + 'static,
{
tracing::debug!("XLEN: {:?}", stream_key);

let mut cmd = redis::cmd("XLEN");
cmd.arg(&stream_key);

let stream_length = cmd
.query_async(&mut self.connection.clone())
.await
.context(format!("XLEN {stream_key:?}"))?;

Ok(stream_length)
}

pub async fn set<T, U>(&self, key: T, value: U) -> Result<(), RedisError>
where
T: ToRedisArgs + Debug + Send + Sync + 'static,
@@ -97,6 +114,10 @@ impl RedisClientImpl {
.context("Failed to set last processed block")
}

pub async fn get_stream_length(&self, stream: String) -> anyhow::Result<Option<u64>> {
self.xlen(stream).await
}

pub async fn cache_streamer_message(
&self,
streamer_message: &near_lake_framework::near_indexer_primitives::StreamerMessage,
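Note the shape of the call site in block_stream.rs above: the length check is wrapped in if let Ok(Some(..)), so a failed XLEN quietly skips the cache write rather than aborting the stream, and the block is still published. A condensed illustration, with fetch_len as a hypothetical stand-in for get_stream_length:

/// Hypothetical stand-in for `RedisClient::get_stream_length`.
async fn fetch_len(_stream: &str) -> anyhow::Result<Option<u64>> {
    anyhow::bail!("redis unavailable") // simulate a transient failure
}

async fn process_block() {
    // An Err (or None) falls through without caching; only a healthy,
    // short stream triggers the cache write.
    if let Ok(Some(len)) = fetch_len("stream key").await {
        if len <= 100 {
            // cache_streamer_message(..) would run here
        }
    }
    // publish_block(..) still runs after this point
}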
7 changes: 5 additions & 2 deletions coordinator/src/block_streams/synchronise.rs
@@ -95,6 +95,11 @@ async fn synchronise_block_stream(
let start_block_height =
determine_start_block_height(&stream_status, indexer_config, redis_client).await?;

tracing::info!(
"Starting new block stream starting at block {}",
start_block_height
);

block_streams_handler
.start(start_block_height, indexer_config)
.await?;
@@ -164,8 +169,6 @@ async fn determine_start_block_height(
return get_continuation_block_height(indexer_config, redis_client).await;
}

tracing::info!(start_block = ?indexer_config.start_block, "Stating new block stream");

match indexer_config.start_block {
StartBlock::Latest => Ok(indexer_config.get_registry_version()),
StartBlock::Height(height) => Ok(height),
8 changes: 0 additions & 8 deletions coordinator/src/indexer_config.rs
@@ -22,14 +22,6 @@ impl IndexerConfig {
format!("{}:block_stream", self.get_full_name())
}

pub fn get_historical_redis_stream_key(&self) -> String {
format!("{}:historical:stream", self.get_full_name())
}

pub fn get_real_time_redis_stream_key(&self) -> String {
format!("{}:real_time:stream", self.get_full_name())
}

pub fn get_last_published_block_key(&self) -> String {
format!("{}:last_published_block", self.get_full_name())
}
163 changes: 15 additions & 148 deletions coordinator/src/redis.rs
@@ -3,9 +3,7 @@
use std::fmt::Debug;

use anyhow::Context;
use redis::{
aio::ConnectionManager, streams, AsyncCommands, FromRedisValue, RedisResult, ToRedisArgs,
};
use redis::{aio::ConnectionManager, AsyncCommands, FromRedisValue, ToRedisArgs};

use crate::indexer_config::IndexerConfig;

@@ -21,9 +19,6 @@ pub struct RedisClientImpl {

#[cfg_attr(test, mockall::automock)]
impl RedisClientImpl {
pub const STREAMS_SET: &str = "streams";
pub const ALLOWLIST: &str = "allowlist";

pub async fn connect(redis_url: &str) -> anyhow::Result<Self> {
let connection = redis::Client::open(redis_url)?
.get_connection_manager()
@@ -41,171 +36,43 @@
T: ToRedisArgs + Debug + Send + Sync + 'static,
U: FromRedisValue + Debug + 'static,
{
let value: Option<U> = self
.connection
.clone()
.get(&key)
let mut cmd = redis::cmd("GET");
cmd.arg(&key);
let value = cmd
.query_async(&mut self.connection.clone())
.await
.context(format!("GET: {key:?}"))?;

tracing::debug!("GET: {:?}={:?}", key, value);
tracing::debug!("GET: {:?}={:?}", key, &value);

Ok(value)
}
pub async fn set<K, V>(&self, key: K, value: V) -> anyhow::Result<()>
where
K: ToRedisArgs + Debug + Send + Sync + 'static,
V: ToRedisArgs + Debug + Send + Sync + 'static,
{
tracing::debug!("SET: {key:?} {value:?}");

self.connection
.clone()
.set(&key, &value)
.await
.context(format!("SET: {key:?} {value:?}"))?;

Ok(())
}

pub async fn rename<K, V>(&self, old_key: K, new_key: V) -> anyhow::Result<()>
pub async fn set<K, V>(&self, key: K, value: V) -> anyhow::Result<()>
where
K: ToRedisArgs + Debug + Send + Sync + 'static,
V: ToRedisArgs + Debug + Send + Sync + 'static,
{
tracing::debug!("RENAME: {:?} -> {:?}", old_key, new_key);
tracing::debug!("SET: {:?}, {:?}", key, value);

self.connection
.clone()
.rename(&old_key, &new_key)
.await
.context(format!("RENAME: {old_key:?} {new_key:?}"))?;
let mut cmd = redis::cmd("SET");
cmd.arg(key).arg(value);
cmd.query_async(&mut self.connection.clone()).await?;

Ok(())
}

pub async fn srem<T, U>(&self, key: T, value: U) -> anyhow::Result<Option<()>>
where
T: ToRedisArgs + Debug + Send + Sync + 'static,
U: ToRedisArgs + Debug + Send + Sync + 'static,
{
tracing::debug!("SREM: {:?}={:?}", key, value);

match self.connection.clone().srem(&key, &value).await {
Ok(1) => Ok(Some(())),
Ok(_) => Ok(None),
Err(e) => Err(anyhow::format_err!(e)),
}
.context(format!("SREM: {key:?} {value:?}"))
}

pub async fn xread<K, V>(
&self,
key: K,
start_id: V,
count: usize,
) -> anyhow::Result<Vec<streams::StreamId>>
where
K: ToRedisArgs + Debug + Send + Sync + 'static,
V: ToRedisArgs + Debug + Send + Sync + 'static,
{
tracing::debug!("XREAD: {:?} {:?} {:?}", key, start_id, count);

let mut results: streams::StreamReadReply = self
.connection
.clone()
.xread_options(
&[&key],
&[&start_id],
&streams::StreamReadOptions::default().count(count),
)
.await
.context(format!("XREAD {key:?} {start_id:?} {count:?}"))?;

if results.keys.is_empty() {
return Ok([].to_vec());
}

Ok(results.keys.remove(0).ids)
}

pub async fn xadd<K, U>(&self, key: K, fields: &[(String, U)]) -> anyhow::Result<()>
where
K: ToRedisArgs + Debug + Send + Sync + 'static,
U: ToRedisArgs + Debug + Send + Sync + 'static,
{
tracing::debug!("XADD: {:?} {:?} {:?}", key, "*", fields);

self.connection
.clone()
.xadd(&key, "*", fields)
.await
.context(format!("XADD {key:?} {fields:?}"))?;

Ok(())
}

pub async fn xdel<K, I>(&self, key: K, id: I) -> anyhow::Result<()>
where
K: ToRedisArgs + Debug + Send + Sync + 'static,
I: ToRedisArgs + Debug + Send + Sync + 'static,
{
tracing::debug!("XDEL: {:?} {:?}", key, id);

self.connection
.clone()
.xdel(&key, &[&id])
.await
.context(format!("XDEL {key:?} {id:?}"))?;

Ok(())
}

pub async fn exists<K>(&self, key: K) -> anyhow::Result<bool>
where
K: ToRedisArgs + Debug + Send + Sync + 'static,
{
tracing::debug!("EXISTS {key:?}");

self.connection
.clone()
.exists(&key)
.await
.map_err(|e| anyhow::format_err!(e))
.context(format!("EXISTS {key:?}"))
}

pub async fn del<K>(&self, key: K) -> anyhow::Result<()>
where
K: ToRedisArgs + Debug + Send + Sync + 'static,
{
tracing::debug!("DEL {key:?}");

self.connection
.clone()
.del(&key)
let mut cmd = redis::cmd("DEL");
cmd.arg(&key);
cmd.query_async(&mut self.connection.clone())
.await
.map_err(|e| anyhow::format_err!(e))
.context(format!("DEL {key:?}"))
}

// `redis::transaction`s currently don't work with async connections, so we have to create a _new_
// blocking connection to atmoically update a value.
pub fn atomic_update<K, O, N, F>(&self, key: K, update_fn: F) -> anyhow::Result<()>
where
K: ToRedisArgs + Copy + 'static,
O: FromRedisValue + 'static,
N: ToRedisArgs + 'static,
F: Fn(O) -> RedisResult<N> + 'static,
{
let mut conn = redis::Client::open(self.url.clone())?.get_connection()?;

redis::transaction(&mut conn, &[key], |conn, pipe| {
let old_value = redis::cmd("GET").arg(key).query(conn)?;
let new_value = update_fn(old_value)?;

pipe.cmd("SET").arg(key).arg(new_value).query(conn)
})?;
.context(format!("DEL {key:?}"))?;

Ok(())
}
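The rewritten Coordinator client above drops the AsyncCommands trait methods in favor of explicit redis::cmd builders, the style Block Streamer already uses; this is the implementation merge the commit title refers to. A minimal sketch of the shared pattern, assuming an already-established ConnectionManager; get_string is an illustrative name, not a method from this commit:

use anyhow::Context;
use redis::aio::ConnectionManager;

/// GET via an explicit command builder, mirroring the merged style.
async fn get_string(
    connection: &ConnectionManager,
    key: &str,
) -> anyhow::Result<Option<String>> {
    let mut cmd = redis::cmd("GET");
    cmd.arg(key);

    // `ConnectionManager` is cheap to clone, so methods can take `&self`
    // and run each command on their own cloned handle, as in the diff.
    let value: Option<String> = cmd
        .query_async(&mut connection.clone())
        .await
        .context(format!("GET {key:?}"))?;

    Ok(value)
}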
