Skip to content

Commit

Permalink
refactor: Spawn dedicated control loops per Indexer (#866)
Browse files Browse the repository at this point in the history
This PR introduces dedicated/self-contained control loops per Indexer,
replacing the single/combined control loop. The motive for this ticket
is described in #811, you can read more about it there.

Overall, there is lots of clean up to be done, but I wanted to get this
out the door as quick as possible as to not block the features required
to build on top of this. I've discussed some of the major concerns
below.

## `LifecycleManager`
These dedicated control loops are managed by the `LifecycleManager`
struct. This is a state machine which progresses the Indexer through
different states depending on the context. The different states and
their transitions are described on the `LifecycleState` enum:
```rust
/// Represents the different lifecycle states of an Indexer
#[derive(Default, Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub enum LifecycleState {
    /// Pre-requisite resources, i.e. Data Layer, are being created.
    ///
    /// Transitions:
    /// - `Running` on success
    /// - `Repairing` on Data Layer provisioning failure
    #[default]
    Initializing,
    /// Indexer is functional, Block Stream and Executors are continouously monitored to ensure
    /// they are running the latest version of the Indexer.
    ///
    /// Transitions:
    /// - `Stopping` if suspended
    /// - `Running` if Block Stream or Executor fails to synchronise, essentially triggering a
    /// retry
    /// - `Running` on success
    Running,
    /// Indexer is being stopped, Block Stream and Executors are being stopped.
    ///
    /// Transitions:
    /// - `Stopping` on failure, triggering a retry
    /// - `Stopped` on success
    Stopping,
    /// Indexer is stopped, Block Stream and Executors are not running.
    ///
    /// Transitions:
    /// - `Running` if unsuspended
    Stopped,
    /// Indexer is in a bad state, currently requires manual intervention, but should eventually
    /// self heal. This is a dead-end state
    ///
    /// Transitions:
    /// - `Repairing` continuously
    Repairing, // TODO Add `error` to enable reparation
    /// Indexer is being deleted, all resources are being cleaned up
    ///
    /// Transitions:
    /// - `Deleting` on failure, triggering a retry
    /// - `Deleted` on success
    Deleting,
    /// Indexer is deleted, all resources are cleaned up, lifecycle manager will exit
    Deleted,
}
```

The logic of this `struct` is very light, triggering high-level actions
required within each state, and then returning the next desired state.
Most of the "doing" logic has has been encapsulated in the other related
`structs` as discussed below.

The lifecycle state is stored in Redis so that the Indexer can pickup
where it left off. A migration has been added to accommodate this new
field, which replaces the existing `provisioned_state` field.

## `Handler`s
Previously, the "handlers", i.e. `BlockStreamsHandler`, were lightweight
`structs` which wrapped the gRPC client/methods. In this PR, I've moved
all "synchronisation" logic to these structs. So rather than calling the
e.g. `data_layer_handler.start_provisioning_task()` method, we can call
`ensure_provisioned()` which manages all related logic. I feel this has
been encapsulation, and allows the `LifecycleManager` to be light.

I've had to remove `automock`, so we don't have mocked versions for this
right now. Cloning mocked versions is tricky, and requires manual
mocking. Rather than bloat this PR, I've left this out. Eventually, I'll
separate the "sync" logic from the "client" logic, so that the latter
can be easily mocked, and the sync logic covered by unit tests.

Additionally, I've added `get` methods for both Block Streamer and
Executors RPC, as listing is no longer convenient given we are managing
Indexers individually. The getters use `account_id` and `function_name`
as opposed to IDs. I'm considering moving away from IDs as the only way
to get them is via list, which isn't helpful. Right now it's somewhat of
a transitory state.
  • Loading branch information
morgsmccauley committed Jul 18, 2024
1 parent 27ac9a1 commit f2cdc78
Show file tree
Hide file tree
Showing 15 changed files with 992 additions and 1,067 deletions.
11 changes: 11 additions & 0 deletions block-streamer/proto/block_streamer.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,17 @@ service BlockStreamer {

// Lists all current BlockStream processes
rpc ListStreams (ListStreamsRequest) returns (ListStreamsResponse);

// Get info for an existing BlockStream process
rpc GetStream (GetStreamRequest) returns (StreamInfo);
}

// Request message for getting a BlockStream
message GetStreamRequest {
// Account ID which the indexer is defined under
string account_id = 1;
// Name of the indexer
string function_name = 2;
}

// Request message for starting a BlockStream
Expand Down
95 changes: 94 additions & 1 deletion block-streamer/src/server/block_streamer_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,38 @@ impl BlockStreamerService {

#[tonic::async_trait]
impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerService {
#[tracing::instrument(skip(self))]
async fn get_stream(
&self,
request: Request<blockstreamer::GetStreamRequest>,
) -> Result<Response<blockstreamer::StreamInfo>, Status> {
let request = request.into_inner();

let lock = self.block_streams.lock().map_err(|err| {
tracing::error!(?err, "Failed to acquire `block_streams` lock");
tonic::Status::internal("Failed to acquire `block_streams` lock")
})?;

let stream_entry = lock.iter().find(|(_, block_stream)| {
block_stream.indexer_config.account_id == request.account_id
&& block_stream.indexer_config.function_name == request.function_name
});

if let Some((stream_id, stream)) = stream_entry {
Ok(Response::new(StreamInfo {
stream_id: stream_id.to_string(),
account_id: stream.indexer_config.account_id.to_string(),
function_name: stream.indexer_config.function_name.to_string(),
version: stream.version,
}))
} else {
Err(Status::not_found(format!(
"Block Stream for account {} and name {} does not exist",
request.account_id, request.function_name
)))
}
}

#[tracing::instrument(skip(self))]
async fn start_stream(
&self,
Expand Down Expand Up @@ -171,7 +203,11 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic
&self,
_request: Request<blockstreamer::ListStreamsRequest>,
) -> Result<Response<blockstreamer::ListStreamsResponse>, Status> {
let lock = self.block_streams.lock().unwrap();
let lock = self.block_streams.lock().map_err(|err| {
tracing::error!(?err, "Failed to acquire `block_streams` lock");
tonic::Status::internal("Failed to acquire `block_streams` lock")
})?;

let block_streams: Vec<StreamInfo> = lock
.values()
.map(|block_stream| StreamInfo {
Expand Down Expand Up @@ -234,6 +270,63 @@ mod tests {
)
}

#[tokio::test]
async fn get_existing_block_stream() {
let block_streamer_service = create_block_streamer_service();

{
let lock = block_streamer_service.get_block_streams_lock().unwrap();
assert_eq!(lock.len(), 0);
}

block_streamer_service
.start_stream(Request::new(StartStreamRequest {
start_block_height: 0,
account_id: "morgs.near".to_string(),
function_name: "test".to_string(),
version: 0,
redis_stream: "stream".to_string(),
rule: Some(start_stream_request::Rule::ActionAnyRule(ActionAnyRule {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: 1,
})),
}))
.await
.unwrap();

let stream = block_streamer_service
.get_stream(Request::new(GetStreamRequest {
account_id: "morgs.near".to_string(),
function_name: "test".to_string(),
}))
.await
.unwrap();

assert_eq!(
stream.into_inner().stream_id,
"16210176318434468568".to_string()
);
}

#[tokio::test]
async fn get_non_existant_block_stream() {
let block_streamer_service = create_block_streamer_service();

{
let lock = block_streamer_service.get_block_streams_lock().unwrap();
assert_eq!(lock.len(), 0);
}

let stream_response = block_streamer_service
.get_stream(Request::new(GetStreamRequest {
account_id: "morgs.near".to_string(),
function_name: "test".to_string(),
}))
.await;

assert_eq!(stream_response.err().unwrap().code(), tonic::Code::NotFound);
}

#[tokio::test]
async fn starts_a_block_stream() {
let block_streamer_service = create_block_streamer_service();
Expand Down
176 changes: 164 additions & 12 deletions coordinator/src/handlers/block_streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,36 @@ pub use block_streamer::StreamInfo;
use anyhow::Context;
use block_streamer::block_streamer_client::BlockStreamerClient;
use block_streamer::{
start_stream_request::Rule, ActionAnyRule, ActionFunctionCallRule, ListStreamsRequest,
StartStreamRequest, Status, StopStreamRequest,
start_stream_request::Rule, ActionAnyRule, ActionFunctionCallRule, GetStreamRequest,
ListStreamsRequest, StartStreamRequest, Status, StopStreamRequest,
};
use near_primitives::types::AccountId;
use registry_types::StartBlock;
use tonic::transport::channel::Channel;
use tonic::Request;

use crate::indexer_config::IndexerConfig;
use crate::redis::KeyProvider;
use crate::redis::{KeyProvider, RedisClient};
use crate::utils::exponential_retry;

#[cfg(not(test))]
pub use BlockStreamsHandlerImpl as BlockStreamsHandler;
#[cfg(test)]
pub use MockBlockStreamsHandlerImpl as BlockStreamsHandler;

pub struct BlockStreamsHandlerImpl {
#[derive(Clone)]
pub struct BlockStreamsHandler {
client: BlockStreamerClient<Channel>,
redis_client: RedisClient,
}

#[cfg_attr(test, mockall::automock)]
impl BlockStreamsHandlerImpl {
pub fn connect(block_streamer_url: &str) -> anyhow::Result<Self> {
impl BlockStreamsHandler {
pub fn connect(block_streamer_url: &str, redis_client: RedisClient) -> anyhow::Result<Self> {
let channel = Channel::from_shared(block_streamer_url.to_string())
.context("Block Streamer URL is invalid")?
.connect_lazy();
let client = BlockStreamerClient::new(channel);

Ok(Self { client })
Ok(Self {
client,
redis_client,
})
}

pub async fn list(&self) -> anyhow::Result<Vec<StreamInfo>> {
Expand Down Expand Up @@ -79,6 +81,26 @@ impl BlockStreamsHandlerImpl {
.into()
}

pub async fn get(
&self,
account_id: AccountId,
function_name: String,
) -> anyhow::Result<Option<StreamInfo>> {
let request = GetStreamRequest {
account_id: account_id.to_string(),
function_name: function_name.clone(),
};

match self.client.clone().get_stream(Request::new(request)).await {
Ok(response) => Ok(Some(response.into_inner())),
Err(status) if status.code() == tonic::Code::NotFound => Ok(None),
Err(err) => Err(err).context(format!(
"Failed to get stream for account {} and name {}",
account_id, function_name
)),
}
}

pub async fn start(
&self,
start_block_height: u64,
Expand Down Expand Up @@ -139,4 +161,134 @@ impl BlockStreamsHandlerImpl {

Ok(())
}

async fn reconfigure_block_stream(&self, config: &IndexerConfig) -> anyhow::Result<()> {
if matches!(
config.start_block,
StartBlock::Latest | StartBlock::Height(..)
) {
self.redis_client.clear_block_stream(config).await?;
}

let height = match config.start_block {
StartBlock::Latest => config.get_registry_version(),
StartBlock::Height(height) => height,
StartBlock::Continue => self.get_continuation_block_height(config).await?,
};

tracing::info!(
start_block = ?config.start_block,
height,
"Starting block stream"
);

self.start(height, config).await?;

Ok(())
}

async fn start_new_block_stream(&self, config: &IndexerConfig) -> anyhow::Result<()> {
let height = match config.start_block {
StartBlock::Height(height) => height,
StartBlock::Latest => config.get_registry_version(),
StartBlock::Continue => {
tracing::warn!(
"Attempted to start new Block Stream with CONTINUE, using LATEST instead"
);
config.get_registry_version()
}
};

tracing::info!(
start_block = ?config.start_block,
height,
"Starting block stream"
);

self.start(height, config).await
}

async fn get_continuation_block_height(&self, config: &IndexerConfig) -> anyhow::Result<u64> {
let height = self
.redis_client
.get_last_published_block(config)
.await?
.map(|height| height + 1)
.unwrap_or_else(|| {
tracing::warn!(
"Failed to get continuation block height, using registry version instead"
);

config.get_registry_version()
});

Ok(height)
}

async fn resume_block_stream(&self, config: &IndexerConfig) -> anyhow::Result<()> {
let height = self.get_continuation_block_height(config).await?;

tracing::info!(height, "Resuming block stream");

self.start(height, config).await?;

Ok(())
}

pub async fn synchronise_block_stream(
&self,
config: &IndexerConfig,
previous_sync_version: Option<u64>,
) -> anyhow::Result<()> {
let block_stream = self
.get(config.account_id.clone(), config.function_name.clone())
.await?;

if let Some(block_stream) = block_stream {
if block_stream.version == config.get_registry_version() {
return Ok(());
}

tracing::info!(
previous_version = block_stream.version,
"Stopping outdated block stream"
);

self.stop(block_stream.stream_id.clone()).await?;

self.reconfigure_block_stream(config).await?;

return Ok(());
}

if previous_sync_version.is_none() {
self.start_new_block_stream(config).await?;

return Ok(());
}

if previous_sync_version.unwrap() != config.get_registry_version() {
self.reconfigure_block_stream(config).await?;

return Ok(());
}

self.resume_block_stream(config).await?;

Ok(())
}

pub async fn stop_if_needed(
&self,
account_id: AccountId,
function_name: String,
) -> anyhow::Result<()> {
if let Some(block_stream) = self.get(account_id, function_name).await? {
tracing::info!("Stopping block stream");

self.stop(block_stream.stream_id).await?;
}

Ok(())
}
}
Loading

0 comments on commit f2cdc78

Please sign in to comment.