Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Add metrics for reliably measuring Block Stream/Executor health #843

Merged
merged 7 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions block-streamer/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions block-streamer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ graphql_client = { version = "0.14.0", features = ["reqwest"] }
lazy_static = "1.4.0"
mockall = "0.11.4"
near-lake-framework = "0.7.8"
pin-project = "1.1.5"
prometheus = "0.13.3"
prost = "0.12.3"
redis = { version = "0.21.5", features = ["tokio-comp", "connection-manager"] }
Expand Down
96 changes: 70 additions & 26 deletions block-streamer/src/block_stream.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
use std::future::Future;
use std::pin::Pin;
use std::task::Poll;

use anyhow::Context;
use near_lake_framework::near_indexer_primitives;
use tokio::task::JoinHandle;
Expand All @@ -14,6 +18,35 @@ const MAX_STREAM_SIZE_WITH_CACHE: u64 = 100;
const DELTA_LAKE_SKIP_ACCOUNTS: [&str; 4] = ["*", "*.near", "*.kaiching", "*.tg"];
const MAX_STREAM_SIZE: u64 = 100;

/// Future adapter that records liveness of the future it wraps.
///
/// Its `Future` implementation increments `metrics::BLOCK_STREAM_UP`, labelled
/// with `indexer_name`, on every call to `poll` and then polls `inner`.
#[pin_project::pin_project]
pub struct PollCounter<F> {
/// The wrapped future. Marked `#[pin]` so the projection hands out a pinned
/// reference that can be polled.
#[pin]
inner: F,
/// Value used for the metric's `indexer` label.
indexer_name: String,
}

impl<F> PollCounter<F> {
pub fn new(inner: F, indexer_name: String) -> Self {
Self {
inner,
indexer_name,
}
}
}

impl<F> Future for PollCounter<F>
where
    F: Future,
{
    type Output = F::Output;

    /// Increments `BLOCK_STREAM_UP` for this indexer, then forwards the poll
    /// to the wrapped future and returns its result unchanged.
    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        let projected = self.project();

        // Count the poll before delegating, so the counter advances even when
        // the inner future returns `Poll::Pending`.
        let up_counter =
            metrics::BLOCK_STREAM_UP.with_label_values(&[projected.indexer_name.as_str()]);
        up_counter.inc();

        projected.inner.poll(cx)
    }
}

pub struct Task {
handle: JoinHandle<anyhow::Result<()>>,
cancellation_token: tokio_util::sync::CancellationToken,
Expand Down Expand Up @@ -55,42 +88,49 @@ impl BlockStream {
}

let cancellation_token = tokio_util::sync::CancellationToken::new();
let cancellation_token_clone = cancellation_token.clone();

let indexer_config = self.indexer_config.clone();
let chain_id = self.chain_id.clone();
let redis_stream = self.redis_stream.clone();

let handle = tokio::spawn(async move {
tokio::select! {
_ = cancellation_token_clone.cancelled() => {
tracing::info!(
account_id = indexer_config.account_id.as_str(),
function_name = indexer_config.function_name,
"Cancelling block stream task",
);

Ok(())
},
result = start_block_stream(

let handle = tokio::spawn({
let cancellation_token = cancellation_token.clone();
let indexer_config = self.indexer_config.clone();
let chain_id = self.chain_id.clone();
let redis_stream = self.redis_stream.clone();

async move {
let block_stream_future = start_block_stream(
start_block_height,
&indexer_config,
redis,
delta_lake_client,
lake_s3_client,
&chain_id,
LAKE_PREFETCH_SIZE,
redis_stream
) => {
result.map_err(|err| {
tracing::error!(
redis_stream,
);

let block_stream_future =
PollCounter::new(block_stream_future, indexer_config.get_full_name());

tokio::select! {
_ = cancellation_token.cancelled() => {
tracing::info!(
account_id = indexer_config.account_id.as_str(),
function_name = indexer_config.function_name,
"Block stream task stopped due to error: {:?}",
err,
"Cancelling block stream task",
);
err
})

Ok(())
},
result = block_stream_future => {
result.map_err(|err| {
tracing::error!(
account_id = indexer_config.account_id.as_str(),
function_name = indexer_config.function_name,
"Block stream task stopped due to error: {:?}",
err,
);
err
})
}
}
}
});
Expand All @@ -108,6 +148,10 @@ impl BlockStream {
task.cancellation_token.cancel();
let _ = task.handle.await?;

// Fails if metric doesn't exist, i.e. task was never polled
let _ = metrics::BLOCK_STREAM_UP
.remove_label_values(&[&self.indexer_config.get_full_name()]);

return Ok(());
}

Expand Down
6 changes: 6 additions & 0 deletions block-streamer/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ lazy_static! {
&["level"]
)
.unwrap();
/// Per-indexer counter incremented by `PollCounter` each time the block
/// stream future is polled; the label set is removed again when the stream
/// is cancelled.
pub static ref BLOCK_STREAM_UP: IntCounterVec = register_int_counter_vec!(
"queryapi_block_streamer_block_stream_up",
"A continuously increasing counter to indicate the block stream is up",
&["indexer"]
)
.unwrap();
}

pub struct LogCounter;
Expand Down
16 changes: 11 additions & 5 deletions block-streamer/src/server/block_streamer_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ impl BlockStreamerService {

#[tonic::async_trait]
impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerService {
#[tracing::instrument(skip(self))]
async fn start_stream(
&self,
request: Request<blockstreamer::StartStreamRequest>,
Expand Down Expand Up @@ -117,7 +118,11 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic
self.delta_lake_client.clone(),
self.lake_s3_client.clone(),
)
.map_err(|_| Status::internal("Failed to start block stream"))?;
.map_err(|err| {
tracing::error!(?err, "Failed to start block stream");

Status::internal("Failed to start block stream")
})?;

let mut lock = self.get_block_streams_lock()?;
lock.insert(indexer_config.get_hash_id(), block_stream);
Expand All @@ -127,6 +132,7 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic
}))
}

#[tracing::instrument(skip(self))]
async fn stop_stream(
&self,
request: Request<blockstreamer::StopStreamRequest>,
Expand All @@ -148,10 +154,10 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic
)))
}
Some(mut block_stream) => {
block_stream
.cancel()
.await
.map_err(|_| Status::internal("Failed to cancel block stream"))?;
block_stream.cancel().await.map_err(|err| {
tracing::error!(?err, "Failed to cancel block stream");
Status::internal("Failed to cancel block stream")
})?;
}
}

