Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Expose health data from Block Streams/Executors #888

Closed
22 changes: 22 additions & 0 deletions block-streamer/proto/block_streamer.proto
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,26 @@ message StreamInfo {
string function_name = 4;
// Block height corresponding to the created/updated height of the indexer
uint64 version = 5;
// Contains health information for the Block Stream
Health health = 6;
}

// Contains health information for the Block Stream
//
// Attached to StreamInfo so that callers can inspect the state of each
// stream alongside its identifying fields.
message Health {
// The processing state of the block stream
ProcessingState processing_state = 1;
// When the health info was last updated, expressed as whole seconds since
// the Unix epoch
uint64 updated_at_timestamp_secs = 2;
}

// The processing state of a Block Stream, as reported in Health
enum ProcessingState {
// Zero value of the enum; none of the states below applies.
// NOTE(review): presumably only seen when the field was never set - confirm
UNSPECIFIED = 0;
// Not started, or has been stopped
IDLE = 1;
// Running as expected
RUNNING = 2;
// Intentionally paused due to some internal condition
PAUSED = 3;
// Stopped due to some unknown error
STALLED = 4;
}
170 changes: 155 additions & 15 deletions block-streamer/src/block_stream.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::cmp::Ordering;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::task::Poll;
use std::time::SystemTime;

use anyhow::Context;
use futures::StreamExt;
Expand Down Expand Up @@ -52,16 +54,43 @@ impl<F: Future> Future for PollCounter<F> {
}

pub struct Task {
handle: JoinHandle<anyhow::Result<()>>,
stream_handle: JoinHandle<anyhow::Result<()>>,
monitor_handle: JoinHandle<()>,
cancellation_token: tokio_util::sync::CancellationToken,
}

/// Represents the processing state of a block stream.
///
/// Derives `Debug`, `PartialEq` and `Eq` in addition to `Clone` so that states can be logged
/// and compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProcessingState {
    /// Block Stream is not currently active but can be started. Either has not been started or
    /// was stopped.
    Idle,

    /// Block Stream is actively processing blocks.
    Running,

    /// Block Stream has been intentionally/internally paused due to some condition, i.e. back
    /// pressure on the Redis Stream.
    Paused,

    /// Block Stream has stalled due to an error or other condition. Must be manually
    /// restarted.
    Stalled,
}

/// Point-in-time health snapshot of a block stream.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BlockStreamHealth {
    /// The most recently determined processing state.
    pub processing_state: ProcessingState,
    /// When `processing_state` was last written.
    pub last_updated: SystemTime,
}

pub struct BlockStream {
task: Option<Task>,
pub indexer_config: IndexerConfig,
pub chain_id: ChainId,
pub version: u64,
pub redis_stream: String,
health: Arc<Mutex<BlockStreamHealth>>,
}

impl BlockStream {
Expand All @@ -77,23 +106,107 @@ impl BlockStream {
chain_id,
version,
redis_stream,
health: Arc::new(Mutex::new(BlockStreamHealth {
processing_state: ProcessingState::Idle,
last_updated: SystemTime::now(),
})),
}
}

