Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into replace-delta-lake-wi…
Browse files Browse the repository at this point in the history
…th-bitmap
  • Loading branch information
morgsmccauley committed Jul 4, 2024
2 parents 865fae2 + a9c25c3 commit 9c63b91
Show file tree
Hide file tree
Showing 102 changed files with 5,991 additions and 3,711 deletions.
1 change: 1 addition & 0 deletions block-streamer/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion block-streamer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ graphql_client = { version = "0.14.0", features = ["reqwest"] }
lazy_static = "1.4.0"
mockall = "0.11.4"
near-lake-framework = "0.7.8"
pin-project = "1.1.5"
prometheus = "0.13.3"
prost = "0.12.3"
redis = { version = "0.21.5", features = ["tokio-comp", "connection-manager"] }
Expand All @@ -29,7 +30,7 @@ serde_json = "1.0.55"
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
tracing-stackdriver = "0.10.0"
tokio = { version = "1.28.0", features = ["full"]}
tokio = { version = "1.28.0", features = ["full", "test-util"]}
tokio-util = "0.7.10"
tokio-stream = "0.1.14"
tonic = "0.10.2"
Expand Down
148 changes: 95 additions & 53 deletions block-streamer/src/block_stream.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;

use anyhow::Context;
use futures::StreamExt;
Expand All @@ -17,6 +20,36 @@ use crate::rules::types::ChainId;
/// we need this configurable for testing purposes.
const LAKE_PREFETCH_SIZE: usize = 100;
const MAX_STREAM_SIZE_WITH_CACHE: u64 = 100;
const MAX_STREAM_SIZE: u64 = 100;

/// Future wrapper that pairs an inner future with the name of the indexer it
/// belongs to. Its `Future` impl increments the `BLOCK_STREAM_UP` metric for
/// that indexer each time it is polled, then polls the inner future.
#[pin_project::pin_project]
pub struct PollCounter<F> {
/// The wrapped future; marked `#[pin]` so it can be polled through the
/// pinned projection.
#[pin]
inner: F,
/// Label value used when incrementing `BLOCK_STREAM_UP`.
indexer_name: String,
}

impl<F> PollCounter<F> {
pub fn new(inner: F, indexer_name: String) -> Self {
Self {
inner,
indexer_name,
}
}
}

impl<F> Future for PollCounter<F>
where
    F: Future,
{
    type Output = F::Output;

    /// Increments the `BLOCK_STREAM_UP` counter labelled with this wrapper's
    /// indexer name, then forwards the poll to the wrapped future and returns
    /// its result unchanged.
    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        let projected = self.project();

        // Count the poll before delegating, so it is recorded on every poll.
        let up_counter =
            metrics::BLOCK_STREAM_UP.with_label_values(&[projected.indexer_name.as_str()]);
        up_counter.inc();

        projected.inner.poll(cx)
    }
}

