Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prod Release 03/07/24 #851

Merged
merged 10 commits into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions block-streamer/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion block-streamer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ graphql_client = { version = "0.14.0", features = ["reqwest"] }
lazy_static = "1.4.0"
mockall = "0.11.4"
near-lake-framework = "0.7.8"
pin-project = "1.1.5"
prometheus = "0.13.3"
prost = "0.12.3"
redis = { version = "0.21.5", features = ["tokio-comp", "connection-manager"] }
Expand All @@ -29,7 +30,7 @@ serde_json = "1.0.55"
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
tracing-stackdriver = "0.10.0"
tokio = { version = "1.28.0", features = ["full"]}
tokio = { version = "1.28.0", features = ["full", "test-util"]}
tokio-util = "0.7.10"
tokio-stream = "0.1.14"
tonic = "0.10.2"
Expand Down
149 changes: 96 additions & 53 deletions block-streamer/src/block_stream.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
use std::future::Future;
use std::pin::Pin;
use std::task::Poll;

use anyhow::Context;
use near_lake_framework::near_indexer_primitives;
use tokio::task::JoinHandle;
Expand All @@ -12,6 +16,36 @@ use registry_types::Rule;
const LAKE_PREFETCH_SIZE: usize = 100;
const MAX_STREAM_SIZE_WITH_CACHE: u64 = 100;
const DELTA_LAKE_SKIP_ACCOUNTS: [&str; 4] = ["*", "*.near", "*.kaiching", "*.tg"];
const MAX_STREAM_SIZE: u64 = 100;

#[pin_project::pin_project]
pub struct PollCounter<F> {
#[pin]
inner: F,
indexer_name: String,
}

impl<F> PollCounter<F> {
pub fn new(inner: F, indexer_name: String) -> Self {
Self {
inner,
indexer_name,
}
}
}

impl<F: Future> Future for PollCounter<F> {
type Output = F::Output;

fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
metrics::BLOCK_STREAM_UP
.with_label_values(&[&self.indexer_name])
.inc();

let this = self.project();
this.inner.poll(cx)
}
}

