Skip to content

Commit

Permalink
Merge pull request #897 from near/main
Browse files Browse the repository at this point in the history
Prod Release 19/07/24
  • Loading branch information
morgsmccauley authored Jul 18, 2024
2 parents 8ff9f30 + 0177c26 commit 35fef33
Show file tree
Hide file tree
Showing 29 changed files with 2,064 additions and 1,162 deletions.
33 changes: 33 additions & 0 deletions block-streamer/proto/block_streamer.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,17 @@ service BlockStreamer {

// Lists all current BlockStream processes
rpc ListStreams (ListStreamsRequest) returns (ListStreamsResponse);

// Get info for an existing BlockStream process
rpc GetStream (GetStreamRequest) returns (StreamInfo);
}

// Request message for getting a BlockStream
message GetStreamRequest {
// Account ID which the indexer is defined under
string account_id = 1;
// Name of the indexer
string function_name = 2;
}

// Request message for starting a BlockStream
Expand Down Expand Up @@ -97,4 +108,26 @@ message StreamInfo {
string function_name = 4;
// Block height corresponding to the created/updated height of the indexer
uint64 version = 5;
// Contains health information for the Block Stream
Health health = 6;
}

// Contains health information for the Block Stream
message Health {
// The processing state of the block stream
ProcessingState processing_state = 1;
// When the health info was last updated
uint64 updated_at_timestamp_secs = 2;
}

enum ProcessingState {
UNSPECIFIED = 0;
// Not started, or has been stopped
IDLE = 1;
// Running as expected
RUNNING = 2;
// Waiting for some internal condition to be met before continuing
WAITING = 3;
// Stopped due to some unknown error
STALLED = 4;
}
170 changes: 155 additions & 15 deletions block-streamer/src/block_stream.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::cmp::Ordering;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::task::Poll;
use std::time::SystemTime;

use anyhow::Context;
use futures::StreamExt;
Expand Down Expand Up @@ -52,16 +54,43 @@ impl<F: Future> Future for PollCounter<F> {
}

pub struct Task {
handle: JoinHandle<anyhow::Result<()>>,
stream_handle: JoinHandle<anyhow::Result<()>>,
monitor_handle: JoinHandle<()>,
cancellation_token: tokio_util::sync::CancellationToken,
}

/// Represents the processing state of a block stream
#[derive(Clone)]
pub enum ProcessingState {
/// Block Stream is not currently active but can be started. Either has not been started or was
/// stopped.
Idle,

/// Block Stream is actively processing blocks.
Running,

/// Block Stream has been intentionally/internally paused due to some condition, i.e. back pressure on
/// the Redis Stream.
Waiting,

/// Block Stream has stalled due to an error or other condition. Must be manually
/// restarted.
Stalled,
}

#[derive(Clone)]
pub struct BlockStreamHealth {
pub processing_state: ProcessingState,
pub last_updated: SystemTime,
}

pub struct BlockStream {
task: Option<Task>,
pub indexer_config: IndexerConfig,
pub chain_id: ChainId,
pub version: u64,
pub redis_stream: String,
health: Arc<Mutex<BlockStreamHealth>>,
}

impl BlockStream {
Expand All @@ -77,23 +106,107 @@ impl BlockStream {
chain_id,
version,
redis_stream,
health: Arc::new(Mutex::new(BlockStreamHealth {
processing_state: ProcessingState::Idle,
last_updated: SystemTime::now(),
})),
}
}