Expand Down
9 changes: 5 additions & 4 deletions coordinator/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 22 additions & 1 deletion runner/src/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,24 @@ const LOGS_COUNT = new Counter({
labelNames: ['level'],
});

// Liveness counter for an executor, labelled by indexer. The worker's
// blockQueueConsumer loop increments it at the start of every iteration,
// including iterations where the queue is empty.
const EXECUTOR_UP = new Counter({
name: 'queryapi_runner_executor_up',
help: 'Incremented each time the executor loop runs to indicate whether the job is functional',
labelNames: ['indexer'],
});

// Per-indexer count of executions that completed without throwing. The worker
// increments it right after the execution duration timer is stopped.
const SUCCESSFUL_EXECUTIONS = new Counter({
name: 'queryapi_runner_successful_executions',
help: 'Count of successful executions of an indexer function',
labelNames: ['indexer'],
});

// Per-indexer count of executions that threw. The worker increments it at the
// top of the catch block in blockQueueConsumer.
const FAILED_EXECUTIONS = new Counter({
name: 'queryapi_runner_failed_executions',
help: 'Count of failed executions of an indexer function',
labelNames: ['indexer'],
});

export const METRICS = {
HEAP_TOTAL_ALLOCATION,
HEAP_USED,
Expand All @@ -73,7 +91,10 @@ export const METRICS = {
UNPROCESSED_STREAM_MESSAGES,
LAST_PROCESSED_BLOCK_HEIGHT,
EXECUTION_DURATION,
LOGS_COUNT
LOGS_COUNT,
EXECUTOR_UP,
SUCCESSFUL_EXECUTIONS,
FAILED_EXECUTIONS,
};

const aggregatorRegistry = new AggregatorRegistry();
Expand Down
4 changes: 4 additions & 0 deletions runner/src/stream-handler/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ async function blockQueueConsumer (workerContext: WorkerContext): Promise<void>
let currBlockHeight = 0;

while (true) {
METRICS.EXECUTOR_UP.labels({ indexer: indexerConfig.fullName() }).inc();

if (workerContext.queue.length === 0) {
await sleep(100);
continue;
Expand Down Expand Up @@ -150,9 +152,11 @@ async function blockQueueConsumer (workerContext: WorkerContext): Promise<void>

executionDurationTimer();

METRICS.SUCCESSFUL_EXECUTIONS.labels({ indexer: indexerConfig.fullName() }).inc();
METRICS.LAST_PROCESSED_BLOCK_HEIGHT.labels({ indexer: indexerConfig.fullName() }).set(currBlockHeight);
postRunSpan.end();
} catch (err) {
METRICS.FAILED_EXECUTIONS.labels({ indexer: indexerConfig.fullName() }).inc();
parentSpan.setAttribute('status', 'failed');
parentPort?.postMessage({ type: WorkerMessageType.STATUS, data: { status: IndexerStatus.FAILING } });
const error = err as Error;
Expand Down
Loading