pub fn start(
&mut self,
/// Returns a copy of the current health of this block stream.
///
/// # Errors
///
/// Returns an error if the health lock cannot be acquired.
pub fn health(&self) -> anyhow::Result<BlockStreamHealth> {
    self.health
        .lock()
        .map(|guard| guard.clone())
        .map_err(|e| anyhow::anyhow!("Failed to acquire health lock: {:?}", e))
}

/// Spawns the background task which keeps this block stream's health up to date.
///
/// Every 5 seconds the task reads the last processed block and the Redis Stream length, then
/// compares the last processed block against the value seen on the previous poll:
///
/// - increased: `Running`
/// - unchanged, with the stream length at or above `MAX_STREAM_SIZE`: `Paused`
/// - unchanged otherwise: `Stalled`
/// - decreased: `Stalled` (also logged as an error)
///
/// A failed Redis read or lock acquisition is logged and the poll is skipped; the task itself
/// keeps running until its handle is aborted.
fn start_health_monitoring_task(&self, redis: Arc<RedisClient>) -> JoinHandle<()> {
    tokio::spawn({
        let config = self.indexer_config.clone();
        let health = self.health.clone();
        let redis_stream = self.redis_stream.clone();

        async move {
            let poll_interval = std::time::Duration::from_secs(5);

            // Establish the baseline to compare against. A Redis failure here must not panic:
            // that would end the task and leave the health state frozen. Retry at the polling
            // cadence instead, the same way failures are handled in the loop below.
            let mut last_processed_block = loop {
                match redis.get_last_processed_block(&config).await {
                    Ok(block) => break block,
                    Err(_) => {
                        tracing::warn!(
                            account_id = config.account_id.as_str(),
                            function_name = config.function_name,
                            "Failed to fetch last processed block"
                        );
                        tokio::time::sleep(poll_interval).await;
                    }
                }
            };

            loop {
                tokio::time::sleep(poll_interval).await;

                let new_last_processed_block =
                    match redis.get_last_processed_block(&config).await {
                        Ok(block) => block,
                        Err(_) => {
                            tracing::warn!(
                                account_id = config.account_id.as_str(),
                                function_name = config.function_name,
                                "Failed to fetch last processed block"
                            );
                            continue;
                        }
                    };

                let stream_size = match redis.get_stream_length(redis_stream.clone()).await {
                    Ok(stream_size) => stream_size,
                    Err(_) => {
                        tracing::warn!(
                            account_id = config.account_id.as_str(),
                            function_name = config.function_name,
                            "Failed to fetch stream size"
                        );
                        continue;
                    }
                };

                // Acquired only after all awaits for this iteration, and released at the end of
                // the iteration, so the guard is never held across an await point.
                let mut health_lock = match health.lock() {
                    Ok(health) => health,
                    Err(_) => {
                        tracing::warn!(
                            account_id = config.account_id.as_str(),
                            function_name = config.function_name,
                            "Failed to acquire health lock"
                        );
                        continue;
                    }
                };

                match new_last_processed_block.cmp(&last_processed_block) {
                    Ordering::Less => {
                        tracing::error!(
                            account_id = config.account_id.as_str(),
                            function_name = config.function_name,
                            "Last processed block should not decrease"
                        );

                        health_lock.processing_state = ProcessingState::Stalled;
                    }
                    // No progress while the stream is full: treated as back pressure
                    Ordering::Equal if stream_size >= Some(MAX_STREAM_SIZE) => {
                        health_lock.processing_state = ProcessingState::Paused;
                    }
                    Ordering::Equal => {
                        health_lock.processing_state = ProcessingState::Stalled;
                    }
                    Ordering::Greater => {
                        health_lock.processing_state = ProcessingState::Running;
                    }
                };

                health_lock.last_updated = SystemTime::now();

                last_processed_block = new_last_processed_block;
            }
        }
    })
}

fn start_block_stream_task(
&self,
start_block_height: near_indexer_primitives::types::BlockHeight,
redis: Arc<RedisClient>,
reciever_blocks_processor: Arc<ReceiverBlocksProcessor>,
lake_s3_client: SharedLakeS3Client,
) -> anyhow::Result<()> {
if self.task.is_some() {
return Err(anyhow::anyhow!("BlockStreamer has already been started",));
}

let cancellation_token = tokio_util::sync::CancellationToken::new();

let handle = tokio::spawn({
cancellation_token: tokio_util::sync::CancellationToken,
) -> JoinHandle<anyhow::Result<()>> {
tokio::spawn({
let cancellation_token = cancellation_token.clone();
let indexer_config = self.indexer_config.clone();
let chain_id = self.chain_id.clone();
Expand Down Expand Up @@ -137,10 +250,35 @@ impl BlockStream {
}
}
}
});
})
}

