Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prod Release 19/07/24 #897

Merged
merged 6 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions block-streamer/proto/block_streamer.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,17 @@ service BlockStreamer {

// Lists all current BlockStream processes
rpc ListStreams (ListStreamsRequest) returns (ListStreamsResponse);

// Get info for an existing BlockStream process
rpc GetStream (GetStreamRequest) returns (StreamInfo);
}

// Request message for getting a BlockStream
message GetStreamRequest {
// Account ID which the indexer is defined under
string account_id = 1;
// Name of the indexer
string function_name = 2;
}

// Request message for starting a BlockStream
Expand Down Expand Up @@ -97,4 +108,26 @@ message StreamInfo {
string function_name = 4;
// Block height corresponding to the created/updated height of the indexer
uint64 version = 5;
// Contains health information for the Block Stream
Health health = 6;
}

// Contains health information for the Block Stream
message Health {
// The processing state of the block stream
ProcessingState processing_state = 1;
// When the health info was last updated
uint64 updated_at_timestamp_secs = 2;
}

enum ProcessingState {
UNSPECIFIED = 0;
// Not started, or has been stopped
IDLE = 1;
// Running as expected
RUNNING = 2;
// Waiting for some internal condition to be met before continuing
WAITING = 3;
// Stopped due to some unknown error
STALLED = 4;
}
170 changes: 155 additions & 15 deletions block-streamer/src/block_stream.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::cmp::Ordering;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::task::Poll;
use std::time::SystemTime;

use anyhow::Context;
use futures::StreamExt;
Expand Down Expand Up @@ -52,16 +54,43 @@ impl<F: Future> Future for PollCounter<F> {
}

pub struct Task {
handle: JoinHandle<anyhow::Result<()>>,
stream_handle: JoinHandle<anyhow::Result<()>>,
monitor_handle: JoinHandle<()>,
cancellation_token: tokio_util::sync::CancellationToken,
}

/// Represents the processing state of a block stream
#[derive(Clone)]
pub enum ProcessingState {
/// Block Stream is not currently active but can be started. Either has not been started or was
/// stopped.
Idle,

/// Block Stream is actively processing blocks.
Running,

/// Block Stream has been intentionally/internally paused due to some condition, i.e. back pressure on
/// the Redis Stream.
Waiting,

/// Block Stream has stalled due to an error or other condition. Must be manually
/// restarted.
Stalled,
}

#[derive(Clone)]
pub struct BlockStreamHealth {
pub processing_state: ProcessingState,
pub last_updated: SystemTime,
}

pub struct BlockStream {
task: Option<Task>,
pub indexer_config: IndexerConfig,
pub chain_id: ChainId,
pub version: u64,
pub redis_stream: String,
health: Arc<Mutex<BlockStreamHealth>>,
}

impl BlockStream {
Expand All @@ -77,23 +106,107 @@ impl BlockStream {
chain_id,
version,
redis_stream,
health: Arc::new(Mutex::new(BlockStreamHealth {
processing_state: ProcessingState::Idle,
last_updated: SystemTime::now(),
})),
}
}

