Skip to content

Commit

Permalink
feat: Expose endpoint to control streams (#430)
Browse files Browse the repository at this point in the history
This PR exposes a gRPC endpoint for the Block Streamer service to
provide control over individual Block Streams. There are currently 3
methods: start/stop/list. The functionality across these methods is
quite basic, I didn't want to spend too much time fleshing them out as
I'm still not certain on how exactly they will be used. I expect them to
Change once the Control Plane starts using them.
  • Loading branch information
morgsmccauley authored Dec 14, 2023
1 parent 1303013 commit 9c5ee15
Show file tree
Hide file tree
Showing 14 changed files with 721 additions and 57 deletions.
298 changes: 297 additions & 1 deletion block-streamer/Cargo.lock

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions block-streamer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ borsh = "0.10.2"
chrono = "0.4.25"
futures = "0.3.5"
mockall = "0.11.4"
prost = "0.12.3"
redis = { version = "0.21.5", features = ["tokio-comp", "connection-manager"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1.0.55"
Expand All @@ -23,6 +24,10 @@ tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
tokio = { version = "1.28.0", features = ["rt-multi-thread"]}
tokio-util = "0.7.10"
tokio-stream = "0.1.14"
tonic = "0.10.2"
wildmatch = "2.1.1"

near-lake-framework = "0.7.1"

[build-dependencies]
tonic-build = "0.10"
5 changes: 5 additions & 0 deletions block-streamer/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::compile_protos("proto/block_streamer.proto")?;

Ok(())
}
17 changes: 17 additions & 0 deletions block-streamer/examples/list_streams.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use tonic::Request;

use block_streamer::block_streamer_client::BlockStreamerClient;
use block_streamer::{start_stream_request::Rule, ActionAnyRule, ListStreamsRequest, Status};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = BlockStreamerClient::connect("http://[::1]:10000").await?;

let response = client
.list_streams(Request::new(ListStreamsRequest {}))
.await?;

println!("RESPONSE = {:#?}", response);

Ok(())
}
25 changes: 25 additions & 0 deletions block-streamer/examples/start_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use tonic::Request;

use block_streamer::block_streamer_client::BlockStreamerClient;
use block_streamer::{start_stream_request::Rule, ActionAnyRule, StartStreamRequest, Status};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = BlockStreamerClient::connect("http://[::1]:10000").await?;

let response = client
.start_stream(Request::new(StartStreamRequest {
start_block_height: 106700000,
account_id: "morgs.near".to_string(),
function_name: "test".to_string(),
rule: Some(Rule::ActionAnyRule(ActionAnyRule {
affected_account_id: "social.near".to_string(),
status: Status::Success.into(),
})),
}))
.await?;

println!("RESPONSE = {:?}", response);

Ok(())
}
20 changes: 20 additions & 0 deletions block-streamer/examples/stop_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use tonic::Request;

use block_streamer::block_streamer_client::BlockStreamerClient;
use block_streamer::StopStreamRequest;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = BlockStreamerClient::connect("http://[::1]:10000").await?;

let response = client
.stop_stream(Request::new(StopStreamRequest {
// ID for indexer morgs.near/test
stream_id: "16210176318434468568".to_string(),
}))
.await?;

println!("RESPONSE = {:?}", response);

Ok(())
}
93 changes: 93 additions & 0 deletions block-streamer/proto/block_streamer.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
syntax = "proto3";

package blockstreamer;

// The BlockStreamer service provides RPCs to manage BlockStream instances.
service BlockStreamer {
// Starts a new BlockStream process.
rpc StartStream (StartStreamRequest) returns (StartStreamResponse);

// Stops an existing BlockStream process.
rpc StopStream (StopStreamRequest) returns (StopStreamResponse);

// Lists all current BlockStream processes.
rpc ListStreams (ListStreamsRequest) returns (ListStreamsResponse);
}

