
feat: Expose health info from Block Stream/Executor RPC #889

Merged · 12 commits · Jul 18, 2024
22 changes: 22 additions & 0 deletions block-streamer/proto/block_streamer.proto
@@ -108,4 +108,26 @@ message StreamInfo {
string function_name = 4;
// Block height corresponding to the created/updated height of the indexer
uint64 version = 5;
// Contains health information for the Block Stream
Health health = 6;
}

// Contains health information for the Block Stream
message Health {
// The processing state of the block stream
ProcessingState processing_state = 1;
// When the health info was last updated
uint64 updated_at_timestamp_secs = 2;
}

enum ProcessingState {
Collaborator:

It seems you just want to represent any possible states of the service from the get-go? I imagine the scenarios for UNSPECIFIED and WAITING are vague right now. Specifically for WAITING, I'm confused about what it refers to. Does Runner back pressure count as WAITING? If it does, it could swing between WAITING and RUNNING repeatedly; otherwise, I'm not sure. I'm also trying to think through what actions Coordinator would take when receiving each of these states.

Collaborator Author:

> It seems you just want to represent any possible states of the service from the get-go?

Yes, essentially. While we only really need Stalled, it was pretty trivial to add the other states.

And yes, Waiting was created specifically for the back-pressure case. It doesn't really serve any purpose now, but it could be beneficial to expose these as metrics from Coordinator so we can view what state each indexer is in 🤔

UNSPECIFIED = 0;
// Not started, or has been stopped
IDLE = 1;
// Running as expected
RUNNING = 2;
// Waiting for some internal condition to be met before continuing
WAITING = 3;
// Stopped due to some unknown error
STALLED = 4;
}
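
The thread above asks what action Coordinator would take on each of these states. Purely as an illustration, here is a minimal consumer-side sketch of one way the Health message could be interpreted; the StreamAction enum, the staleness threshold, and the function name are assumptions for this example, not part of the PR, and a prost version that generates TryFrom<i32> for enums is assumed.

```rust
// Sketch only. Assumes the prost-generated `blockstreamer` module from this
// proto (e.g. via tonic::include_proto!("blockstreamer")).
use std::time::{SystemTime, UNIX_EPOCH};

use blockstreamer::{Health, ProcessingState};

/// Assumed threshold: if the monitor loop (5s interval) hasn't refreshed the
/// health info for this long, treat it as stale.
const ASSUMED_STALE_SECS: u64 = 120;

/// Hypothetical action a consumer such as Coordinator might take.
enum StreamAction {
    LeaveAlone,
    Restart,
}

fn interpret_health(health: &Health) -> StreamAction {
    let now_secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0);

    // Stale health info means the monitor task itself stopped reporting.
    if now_secs.saturating_sub(health.updated_at_timestamp_secs) > ASSUMED_STALE_SECS {
        return StreamAction::Restart;
    }

    match ProcessingState::try_from(health.processing_state) {
        Ok(ProcessingState::Stalled) => StreamAction::Restart,
        // IDLE, RUNNING, WAITING, and UNSPECIFIED require no intervention here.
        _ => StreamAction::LeaveAlone,
    }
}
```

A consumer could equally export the raw state as a gauge metric, as suggested in the thread, rather than acting on it directly.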
170 changes: 155 additions & 15 deletions block-streamer/src/block_stream.rs
@@ -1,7 +1,9 @@
use std::cmp::Ordering;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::task::Poll;
use std::time::SystemTime;

use anyhow::Context;
use futures::StreamExt;
@@ -52,16 +54,43 @@ impl<F: Future> Future for PollCounter<F> {
}

pub struct Task {
handle: JoinHandle<anyhow::Result<()>>,
stream_handle: JoinHandle<anyhow::Result<()>>,
monitor_handle: JoinHandle<()>,
cancellation_token: tokio_util::sync::CancellationToken,
}

/// Represents the processing state of a block stream
#[derive(Clone)]
pub enum ProcessingState {
/// Block Stream is not currently active but can be started. Either has not been started or was
Collaborator:

A block stream which wasn't started wouldn't even return a state, right? It wouldn't be present in the list of block streams.

Collaborator Author:

Yes, and no. The RPC logic is to only add "started" BlockStreams. But it's still possible to create a BlockStream and not start it, and it would make sense to default to Running.

/// stopped.
Idle,

/// Block Stream is actively processing blocks.
Running,

/// Block Stream has been intentionally/internally paused due to some condition, i.e. back pressure on
/// the Redis Stream.
Waiting,

/// Block Stream has stalled due to an error or other condition. Must be manually
/// restarted.
Stalled,
}

#[derive(Clone)]
pub struct BlockStreamHealth {
pub processing_state: ProcessingState,
pub last_updated: SystemTime,
Collaborator Author:

Unlike Runner, which pushes the information up, Block Streamer polls it. For this reason I've added a timestamp, so that we can determine whether the data is stale or not. Stale data will probably result in a restart.

}
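
To make the staleness idea above concrete, a small hedged sketch of how a holder of a BlockStreamHealth might decide the data is stale; the 60-second threshold and the helper name are illustrative assumptions, not part of this PR.

```rust
// Sketch only: one way a caller could decide the health info is stale. The
// monitor task added below refreshes `last_updated` roughly every 5 seconds,
// so anything much older suggests the monitor itself has stopped.
use std::time::{Duration, SystemTime};

const ASSUMED_STALE_THRESHOLD: Duration = Duration::from_secs(60);

fn is_stale(health: &BlockStreamHealth) -> bool {
    SystemTime::now()
        .duration_since(health.last_updated)
        // `duration_since` errs if `last_updated` is in the future; treat that
        // as fresh rather than stale.
        .map(|elapsed| elapsed > ASSUMED_STALE_THRESHOLD)
        .unwrap_or(false)
}
```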

pub struct BlockStream {
task: Option<Task>,
pub indexer_config: IndexerConfig,
pub chain_id: ChainId,
pub version: u64,
pub redis_stream: String,
health: Arc<Mutex<BlockStreamHealth>>,
}

impl BlockStream {
@@ -77,23 +106,107 @@ impl BlockStream {
chain_id,
version,
redis_stream,
health: Arc::new(Mutex::new(BlockStreamHealth {
processing_state: ProcessingState::Idle,
last_updated: SystemTime::now(),
})),
}
}

pub fn start(
&mut self,
pub fn health(&self) -> anyhow::Result<BlockStreamHealth> {
match self.health.lock() {
Ok(health) => Ok(health.clone()),
Err(e) => Err(anyhow::anyhow!("Failed to acquire health lock: {:?}", e)),
}
}

fn start_health_monitoring_task(&self, redis: Arc<RedisClient>) -> JoinHandle<()> {
tokio::spawn({
let config = self.indexer_config.clone();
let health = self.health.clone();
let redis_stream = self.redis_stream.clone();

async move {
let mut last_processed_block =
redis.get_last_processed_block(&config).await.unwrap();

loop {
tokio::time::sleep(std::time::Duration::from_secs(5)).await;

let new_last_processed_block =
if let Ok(block) = redis.get_last_processed_block(&config).await {
block
} else {
tracing::warn!(
account_id = config.account_id.as_str(),
function_name = config.function_name,
"Failed to fetch last processed block"
);
continue;
};

let stream_size = if let Ok(stream_size) =
redis.get_stream_length(redis_stream.clone()).await
{
stream_size
} else {
tracing::warn!(
account_id = config.account_id.as_str(),
function_name = config.function_name,
"Failed to fetch stream size"
);
continue;
};

let mut health_lock = if let Ok(health) = health.lock() {
health
} else {
tracing::warn!(
account_id = config.account_id.as_str(),
function_name = config.function_name,
"Failed to acquire health lock"
);
continue;
};

match new_last_processed_block.cmp(&last_processed_block) {
Ordering::Less => {
tracing::error!(
account_id = config.account_id.as_str(),
function_name = config.function_name,
"Last processed block should not decrease"
);

health_lock.processing_state = ProcessingState::Stalled;
}
Ordering::Equal if stream_size >= Some(MAX_STREAM_SIZE) => {
health_lock.processing_state = ProcessingState::Waiting;
}
Ordering::Equal => {
health_lock.processing_state = ProcessingState::Stalled;
}
Ordering::Greater => {
health_lock.processing_state = ProcessingState::Running;
}
};

health_lock.last_updated = SystemTime::now();

last_processed_block = new_last_processed_block;
}
}
})
}

fn start_block_stream_task(
&self,
start_block_height: near_indexer_primitives::types::BlockHeight,
redis: Arc<RedisClient>,
reciever_blocks_processor: Arc<ReceiverBlocksProcessor>,
lake_s3_client: SharedLakeS3Client,
) -> anyhow::Result<()> {
if self.task.is_some() {
return Err(anyhow::anyhow!("BlockStreamer has already been started",));
}

let cancellation_token = tokio_util::sync::CancellationToken::new();

let handle = tokio::spawn({
cancellation_token: tokio_util::sync::CancellationToken,
) -> JoinHandle<anyhow::Result<()>> {
tokio::spawn({
let cancellation_token = cancellation_token.clone();
let indexer_config = self.indexer_config.clone();
let chain_id = self.chain_id.clone();
@@ -137,10 +250,35 @@ impl BlockStream {
}
}
}
});
})
}

pub fn start(
&mut self,
start_block_height: near_indexer_primitives::types::BlockHeight,
redis: Arc<RedisClient>,
reciever_blocks_processor: Arc<ReceiverBlocksProcessor>,
lake_s3_client: SharedLakeS3Client,
) -> anyhow::Result<()> {
if self.task.is_some() {
return Err(anyhow::anyhow!("BlockStreamer has already been started",));
}

let cancellation_token = tokio_util::sync::CancellationToken::new();

let monitor_handle = self.start_health_monitoring_task(redis.clone());

let stream_handle = self.start_block_stream_task(
start_block_height,
redis,
reciever_blocks_processor,
lake_s3_client,
cancellation_token.clone(),
);

self.task = Some(Task {
handle,
stream_handle,
monitor_handle,
cancellation_token,
});

@@ -149,8 +287,9 @@

pub async fn cancel(&mut self) -> anyhow::Result<()> {
if let Some(task) = self.task.take() {
task.monitor_handle.abort();
task.cancellation_token.cancel();
let _ = task.handle.await?;
let _ = task.stream_handle.await?;

// Fails if metric doesn't exist, i.e. task was never polled
let _ = metrics::BLOCK_STREAM_UP
@@ -167,6 +306,7 @@

#[allow(clippy::too_many_arguments)]
#[tracing::instrument(
name = "block_stream"
skip_all,
fields(
account_id = indexer.account_id.as_str(),
22 changes: 22 additions & 0 deletions block-streamer/src/redis.rs
@@ -28,6 +28,18 @@ impl RedisCommandsImpl {
Ok(Self { connection })
}

pub async fn get<T>(&self, key: T) -> Result<Option<u64>, RedisError>
where
T: ToRedisArgs + Debug + Send + Sync + 'static,
{
tracing::debug!("GET: {:?}", key);

redis::cmd("GET")
.arg(key)
.query_async(&mut self.connection.clone())
.await
}

pub async fn xadd<T, U>(&self, stream_key: T, fields: &[(String, U)]) -> Result<(), RedisError>
where
T: ToRedisArgs + Debug + Send + Sync + 'static,
@@ -133,6 +145,16 @@ impl RedisClientImpl {
.context("Failed to set last processed block")
}

pub async fn get_last_processed_block(
&self,
indexer_config: &IndexerConfig,
) -> anyhow::Result<Option<u64>> {
self.commands
.get(indexer_config.last_processed_block_key())
.await
.context("Failed to set last processed block")
}

pub async fn get_stream_length(&self, stream: String) -> anyhow::Result<Option<u64>> {
self.commands.xlen(stream).await
}
52 changes: 47 additions & 5 deletions block-streamer/src/server/block_streamer_service.rs
@@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::sync::Mutex;
use std::time::SystemTime;

use near_lake_framework::near_indexer_primitives;
use tonic::{Request, Response, Status};
@@ -21,6 +22,30 @@ pub struct BlockStreamerService {
block_streams: Mutex<HashMap<String, block_stream::BlockStream>>,
}

impl From<block_stream::BlockStreamHealth> for blockstreamer::Health {
fn from(health: block_stream::BlockStreamHealth) -> Self {
blockstreamer::Health {
processing_state: match health.processing_state {
block_stream::ProcessingState::Running => {
blockstreamer::ProcessingState::Running as i32
}
block_stream::ProcessingState::Idle => blockstreamer::ProcessingState::Idle as i32,
block_stream::ProcessingState::Stalled => {
blockstreamer::ProcessingState::Stalled as i32
}
block_stream::ProcessingState::Waiting => {
blockstreamer::ProcessingState::Waiting as i32
}
},
updated_at_timestamp_secs: health
.last_updated
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs(),
}
}
}

impl BlockStreamerService {
pub fn new(
redis: std::sync::Arc<crate::redis::RedisClient>,
@@ -77,11 +102,17 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerService
});

if let Some((stream_id, stream)) = stream_entry {
let stream_health = stream.health().map_err(|err| {
tracing::error!(?err, "Failed to get health of block stream");
Status::internal("Failed to get health of block stream")
})?;

Ok(Response::new(StreamInfo {
stream_id: stream_id.to_string(),
account_id: stream.indexer_config.account_id.to_string(),
function_name: stream.indexer_config.function_name.to_string(),
version: stream.version,
health: Some(stream_health.into()),
}))
} else {
Err(Status::not_found(format!(
@@ -210,11 +241,22 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerService

let block_streams: Vec<StreamInfo> = lock
.values()
.map(|block_stream| StreamInfo {
stream_id: block_stream.indexer_config.get_hash_id(),
account_id: block_stream.indexer_config.account_id.to_string(),
function_name: block_stream.indexer_config.function_name.clone(),
version: block_stream.version,
.map(|block_stream| {
let stream_health = block_stream
.health()
.map_err(|err| {
tracing::error!(?err, "Failed to get health of block stream");
Status::internal("Failed to get health of block stream")
})
.ok();

StreamInfo {
stream_id: block_stream.indexer_config.get_hash_id(),
account_id: block_stream.indexer_config.account_id.to_string(),
function_name: block_stream.indexer_config.function_name.to_string(),
version: block_stream.version,
health: stream_health.map(|health| health.into()),
}
})
.collect();

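Finally, a hedged sketch of how a gRPC client could consume the new health field end to end. The ListStreams request/response names, the module path, and the endpoint are assumptions inferred from the generated blockstreamer module referenced above; only the StreamInfo.health field itself is confirmed by this diff, and a prost version that provides TryFrom<i32> for enums is assumed.

```rust
// Sketch only. Assumes the tonic-generated client for the `blockstreamer`
// package (e.g. tonic::include_proto!("blockstreamer")), a `ListStreams` RPC
// returning the `StreamInfo` messages above, and a locally running server.
use blockstreamer::block_streamer_client::BlockStreamerClient;
use blockstreamer::{ListStreamsRequest, ProcessingState};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Endpoint is an assumption for illustration.
    let mut client = BlockStreamerClient::connect("http://localhost:10000").await?;

    let response = client.list_streams(ListStreamsRequest {}).await?.into_inner();

    for stream in response.streams {
        // `health` is optional in the proto, so it may be absent if the server
        // failed to read a stream's health.
        if let Some(health) = stream.health {
            let state = ProcessingState::try_from(health.processing_state)
                .unwrap_or(ProcessingState::Unspecified);

            println!(
                "{}/{}: {:?}, updated at {}s",
                stream.account_id,
                stream.function_name,
                state,
                health.updated_at_timestamp_secs
            );
        }
    }

    Ok(())
}
```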