
Commit

fix: Add metrics for reliably measuring Block Stream/Executor health (#843)

The current methods for determining both Block Stream and Executor
health are flawed. This PR addresses those flaws by adding new, more
reliable metrics for use within Grafana.

### Block Streams

A Block Stream is considered healthy if `LAST_PROCESSED_BLOCK` is
continuously incremented, i.e. we are continuously downloading blocks
from S3. This is flawed for the following reasons:
1. When the Redis Stream is full, we halt the Block Stream, preventing
it from processing more blocks
2. When a Block Stream is intentionally stopped, we no longer process
blocks

To address these flaws, I've introduced a new dedicated metric:
`BLOCK_STREAM_UP`, which:
- is incremented every time the Block Stream future is polled, i.e. the
task is doing work. A static value means unhealthy (see the query
sketch below).
- is removed when the Block Stream is stopped, so that it doesn't
trigger the false positive described above
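
For illustration, a Grafana alert over this metric could look something
like the following PromQL sketch; the rule shape and the 5-minute window
are assumptions, not part of this PR:

```promql
# Unhealthy: the Block Stream future hasn't been polled in the last 5m,
# i.e. the counter is static. Intentionally stopped streams won't fire,
# as their series is removed rather than left flat.
rate(queryapi_block_streamer_block_stream_up[5m]) == 0
```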

### Executors

An Executor is considered unhealthy if it has messages in the Redis
Stream but no reported execution durations, the latter only being
recorded on success. The inverse of this is used to determine "healthy".
This is flawed for the following reasons:
1. We can't distinguish between a genuinely broken Indexer and one
broken due to system failures
2. "health" is only determined when there are messages in Redis, meaning
we catch the issue later than we could

To address these, I've added the following metrics:
1. `EXECUTOR_UP`, which is incremented on every Executor loop; like
above, a static value means unhealthy.
2. `SUCCESSFUL_EXECUTIONS`/`FAILED_EXECUTIONS`, which track
successful/failed executions directly, rather than inferring them from
durations. These will be useful for tracking the health of specific
Indexers, e.g. the `staking` indexer should never have failed
executions (see the query sketches below).
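
As with `BLOCK_STREAM_UP`, these are meant to back Grafana alerts
roughly like the sketches below; the window sizes and the `staking`
label value are illustrative assumptions:

```promql
# Unhealthy: the Executor loop has stalled, i.e. the counter is static.
rate(queryapi_runner_executor_up[5m]) == 0

# Alert for an Indexer that should never fail, e.g. staking.
increase(queryapi_runner_failed_executions{indexer="staking"}[10m]) > 0
```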
morgsmccauley committed Jun 27, 2024
1 parent e3a835b commit adaadfc
Showing 8 changed files with 120 additions and 36 deletions.
1 change: 1 addition & 0 deletions block-streamer/Cargo.lock


1 change: 1 addition & 0 deletions block-streamer/Cargo.toml
@@ -19,6 +19,7 @@ graphql_client = { version = "0.14.0", features = ["reqwest"] }
lazy_static = "1.4.0"
mockall = "0.11.4"
near-lake-framework = "0.7.8"
+pin-project = "1.1.5"
prometheus = "0.13.3"
prost = "0.12.3"
redis = { version = "0.21.5", features = ["tokio-comp", "connection-manager"] }
96 changes: 70 additions & 26 deletions block-streamer/src/block_stream.rs
@@ -1,3 +1,7 @@
+use std::future::Future;
+use std::pin::Pin;
+use std::task::Poll;
+
use anyhow::Context;
use near_lake_framework::near_indexer_primitives;
use tokio::task::JoinHandle;
@@ -14,6 +18,35 @@ const MAX_STREAM_SIZE_WITH_CACHE: u64 = 100;
const DELTA_LAKE_SKIP_ACCOUNTS: [&str; 4] = ["*", "*.near", "*.kaiching", "*.tg"];
const MAX_STREAM_SIZE: u64 = 100;

+#[pin_project::pin_project]
+pub struct PollCounter<F> {
+    #[pin]
+    inner: F,
+    indexer_name: String,
+}
+
+impl<F> PollCounter<F> {
+    pub fn new(inner: F, indexer_name: String) -> Self {
+        Self {
+            inner,
+            indexer_name,
+        }
+    }
+}
+
+impl<F: Future> Future for PollCounter<F> {
+    type Output = F::Output;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
+        metrics::BLOCK_STREAM_UP
+            .with_label_values(&[&self.indexer_name])
+            .inc();
+
+        let this = self.project();
+        this.inner.poll(cx)
+    }
+}
+
pub struct Task {
    handle: JoinHandle<anyhow::Result<()>>,
    cancellation_token: tokio_util::sync::CancellationToken,
@@ -55,42 +88,49 @@ impl BlockStream {
        }

        let cancellation_token = tokio_util::sync::CancellationToken::new();
-        let cancellation_token_clone = cancellation_token.clone();
-
-        let indexer_config = self.indexer_config.clone();
-        let chain_id = self.chain_id.clone();
-        let redis_stream = self.redis_stream.clone();
-
-        let handle = tokio::spawn(async move {
-            tokio::select! {
-                _ = cancellation_token_clone.cancelled() => {
-                    tracing::info!(
-                        account_id = indexer_config.account_id.as_str(),
-                        function_name = indexer_config.function_name,
-                        "Cancelling block stream task",
-                    );
-
-                    Ok(())
-                },
-                result = start_block_stream(
+
+        let handle = tokio::spawn({
+            let cancellation_token = cancellation_token.clone();
+            let indexer_config = self.indexer_config.clone();
+            let chain_id = self.chain_id.clone();
+            let redis_stream = self.redis_stream.clone();
+
+            async move {
+                let block_stream_future = start_block_stream(
                    start_block_height,
                    &indexer_config,
                    redis,
                    delta_lake_client,
                    lake_s3_client,
                    &chain_id,
                    LAKE_PREFETCH_SIZE,
-                    redis_stream
-                ) => {
-                    result.map_err(|err| {
-                        tracing::error!(
+                    redis_stream,
+                );
+
+                let block_stream_future =
+                    PollCounter::new(block_stream_future, indexer_config.get_full_name());
+
+                tokio::select! {
+                    _ = cancellation_token.cancelled() => {
+                        tracing::info!(
                            account_id = indexer_config.account_id.as_str(),
                            function_name = indexer_config.function_name,
-                            "Block stream task stopped due to error: {:?}",
-                            err,
+                            "Cancelling block stream task",
                        );
-                        err
-                    })
+
+                        Ok(())
+                    },
+                    result = block_stream_future => {
+                        result.map_err(|err| {
+                            tracing::error!(
+                                account_id = indexer_config.account_id.as_str(),
+                                function_name = indexer_config.function_name,
+                                "Block stream task stopped due to error: {:?}",
+                                err,
+                            );
+                            err
+                        })
+                    }
                }
            }
        });
@@ -108,6 +148,10 @@ impl BlockStream {
            task.cancellation_token.cancel();
            let _ = task.handle.await?;

+            // Fails if metric doesn't exist, i.e. task was never polled
+            let _ = metrics::BLOCK_STREAM_UP
+                .remove_label_values(&[&self.indexer_config.get_full_name()]);
+
            return Ok(());
        }

6 changes: 6 additions & 0 deletions block-streamer/src/metrics.rs
@@ -57,6 +57,12 @@ lazy_static! {
&["level"]
)
.unwrap();
pub static ref BLOCK_STREAM_UP: IntCounterVec = register_int_counter_vec!(
"queryapi_block_streamer_block_stream_up",
"A continuously increasing counter to indicate the block stream is up",
&["indexer"]
)
.unwrap();
}

pub struct LogCounter;
16 changes: 11 additions & 5 deletions block-streamer/src/server/block_streamer_service.rs
@@ -58,6 +58,7 @@ impl BlockStreamerService {

#[tonic::async_trait]
impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerService {
+    #[tracing::instrument(skip(self))]
    async fn start_stream(
        &self,
        request: Request<blockstreamer::StartStreamRequest>,
@@ -117,7 +118,11 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic
            self.delta_lake_client.clone(),
            self.lake_s3_client.clone(),
        )
-        .map_err(|_| Status::internal("Failed to start block stream"))?;
+        .map_err(|err| {
+            tracing::error!(?err, "Failed to start block stream");
+
+            Status::internal("Failed to start block stream")
+        })?;

        let mut lock = self.get_block_streams_lock()?;
        lock.insert(indexer_config.get_hash_id(), block_stream);
@@ -127,6 +132,7 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic
        }))
    }

+    #[tracing::instrument(skip(self))]
    async fn stop_stream(
        &self,
        request: Request<blockstreamer::StopStreamRequest>,
@@ -148,10 +154,10 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic
                )))
            }
            Some(mut block_stream) => {
-                block_stream
-                    .cancel()
-                    .await
-                    .map_err(|_| Status::internal("Failed to cancel block stream"))?;
+                block_stream.cancel().await.map_err(|err| {
+                    tracing::error!(?err, "Failed to cancel block stream");
+                    Status::internal("Failed to cancel block stream")
+                })?;
            }
        }

9 changes: 5 additions & 4 deletions coordinator/Cargo.lock


23 changes: 22 additions & 1 deletion runner/src/metrics.ts
@@ -63,6 +63,24 @@ const LOGS_COUNT = new Counter({
  labelNames: ['level'],
});

+const EXECUTOR_UP = new Counter({
+  name: 'queryapi_runner_executor_up',
+  help: 'Incremented each time the executor loop runs to indicate whether the job is functional',
+  labelNames: ['indexer'],
+});
+
+const SUCCESSFUL_EXECUTIONS = new Counter({
+  name: 'queryapi_runner_successful_executions',
+  help: 'Count of successful executions of an indexer function',
+  labelNames: ['indexer'],
+});
+
+const FAILED_EXECUTIONS = new Counter({
+  name: 'queryapi_runner_failed_executions',
+  help: 'Count of failed executions of an indexer function',
+  labelNames: ['indexer'],
+});
+
export const METRICS = {
  HEAP_TOTAL_ALLOCATION,
  HEAP_USED,
@@ -73,7 +91,10 @@ export const METRICS = {
  UNPROCESSED_STREAM_MESSAGES,
  LAST_PROCESSED_BLOCK_HEIGHT,
  EXECUTION_DURATION,
-  LOGS_COUNT
+  LOGS_COUNT,
+  EXECUTOR_UP,
+  SUCCESSFUL_EXECUTIONS,
+  FAILED_EXECUTIONS,
};

const aggregatorRegistry = new AggregatorRegistry();
4 changes: 4 additions & 0 deletions runner/src/stream-handler/worker.ts
@@ -101,6 +101,8 @@ async function blockQueueConsumer (workerContext: WorkerContext): Promise<void>
  let currBlockHeight = 0;

  while (true) {
+    METRICS.EXECUTOR_UP.labels({ indexer: indexerConfig.fullName() }).inc();
+
    if (workerContext.queue.length === 0) {
      await sleep(100);
      continue;
@@ -150,9 +152,11 @@ async function blockQueueConsumer (workerContext: WorkerContext): Promise<void>

      executionDurationTimer();

+      METRICS.SUCCESSFUL_EXECUTIONS.labels({ indexer: indexerConfig.fullName() }).inc();
      METRICS.LAST_PROCESSED_BLOCK_HEIGHT.labels({ indexer: indexerConfig.fullName() }).set(currBlockHeight);
      postRunSpan.end();
    } catch (err) {
+      METRICS.FAILED_EXECUTIONS.labels({ indexer: indexerConfig.fullName() }).inc();
      parentSpan.setAttribute('status', 'failed');
      parentPort?.postMessage({ type: WorkerMessageType.STATUS, data: { status: IndexerStatus.FAILING } });
      const error = err as Error;