// Request message for starting a BlockStream.
message StartStreamRequest {
// Which block height to start from.
uint64 start_block_height = 1;
// The account ID which the indexer is defined under
string account_id = 2;
// The name of the indexer
string function_name = 3;
// The filter rule to apply to incoming blocks
oneof rule {
ActionAnyRule action_any_rule = 4;
ActionFunctionCallRule action_function_call_rule = 5;
}
}

// Match any action against the specified account
message ActionAnyRule {
// The account ID pattern to match against
string affected_account_id = 1;
// The status of the action to match against
Status status = 2;
}

// Match a specific function call against the specified account
message ActionFunctionCallRule {
// The account ID pattern to match against
string affected_account_id = 1;
// The function name to match against
string function_name = 2;
// The status of the action to match against
Status status = 3;
}

enum Status {
STATUS_UNSPECIFIED = 0;
STATUS_SUCCESS = 1;
STATUS_FAILURE = 2;
STATUS_ANY = 3;
}

// Response message for starting a BlockStream.
message StartStreamResponse {
// ID or handle of the started BlockStream.
string stream_id = 1;
}

// Request message for stopping a BlockStream.
message StopStreamRequest {
// ID or handle of the BlockStream to stop.
string stream_id = 1;
}

// Response message for stopping a BlockStream.
message StopStreamResponse {
// Confirmation message or status.
string status = 1;
}

// Request message for listing BlockStreams.
message ListStreamsRequest {
// Optional filters or parameters for listing streams.
}

// Response message for listing BlockStreams.
message ListStreamsResponse {
// List of active BlockStreams.
repeated StreamInfo streams = 1;
}