pub fn start(
&mut self,
pub fn health(&self) -> anyhow::Result<BlockStreamHealth> {
match self.health.lock() {
Ok(health) => Ok(health.clone()),
Err(e) => Err(anyhow::anyhow!("Failed to acquire health lock: {:?}", e)),
}
}

fn start_health_monitoring_task(&self, redis: Arc<RedisClient>) -> JoinHandle<()> {
tokio::spawn({
let config = self.indexer_config.clone();
let health = self.health.clone();
let redis_stream = self.redis_stream.clone();

async move {
let mut last_processed_block =
redis.get_last_processed_block(&config).await.unwrap();

loop {
tokio::time::sleep(std::time::Duration::from_secs(15)).await;

let new_last_processed_block =
if let Ok(block) = redis.get_last_processed_block(&config).await {
block
} else {
tracing::warn!(
account_id = config.account_id.as_str(),
function_name = config.function_name,
"Failed to fetch last processed block"
);
continue;
};

let stream_size = if let Ok(stream_size) =
redis.get_stream_length(redis_stream.clone()).await
{
stream_size
} else {
tracing::warn!(
account_id = config.account_id.as_str(),
function_name = config.function_name,
"Failed to fetch stream size"
);
continue;
};

let mut health_lock = if let Ok(health) = health.lock() {
health
} else {
tracing::warn!(
account_id = config.account_id.as_str(),
function_name = config.function_name,
"Failed to acquire health lock"
);
continue;
};

match new_last_processed_block.cmp(&last_processed_block) {
Ordering::Less => {
tracing::error!(
account_id = config.account_id.as_str(),
function_name = config.function_name,
"Last processed block should not decrease"
);

health_lock.processing_state = ProcessingState::Stalled;
}
Ordering::Equal if stream_size >= Some(MAX_STREAM_SIZE) => {
health_lock.processing_state = ProcessingState::Waiting;
}
Ordering::Equal => {
health_lock.processing_state = ProcessingState::Stalled;
}
Ordering::Greater => {
health_lock.processing_state = ProcessingState::Running;
}
};

health_lock.last_updated = SystemTime::now();

last_processed_block = new_last_processed_block;
}
}
})
}

fn start_block_stream_task(
&self,
start_block_height: near_indexer_primitives::types::BlockHeight,
redis: Arc<RedisClient>,
reciever_blocks_processor: Arc<ReceiverBlocksProcessor>,
lake_s3_client: SharedLakeS3Client,
) -> anyhow::Result<()> {
if self.task.is_some() {
return Err(anyhow::anyhow!("BlockStreamer has already been started",));
}

let cancellation_token = tokio_util::sync::CancellationToken::new();

let handle = tokio::spawn({
cancellation_token: tokio_util::sync::CancellationToken,
) -> JoinHandle<anyhow::Result<()>> {
tokio::spawn({
let cancellation_token = cancellation_token.clone();
let indexer_config = self.indexer_config.clone();
let chain_id = self.chain_id.clone();
Expand Down Expand Up @@ -137,10 +250,35 @@ impl BlockStream {
}
}
}
});
})
}

pub fn start(
&mut self,
start_block_height: near_indexer_primitives::types::BlockHeight,
redis: Arc<RedisClient>,
reciever_blocks_processor: Arc<ReceiverBlocksProcessor>,
lake_s3_client: SharedLakeS3Client,
) -> anyhow::Result<()> {
if self.task.is_some() {
return Err(anyhow::anyhow!("BlockStreamer has already been started",));
}

let cancellation_token = tokio_util::sync::CancellationToken::new();

let monitor_handle = self.start_health_monitoring_task(redis.clone());

let stream_handle = self.start_block_stream_task(
start_block_height,
redis,
reciever_blocks_processor,
lake_s3_client,
cancellation_token.clone(),
);

self.task = Some(Task {
handle,
stream_handle,
monitor_handle,
cancellation_token,
});

Expand All @@ -149,8 +287,9 @@ impl BlockStream {

pub async fn cancel(&mut self) -> anyhow::Result<()> {
if let Some(task) = self.task.take() {
task.monitor_handle.abort();
task.cancellation_token.cancel();
let _ = task.handle.await?;
let _ = task.stream_handle.await?;

// Fails if metric doesn't exist, i.e. task was never polled
let _ = metrics::BLOCK_STREAM_UP
Expand All @@ -167,6 +306,7 @@ impl BlockStream {

#[allow(clippy::too_many_arguments)]
#[tracing::instrument(
name = "block_stream"
skip_all,
fields(
account_id = indexer.account_id.as_str(),
Expand Down
22 changes: 22 additions & 0 deletions block-streamer/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,18 @@ impl RedisCommandsImpl {
Ok(Self { connection })
}

pub async fn get<T>(&self, key: T) -> Result<Option<u64>, RedisError>
where
T: ToRedisArgs + Debug + Send + Sync + 'static,
{
tracing::debug!("GET: {:?}", key);

redis::cmd("GET")
.arg(key)
.query_async(&mut self.connection.clone())
.await
}

pub async fn xadd<T, U>(&self, stream_key: T, fields: &[(String, U)]) -> Result<(), RedisError>
where
T: ToRedisArgs + Debug + Send + Sync + 'static,
Expand Down Expand Up @@ -133,6 +145,16 @@ impl RedisClientImpl {
.context("Failed to set last processed block")
}

pub async fn get_last_processed_block(
&self,
indexer_config: &IndexerConfig,
) -> anyhow::Result<Option<u64>> {
self.commands
.get(indexer_config.last_processed_block_key())
.await
.context("Failed to set last processed block")
}

pub async fn get_stream_length(&self, stream: String) -> anyhow::Result<Option<u64>> {
self.commands.xlen(stream).await
}
Expand Down
Loading
Loading