pub fn start(
&mut self,
pub fn health(&self) -> anyhow::Result<BlockStreamHealth> {
match self.health.lock() {
Ok(health) => Ok(health.clone()),
Err(e) => Err(anyhow::anyhow!("Failed to acquire health lock: {:?}", e)),
}
}

fn start_health_monitoring_task(&self, redis: Arc<RedisClient>) -> JoinHandle<()> {
tokio::spawn({
let config = self.indexer_config.clone();
let health = self.health.clone();
let redis_stream = self.redis_stream.clone();

async move {
let mut last_processed_block =
redis.get_last_processed_block(&config).await.unwrap();

loop {
tokio::time::sleep(std::time::Duration::from_secs(15)).await;

let new_last_processed_block =
if let Ok(block) = redis.get_last_processed_block(&config).await {
block
} else {
tracing::warn!(
account_id = config.account_id.as_str(),
function_name = config.function_name,
"Failed to fetch last processed block"
);
continue;
};

let stream_size = if let Ok(stream_size) =
redis.get_stream_length(redis_stream.clone()).await
{
stream_size
} else {
tracing::warn!(
account_id = config.account_id.as_str(),
function_name = config.function_name,
"Failed to fetch stream size"
);
continue;
};

let mut health_lock = if let Ok(health) = health.lock() {
health
} else {
tracing::warn!(
account_id = config.account_id.as_str(),
function_name = config.function_name,
"Failed to acquire health lock"
);
continue;
};

match new_last_processed_block.cmp(&last_processed_block) {
Ordering::Less => {
tracing::error!(
account_id = config.account_id.as_str(),
function_name = config.function_name,
"Last processed block should not decrease"
);

health_lock.processing_state = ProcessingState::Stalled;
}
Ordering::Equal if stream_size >= Some(MAX_STREAM_SIZE) => {
health_lock.processing_state = ProcessingState::Waiting;
}
Ordering::Equal => {
health_lock.processing_state = ProcessingState::Stalled;
}
Ordering::Greater => {
health_lock.processing_state = ProcessingState::Running;
}
};

health_lock.last_updated = SystemTime::now();

last_processed_block = new_last_processed_block;
}
}
})
}

fn start_block_stream_task(
&self,
start_block_height: near_indexer_primitives::types::BlockHeight,
redis: Arc<RedisClient>,
reciever_blocks_processor: Arc<ReceiverBlocksProcessor>,
lake_s3_client: SharedLakeS3Client,
) -> anyhow::Result<()> {
if self.task.is_some() {
return Err(anyhow::anyhow!("BlockStreamer has already been started",));
}

let cancellation_token = tokio_util::sync::CancellationToken::new();

let handle = tokio::spawn({
cancellation_token: tokio_util::sync::CancellationToken,
) -> JoinHandle<anyhow::Result<()>> {
tokio::spawn({
let cancellation_token = cancellation_token.clone();
let indexer_config = self.indexer_config.clone();
let chain_id = self.chain_id.clone();
Expand Down Expand Up @@ -137,10 +250,35 @@ impl BlockStream {
}
}
}
});
})
}

pub fn start(
&mut self,
start_block_height: near_indexer_primitives::types::BlockHeight,
redis: Arc<RedisClient>,
reciever_blocks_processor: Arc<ReceiverBlocksProcessor>,
lake_s3_client: SharedLakeS3Client,
) -> anyhow::Result<()> {
if self.task.is_some() {
return Err(anyhow::anyhow!("BlockStreamer has already been started",));
}

let cancellation_token = tokio_util::sync::CancellationToken::new();

let monitor_handle = self.start_health_monitoring_task(redis.clone());

let stream_handle = self.start_block_stream_task(
start_block_height,
redis,
reciever_blocks_processor,
lake_s3_client,
cancellation_token.clone(),
);

self.task = Some(Task {
handle,
stream_handle,
monitor_handle,
cancellation_token,
});

Expand All @@ -149,8 +287,9 @@ impl BlockStream {

pub async fn cancel(&mut self) -> anyhow::Result<()> {
if let Some(task) = self.task.take() {
task.monitor_handle.abort();
task.cancellation_token.cancel();
let _ = task.handle.await?;
let _ = task.stream_handle.await?;

// Fails if metric doesn't exist, i.e. task was never polled
let _ = metrics::BLOCK_STREAM_UP
Expand All @@ -167,6 +306,7 @@ impl BlockStream {

#[allow(clippy::too_many_arguments)]
#[tracing::instrument(
name = "block_stream"
skip_all,
fields(
account_id = indexer.account_id.as_str(),
Expand Down
22 changes: 22 additions & 0 deletions block-streamer/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,18 @@ impl RedisCommandsImpl {
Ok(Self { connection })
}

pub async fn get<T>(&self, key: T) -> Result<Option<u64>, RedisError>
where
T: ToRedisArgs + Debug + Send + Sync + 'static,
{
tracing::debug!("GET: {:?}", key);

redis::cmd("GET")
.arg(key)
.query_async(&mut self.connection.clone())
.await
}

pub async fn xadd<T, U>(&self, stream_key: T, fields: &[(String, U)]) -> Result<(), RedisError>
where
T: ToRedisArgs + Debug + Send + Sync + 'static,
Expand Down Expand Up @@ -133,6 +145,16 @@ impl RedisClientImpl {
.context("Failed to set last processed block")
}

pub async fn get_last_processed_block(
&self,
indexer_config: &IndexerConfig,
) -> anyhow::Result<Option<u64>> {
self.commands
.get(indexer_config.last_processed_block_key())
.await
.context("Failed to set last processed block")
}

pub async fn get_stream_length(&self, stream: String) -> anyhow::Result<Option<u64>> {
self.commands.xlen(stream).await
}
Expand Down
Loading

0 comments on commit 35fef33

Please sign in to comment.