// Information about a single BlockStream instance.
message StreamInfo {
string stream_id = 1;
int64 start_block_height = 2;
string indexer_name = 3;
string chain_id = 4;
string status = 5;
}
25 changes: 16 additions & 9 deletions block-streamer/src/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,24 @@ pub struct Task {

pub struct BlockStream {
task: Option<Task>,
pub indexer_config: IndexerConfig,
pub chain_id: ChainId,
}

impl BlockStream {
pub fn new() -> Self {
Self { task: None }
pub fn new(indexer_config: IndexerConfig, chain_id: ChainId) -> Self {
Self {
task: None,
indexer_config,
chain_id,
}
}

pub fn start(
&mut self,
start_block_height: near_indexer_primitives::types::BlockHeight,
indexer: IndexerConfig,
redis_connection_manager: crate::redis::ConnectionManager,
delta_lake_client: crate::delta_lake_client::DeltaLakeClient<crate::s3_client::S3Client>,
chain_id: ChainId,
) -> anyhow::Result<()> {
if self.task.is_some() {
return Err(anyhow::anyhow!("BlockStreamer has already been started",));
Expand All @@ -34,27 +38,30 @@ impl BlockStream {
let cancellation_token = tokio_util::sync::CancellationToken::new();
let cancellation_token_clone = cancellation_token.clone();

let indexer_config = self.indexer_config.clone();
let chain_id = self.chain_id.clone();

let handle = tokio::spawn(async move {
tokio::select! {
_ = cancellation_token_clone.cancelled() => {
tracing::info!(
"Cancelling existing block stream task for indexer: {}",
indexer.get_full_name(),
"Cancelling block stream task for indexer: {}",
indexer_config.get_full_name(),
);

Ok(())
},
result = start_block_stream(
start_block_height,
indexer.clone(),
&indexer_config,
&redis_connection_manager,
&delta_lake_client,
&chain_id,
) => {
result.map_err(|err| {
tracing::error!(
"Block stream task for indexer: {} stopped due to error: {:?}",
indexer.get_full_name(),
indexer_config.get_full_name(),
err,
);
err
Expand Down Expand Up @@ -91,7 +98,7 @@ impl BlockStream {

pub(crate) async fn start_block_stream(
start_block_height: near_indexer_primitives::types::BlockHeight,
indexer: IndexerConfig,
indexer: &IndexerConfig,
redis_connection_manager: &crate::redis::ConnectionManager,
delta_lake_client: &crate::delta_lake_client::DeltaLakeClient<crate::s3_client::S3Client>,
chain_id: &ChainId,
Expand Down
3 changes: 2 additions & 1 deletion block-streamer/src/delta_lake_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub struct IndexFile {
pub actions: Vec<IndexFileAction>,
}

#[derive(Clone)]
pub struct DeltaLakeClient<T>
where
T: crate::s3_client::S3ClientTrait,
Expand All @@ -46,7 +47,7 @@ where
pub fn new(s3_client: T) -> Self {
DeltaLakeClient {
s3_client,
// hardcode to mainnet for
// hardcode to mainnet for now
chain_id: ChainId::Mainnet,
}
}
Expand Down
19 changes: 14 additions & 5 deletions block-streamer/src/indexer_config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use crate::rules::IndexerRule;
use near_lake_framework::near_indexer_primitives::types::AccountId;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

use crate::rules::IndexerRule;

#[derive(
borsh::BorshSerialize,
Expand All @@ -12,15 +15,21 @@ use near_lake_framework::near_indexer_primitives::types::AccountId;
pub struct IndexerConfig {
pub account_id: AccountId,
pub function_name: String,
pub code: String,
pub start_block_height: Option<u64>,
pub schema: Option<String>,
pub provisioned: bool,
// pub code: String,
// pub start_block_height: Option<u64>,
// pub schema: Option<String>,
// pub provisioned: bool,
pub indexer_rule: IndexerRule,
}

impl IndexerConfig {
pub fn get_full_name(&self) -> String {
format!("{}/{}", self.account_id, self.function_name)
}

pub fn get_hash_id(&self) -> String {
let mut hasher = DefaultHasher::new();
self.get_full_name().hash(&mut hasher);
hasher.finish().to_string()
}
}
5 changes: 5 additions & 0 deletions block-streamer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
mod blockstreamer {
tonic::include_proto!("blockstreamer");
}

pub use blockstreamer::*;
44 changes: 3 additions & 41 deletions block-streamer/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
use tracing_subscriber::prelude::*;

use crate::indexer_config::IndexerConfig;
use crate::rules::types::indexer_rule_match::ChainId;
use crate::rules::{IndexerRule, IndexerRuleKind, MatchingRule, Status};

mod block_stream;
mod delta_lake_client;
mod indexer_config;
mod redis;
mod rules;
mod s3_client;

pub(crate) const LOG_TARGET: &str = "block_streamer";
mod server;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
Expand All @@ -20,7 +15,7 @@ async fn main() -> anyhow::Result<()> {
.with(tracing_subscriber::EnvFilter::from_default_env())
.init();

tracing::info!("Starting {}", crate::LOG_TARGET);
tracing::info!("Starting Block Streamer Service...");

let redis_connection_manager = redis::connect("redis://127.0.0.1").await?;

Expand All @@ -29,40 +24,7 @@ async fn main() -> anyhow::Result<()> {

let delta_lake_client = crate::delta_lake_client::DeltaLakeClient::new(s3_client);

let contract = "queryapi.dataplatform.near";
let matching_rule = MatchingRule::ActionAny {
affected_account_id: contract.to_string(),
status: Status::Any,
};
let filter_rule = IndexerRule {
indexer_rule_kind: IndexerRuleKind::Action,
matching_rule,
id: None,
name: None,
};
let indexer = IndexerConfig {
account_id: "buildnear.testnet".to_string().parse().unwrap(),
function_name: "index_stuff".to_string(),
code: "".to_string(),
start_block_height: Some(85376002),
schema: None,
provisioned: false,
indexer_rule: filter_rule,
};

let mut streamer = block_stream::BlockStream::new();

streamer.start(
106000000,
indexer,
redis_connection_manager,
delta_lake_client,
ChainId::Mainnet,
)?;

streamer.take_handle().unwrap().await??;

println!("done");
server::init(redis_connection_manager, delta_lake_client).await?;

Ok(())
}
Loading

0 comments on commit 9c5ee15

Please sign in to comment.