pub struct Task {
handle: JoinHandle<anyhow::Result<()>>,
Expand Down Expand Up @@ -50,7 +83,7 @@ impl BlockStream {
pub fn start(
&mut self,
start_block_height: near_indexer_primitives::types::BlockHeight,
redis_client: Arc<RedisClient>,
redis: Arc<RedisClient>,
reciever_blocks_processor: Arc<ReceiverBlocksProcessor>,
lake_s3_client: SharedLakeS3Client,
) -> anyhow::Result<()> {
Expand All @@ -59,42 +92,49 @@ impl BlockStream {
}

let cancellation_token = tokio_util::sync::CancellationToken::new();
let cancellation_token_clone = cancellation_token.clone();

let indexer_config = self.indexer_config.clone();
let chain_id = self.chain_id.clone();
let redis_stream = self.redis_stream.clone();

let handle = tokio::spawn(async move {
tokio::select! {
_ = cancellation_token_clone.cancelled() => {
tracing::info!(
account_id = indexer_config.account_id.as_str(),
function_name = indexer_config.function_name,
"Cancelling block stream task",
);

Ok(())
},
result = start_block_stream(

let handle = tokio::spawn({
let cancellation_token = cancellation_token.clone();
let indexer_config = self.indexer_config.clone();
let chain_id = self.chain_id.clone();
let redis_stream = self.redis_stream.clone();

async move {
let block_stream_future = start_block_stream(
start_block_height,
&indexer_config,
redis_client,
redis,
reciever_blocks_processor,
lake_s3_client,
&chain_id,
LAKE_PREFETCH_SIZE,
redis_stream
) => {
result.map_err(|err| {
tracing::error!(
redis_stream,
);

let block_stream_future =
PollCounter::new(block_stream_future, indexer_config.get_full_name());

tokio::select! {
_ = cancellation_token.cancelled() => {
tracing::info!(
account_id = indexer_config.account_id.as_str(),
function_name = indexer_config.function_name,
"Block stream task stopped due to error: {:?}",
err,
"Cancelling block stream task",
);
err
})

Ok(())
},
result = block_stream_future => {
result.map_err(|err| {
tracing::error!(
account_id = indexer_config.account_id.as_str(),
function_name = indexer_config.function_name,
"Block stream task stopped due to error: {:?}",
err,
);
err
})
}
}
}
});
Expand All @@ -112,6 +152,10 @@ impl BlockStream {
task.cancellation_token.cancel();
let _ = task.handle.await?;

// Fails if metric doesn't exist, i.e. task was never polled
let _ = metrics::BLOCK_STREAM_UP
.remove_label_values(&[&self.indexer_config.get_full_name()]);

return Ok(());
}

Expand All @@ -134,7 +178,7 @@ impl BlockStream {
pub(crate) async fn start_block_stream(
start_block_height: near_indexer_primitives::types::BlockHeight,
indexer: &IndexerConfig,
redis_client: Arc<RedisClient>,
redis: Arc<RedisClient>,
reciever_blocks_processor: Arc<ReceiverBlocksProcessor>,
lake_s3_client: SharedLakeS3Client,
chain_id: &ChainId,
Expand All @@ -150,7 +194,7 @@ pub(crate) async fn start_block_stream(
let last_bitmap_indexer_block = process_bitmap_indexer_blocks(
start_block_height,
reciever_blocks_processor,
redis_client.clone(),
redis.clone(),
indexer,
redis_stream.clone(),
)
Expand All @@ -161,7 +205,7 @@ pub(crate) async fn start_block_stream(
last_bitmap_indexer_block,
lake_s3_client,
lake_prefetch_size,
redis_client,
redis,
indexer,
redis_stream,
chain_id,
Expand All @@ -180,7 +224,7 @@ pub(crate) async fn start_block_stream(
async fn process_bitmap_indexer_blocks(
start_block_height: near_indexer_primitives::types::BlockHeight,
reciever_blocks_processor: Arc<ReceiverBlocksProcessor>,
redis_client: Arc<RedisClient>,
redis: Arc<RedisClient>,
indexer: &IndexerConfig,
redis_stream: String,
) -> anyhow::Result<u64> {
Expand Down Expand Up @@ -226,10 +270,10 @@ async fn process_bitmap_indexer_blocks(
let mut last_published_block_height: u64 = start_block_height;

while let Some(Ok(block_height)) = matching_block_heights.next().await {
redis_client
.publish_block(indexer, redis_stream.clone(), block_height)
redis
.publish_block(indexer, redis_stream.clone(), block_height, MAX_STREAM_SIZE)
.await?;
redis_client
redis
.set_last_processed_block(indexer, block_height)
.await?;

Expand All @@ -243,7 +287,7 @@ async fn process_near_lake_blocks(
start_block_height: near_indexer_primitives::types::BlockHeight,
lake_s3_client: SharedLakeS3Client,
lake_prefetch_size: usize,
redis_client: Arc<RedisClient>,
redis: Arc<RedisClient>,
indexer: &IndexerConfig,
redis_stream: String,
chain_id: &ChainId,
Expand All @@ -268,7 +312,7 @@ async fn process_near_lake_blocks(
let block_height = streamer_message.block.header.height;
last_indexed_block = block_height;

redis_client
redis
.set_last_processed_block(indexer, block_height)
.await?;

Expand All @@ -279,18 +323,14 @@ async fn process_near_lake_blocks(
);

if !matches.is_empty() {
if let Ok(Some(stream_length)) =
redis_client.get_stream_length(redis_stream.clone()).await
{
if let Ok(Some(stream_length)) = redis.get_stream_length(redis_stream.clone()).await {
if stream_length <= MAX_STREAM_SIZE_WITH_CACHE {
redis_client
.cache_streamer_message(&streamer_message)
.await?;
redis.cache_streamer_message(&streamer_message).await?;
}
}

redis_client
.publish_block(indexer, redis_stream.clone(), block_height)
redis
.publish_block(indexer, redis_stream.clone(), block_height, MAX_STREAM_SIZE)
.await?;
}
}
Expand Down Expand Up @@ -368,29 +408,30 @@ mod tests {
let mock_reciever_blocks_processor =
ReceiverBlocksProcessor::new(mock_graphql_client, mock_s3_client);

let mut mock_redis_client = crate::redis::RedisClient::default();
mock_redis_client
let mut mock_redis = crate::redis::RedisClient::default();
mock_redis
.expect_publish_block()
.with(
predicate::always(),
predicate::eq("stream key".to_string()),
predicate::in_iter([107503702, 107503703, 107503705]),
predicate::always(),
)
.returning(|_, _, _| Ok(()))
.returning(|_, _, _, _| Ok(()))
.times(3);
mock_redis_client
mock_redis
.expect_set_last_processed_block()
.with(
predicate::always(),
predicate::in_iter([107503702, 107503703, 107503704, 107503705]),
)
.returning(|_, _| Ok(()))
.times(4);
mock_redis_client
mock_redis
.expect_cache_streamer_message()
.with(predicate::always())
.returning(|_| Ok(()));
mock_redis_client
mock_redis
.expect_get_stream_length()
.with(predicate::eq("stream key".to_string()))
.returning(|_| Ok(Some(10)));
Expand All @@ -410,7 +451,7 @@ mod tests {
start_block_stream(
91940840,
&indexer_config,
std::sync::Arc::new(mock_redis_client),
std::sync::Arc::new(mock_redis),
std::sync::Arc::new(mock_reciever_blocks_processor),
mock_lake_s3_client,
&ChainId::Mainnet,
Expand Down Expand Up @@ -470,8 +511,9 @@ mod tests {
predicate::always(),
predicate::eq("stream key".to_string()),
predicate::in_iter([107503705]),
predicate::always(),
)
.returning(|_, _, _| Ok(()))
.returning(|_, _, _, _| Ok(()))
.times(1);
mock_redis_client
.expect_set_last_processed_block()
Expand Down
10 changes: 2 additions & 8 deletions block-streamer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ async fn main() -> anyhow::Result<()> {
"Starting Block Streamer"
);

let redis_client = std::sync::Arc::new(redis::RedisClient::connect(&redis_url).await?);
let redis = std::sync::Arc::new(redis::RedisClient::connect(&redis_url).await?);

let aws_config = aws_config::from_env().load().await;
let s3_config = aws_sdk_s3::Config::from(&aws_config);
Expand All @@ -61,13 +61,7 @@ async fn main() -> anyhow::Result<()> {

tokio::spawn(metrics::init_server(metrics_port).expect("Failed to start metrics server"));

server::init(
&grpc_port,
redis_client,
receiver_blocks_processor,
lake_s3_client,
)
.await?;
server::init(&grpc_port, redis, receiver_blocks_processor, lake_s3_client).await?;

Ok(())
}
6 changes: 6 additions & 0 deletions block-streamer/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ lazy_static! {
&["level"]
)
.unwrap();
// Counter vector with a single "indexer" label. `PollCounter::poll` in
// block_stream.rs increments the labelled counter on every poll of a block
// stream future, and the label set is removed when the stream is cancelled.
// `unwrap` panics if registering the metric fails.
pub static ref BLOCK_STREAM_UP: IntCounterVec = register_int_counter_vec!(
"queryapi_block_streamer_block_stream_up",
"A continuously increasing counter to indicate the block stream is up",
&["indexer"]
)
.unwrap();
}

pub struct LogCounter;
Expand Down
Loading

0 comments on commit 9c63b91

Please sign in to comment.