pub fn start(
&mut self,
start_block_height: near_indexer_primitives::types::BlockHeight,
redis: Arc<RedisClient>,
reciever_blocks_processor: Arc<ReceiverBlocksProcessor>,
lake_s3_client: SharedLakeS3Client,
) -> anyhow::Result<()> {
if self.task.is_some() {
return Err(anyhow::anyhow!("BlockStreamer has already been started",));
}

let cancellation_token = tokio_util::sync::CancellationToken::new();

let monitor_handle = self.start_health_monitoring_task(redis.clone());

let stream_handle = self.start_block_stream_task(
start_block_height,
redis,
reciever_blocks_processor,
lake_s3_client,
cancellation_token.clone(),
);

self.task = Some(Task {
handle,
stream_handle,
monitor_handle,
cancellation_token,
});

Expand All @@ -149,8 +287,9 @@ impl BlockStream {

pub async fn cancel(&mut self) -> anyhow::Result<()> {
if let Some(task) = self.task.take() {
task.monitor_handle.abort();
task.cancellation_token.cancel();
let _ = task.handle.await?;
let _ = task.stream_handle.await?;

// Fails if metric doesn't exist, i.e. task was never polled
let _ = metrics::BLOCK_STREAM_UP
Expand All @@ -167,6 +306,7 @@ impl BlockStream {

#[allow(clippy::too_many_arguments)]
#[tracing::instrument(
name = "block_stream"
skip_all,
fields(
account_id = indexer.account_id.as_str(),
Expand Down
22 changes: 22 additions & 0 deletions block-streamer/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,18 @@ impl RedisCommandsImpl {
Ok(Self { connection })
}

/// Issues a Redis `GET` for `key` and returns the reply decoded as `Option<u64>`.
///
/// The key is logged at debug level before the command is sent.
pub async fn get<T>(&self, key: T) -> Result<Option<u64>, RedisError>
where
    T: ToRedisArgs + Debug + Send + Sync + 'static,
{
    tracing::debug!("GET: {:?}", key);

    let mut command = redis::cmd("GET");
    command.arg(key);

    // The command runs against a clone of the stored connection
    let mut connection = self.connection.clone();
    command.query_async(&mut connection).await
}

pub async fn xadd<T, U>(&self, stream_key: T, fields: &[(String, U)]) -> Result<(), RedisError>
where
T: ToRedisArgs + Debug + Send + Sync + 'static,
Expand Down Expand Up @@ -133,6 +145,16 @@ impl RedisClientImpl {
.context("Failed to set last processed block")
}

/// Reads the last processed block stored in Redis for the given indexer.
///
/// The value is looked up under `indexer_config.last_processed_block_key()`.
///
/// # Errors
///
/// Returns an error, with context, if the underlying `GET` command fails.
pub async fn get_last_processed_block(
    &self,
    indexer_config: &IndexerConfig,
) -> anyhow::Result<Option<u64>> {
    self.commands
        .get(indexer_config.last_processed_block_key())
        .await
        .context("Failed to get last processed block")
}

/// Returns the length of the Redis Stream named `stream`, as reported by the underlying `xlen`
/// command.
///
/// NOTE(review): the meaning of a `None` result is decided by `xlen`, which is not visible
/// here - presumably the stream does not exist; confirm.
pub async fn get_stream_length(&self, stream: String) -> anyhow::Result<Option<u64>> {
self.commands.xlen(stream).await
}
Expand Down
52 changes: 47 additions & 5 deletions block-streamer/src/server/block_streamer_service.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::sync::Mutex;
use std::time::SystemTime;

use near_lake_framework::near_indexer_primitives;
use tonic::{Request, Response, Status};
Expand All @@ -21,6 +22,30 @@ pub struct BlockStreamerService {
block_streams: Mutex<HashMap<String, block_stream::BlockStream>>,
}

/// Converts the internal block stream health into its gRPC representation.
///
/// The processing state is mapped variant-for-variant and the update time is expressed as whole
/// seconds since the Unix epoch.
impl From<block_stream::BlockStreamHealth> for blockstreamer::Health {
    fn from(health: block_stream::BlockStreamHealth) -> Self {
        let processing_state = match health.processing_state {
            block_stream::ProcessingState::Idle => blockstreamer::ProcessingState::Idle,
            block_stream::ProcessingState::Running => blockstreamer::ProcessingState::Running,
            block_stream::ProcessingState::Paused => blockstreamer::ProcessingState::Paused,
            block_stream::ProcessingState::Stalled => blockstreamer::ProcessingState::Stalled,
        };

        // `duration_since` fails when the timestamp is earlier than the Unix epoch. Report 0
        // in that case instead of panicking inside a request handler.
        let updated_at_timestamp_secs = health
            .last_updated
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();

        blockstreamer::Health {
            processing_state: processing_state as i32,
            updated_at_timestamp_secs,
        }
    }
}

impl BlockStreamerService {
pub fn new(
redis: std::sync::Arc<crate::redis::RedisClient>,
Expand Down Expand Up @@ -77,11 +102,17 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic
});

if let Some((stream_id, stream)) = stream_entry {
let stream_health = stream.health().map_err(|err| {
tracing::error!(?err, "Failed to get health of block stream");
Status::internal("Failed to get health of block stream")
})?;

Ok(Response::new(StreamInfo {
stream_id: stream_id.to_string(),
account_id: stream.indexer_config.account_id.to_string(),
function_name: stream.indexer_config.function_name.to_string(),
version: stream.version,
health: Some(stream_health.into()),
}))
} else {
Err(Status::not_found(format!(
Expand Down Expand Up @@ -210,11 +241,22 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic

let block_streams: Vec<StreamInfo> = lock
.values()
.map(|block_stream| StreamInfo {
stream_id: block_stream.indexer_config.get_hash_id(),
account_id: block_stream.indexer_config.account_id.to_string(),
function_name: block_stream.indexer_config.function_name.clone(),
version: block_stream.version,
.map(|block_stream| {
let stream_health = block_stream
.health()
.map_err(|err| {
tracing::error!(?err, "Failed to get health of block stream");
Status::internal("Failed to get health of block stream")
})
.ok();

StreamInfo {
stream_id: block_stream.indexer_config.get_hash_id(),
account_id: block_stream.indexer_config.account_id.to_string(),
function_name: block_stream.indexer_config.function_name.to_string(),
version: block_stream.version,
health: stream_health.map(|health| health.into()),
}
})
.collect();

Expand Down
4 changes: 2 additions & 2 deletions runner/examples/list-executors.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
// Run with 'npx ts-node examples/list-executors.ts'

import runnerClient from '../src/server/runner-client';
import runnerClient from '../src/server/services/runner/runner-client';

void (async function main () {
runnerClient.ListExecutors({}, (err, response) => {
if (err) {
console.error('List request error: ', err);
} else {
console.log('Successful ListExecutors request: ', response);
console.log('list response: ', JSON.stringify({ response }, null, 2));
}
});
})();
Loading
Loading