pub struct Task {
handle: JoinHandle<anyhow::Result<()>>,
Expand Down Expand Up @@ -45,7 +79,7 @@ impl BlockStream {
pub fn start(
&mut self,
start_block_height: near_indexer_primitives::types::BlockHeight,
redis_client: std::sync::Arc<crate::redis::RedisClient>,
redis: std::sync::Arc<crate::redis::RedisClient>,
delta_lake_client: std::sync::Arc<crate::delta_lake_client::DeltaLakeClient>,
lake_s3_client: crate::lake_s3_client::SharedLakeS3Client,
) -> anyhow::Result<()> {
Expand All @@ -54,42 +88,49 @@ impl BlockStream {
}

let cancellation_token = tokio_util::sync::CancellationToken::new();
let cancellation_token_clone = cancellation_token.clone();

let indexer_config = self.indexer_config.clone();
let chain_id = self.chain_id.clone();
let redis_stream = self.redis_stream.clone();

let handle = tokio::spawn(async move {
tokio::select! {
_ = cancellation_token_clone.cancelled() => {
tracing::info!(
account_id = indexer_config.account_id.as_str(),
function_name = indexer_config.function_name,
"Cancelling block stream task",
);

Ok(())
},
result = start_block_stream(

let handle = tokio::spawn({
let cancellation_token = cancellation_token.clone();
let indexer_config = self.indexer_config.clone();
let chain_id = self.chain_id.clone();
let redis_stream = self.redis_stream.clone();

async move {
let block_stream_future = start_block_stream(
start_block_height,
&indexer_config,
redis_client,
redis,
delta_lake_client,
lake_s3_client,
&chain_id,
LAKE_PREFETCH_SIZE,
redis_stream
) => {
result.map_err(|err| {
tracing::error!(
redis_stream,
);

let block_stream_future =
PollCounter::new(block_stream_future, indexer_config.get_full_name());

tokio::select! {
_ = cancellation_token.cancelled() => {
tracing::info!(
account_id = indexer_config.account_id.as_str(),
function_name = indexer_config.function_name,
"Block stream task stopped due to error: {:?}",
err,
"Cancelling block stream task",
);
err
})

Ok(())
},
result = block_stream_future => {
result.map_err(|err| {
tracing::error!(
account_id = indexer_config.account_id.as_str(),
function_name = indexer_config.function_name,
"Block stream task stopped due to error: {:?}",
err,
);
err
})
}
}
}
});
Expand All @@ -107,6 +148,10 @@ impl BlockStream {
task.cancellation_token.cancel();
let _ = task.handle.await?;

// Fails if metric doesn't exist, i.e. task was never polled
let _ = metrics::BLOCK_STREAM_UP
.remove_label_values(&[&self.indexer_config.get_full_name()]);

return Ok(());
}

Expand All @@ -129,7 +174,7 @@ impl BlockStream {
pub(crate) async fn start_block_stream(
start_block_height: near_indexer_primitives::types::BlockHeight,
indexer: &IndexerConfig,
redis_client: std::sync::Arc<crate::redis::RedisClient>,
redis: std::sync::Arc<crate::redis::RedisClient>,
delta_lake_client: std::sync::Arc<crate::delta_lake_client::DeltaLakeClient>,
lake_s3_client: crate::lake_s3_client::SharedLakeS3Client,
chain_id: &ChainId,
Expand All @@ -145,7 +190,7 @@ pub(crate) async fn start_block_stream(
let last_indexed_delta_lake_block = process_delta_lake_blocks(
start_block_height,
delta_lake_client,
redis_client.clone(),
redis.clone(),
indexer,
redis_stream.clone(),
)
Expand All @@ -156,7 +201,7 @@ pub(crate) async fn start_block_stream(
last_indexed_delta_lake_block,
lake_s3_client,
lake_prefetch_size,
redis_client,
redis,
indexer,
redis_stream,
chain_id,
Expand All @@ -175,7 +220,7 @@ pub(crate) async fn start_block_stream(
async fn process_delta_lake_blocks(
start_block_height: near_indexer_primitives::types::BlockHeight,
delta_lake_client: std::sync::Arc<crate::delta_lake_client::DeltaLakeClient>,
redis_client: std::sync::Arc<crate::redis::RedisClient>,
redis: std::sync::Arc<crate::redis::RedisClient>,
indexer: &IndexerConfig,
redis_stream: String,
) -> anyhow::Result<u64> {
Expand Down Expand Up @@ -230,10 +275,10 @@ async fn process_delta_lake_blocks(

for block_height in &blocks_from_index {
let block_height = block_height.to_owned();
redis_client
.publish_block(indexer, redis_stream.clone(), block_height)
redis
.publish_block(indexer, redis_stream.clone(), block_height, MAX_STREAM_SIZE)
.await?;
redis_client
redis
.set_last_processed_block(indexer, block_height)
.await?;
}
Expand All @@ -253,7 +298,7 @@ async fn process_near_lake_blocks(
start_block_height: near_indexer_primitives::types::BlockHeight,
lake_s3_client: crate::lake_s3_client::SharedLakeS3Client,
lake_prefetch_size: usize,
redis_client: std::sync::Arc<crate::redis::RedisClient>,
redis: std::sync::Arc<crate::redis::RedisClient>,
indexer: &IndexerConfig,
redis_stream: String,
chain_id: &ChainId,
Expand All @@ -278,7 +323,7 @@ async fn process_near_lake_blocks(
let block_height = streamer_message.block.header.height;
last_indexed_block = block_height;

redis_client
redis
.set_last_processed_block(indexer, block_height)
.await?;

Expand All @@ -289,18 +334,14 @@ async fn process_near_lake_blocks(
);

if !matches.is_empty() {
if let Ok(Some(stream_length)) =
redis_client.get_stream_length(redis_stream.clone()).await
{
if let Ok(Some(stream_length)) = redis.get_stream_length(redis_stream.clone()).await {
if stream_length <= MAX_STREAM_SIZE_WITH_CACHE {
redis_client
.cache_streamer_message(&streamer_message)
.await?;
redis.cache_streamer_message(&streamer_message).await?;
}
}

redis_client
.publish_block(indexer, redis_stream.clone(), block_height)
redis
.publish_block(indexer, redis_stream.clone(), block_height, MAX_STREAM_SIZE)
.await?;
}
}
Expand Down Expand Up @@ -355,29 +396,30 @@ mod tests {
.expect_list_matching_block_heights()
.returning(|_, _| Ok(vec![107503702, 107503703]));

let mut mock_redis_client = crate::redis::RedisClient::default();
mock_redis_client
let mut mock_redis = crate::redis::RedisClient::default();
mock_redis
.expect_publish_block()
.with(
predicate::always(),
predicate::eq("stream key".to_string()),
predicate::in_iter([107503702, 107503703, 107503705]),
predicate::always(),
)
.returning(|_, _, _| Ok(()))
.returning(|_, _, _, _| Ok(()))
.times(3);
mock_redis_client
mock_redis
.expect_set_last_processed_block()
.with(
predicate::always(),
predicate::in_iter([107503702, 107503703, 107503704, 107503705]),
)
.returning(|_, _| Ok(()))
.times(4);
mock_redis_client
mock_redis
.expect_cache_streamer_message()
.with(predicate::always())
.returning(|_| Ok(()));
mock_redis_client
mock_redis
.expect_get_stream_length()
.with(predicate::eq("stream key".to_string()))
.returning(|_| Ok(Some(10)));
Expand All @@ -397,7 +439,7 @@ mod tests {
start_block_stream(
91940840,
&indexer_config,
std::sync::Arc::new(mock_redis_client),
std::sync::Arc::new(mock_redis),
std::sync::Arc::new(mock_delta_lake_client),
mock_lake_s3_client,
&ChainId::Mainnet,
Expand Down Expand Up @@ -435,8 +477,9 @@ mod tests {
predicate::always(),
predicate::eq("stream key".to_string()),
predicate::in_iter([107503705]),
predicate::always(),
)
.returning(|_, _, _| Ok(()))
.returning(|_, _, _, _| Ok(()))
.times(1);
mock_redis_client
.expect_set_last_processed_block()
Expand Down
4 changes: 2 additions & 2 deletions block-streamer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ async fn main() -> anyhow::Result<()> {
"Starting Block Streamer"
);

let redis_client = std::sync::Arc::new(redis::RedisClient::connect(&redis_url).await?);
let redis = std::sync::Arc::new(redis::RedisClient::connect(&redis_url).await?);

let aws_config = aws_config::from_env().load().await;
let s3_config = aws_sdk_s3::Config::from(&aws_config);
Expand All @@ -58,7 +58,7 @@ async fn main() -> anyhow::Result<()> {

tokio::spawn(metrics::init_server(metrics_port).expect("Failed to start metrics server"));

server::init(&grpc_port, redis_client, delta_lake_client, lake_s3_client).await?;
server::init(&grpc_port, redis, delta_lake_client, lake_s3_client).await?;

Ok(())
}
6 changes: 6 additions & 0 deletions block-streamer/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ lazy_static! {
&["level"]
)
.unwrap();
pub static ref BLOCK_STREAM_UP: IntCounterVec = register_int_counter_vec!(
"queryapi_block_streamer_block_stream_up",
"A continuously increasing counter to indicate the block stream is up",
&["indexer"]
)
.unwrap();
}

pub struct LogCounter;
Expand Down
Loading
Loading