Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Expose endpoint to control streams #430

Merged
merged 17 commits into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
298 changes: 297 additions & 1 deletion block-streamer/Cargo.lock

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions block-streamer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ borsh = "0.10.2"
chrono = "0.4.25"
futures = "0.3.5"
mockall = "0.11.4"
prost = "0.12.3"
redis = { version = "0.21.5", features = ["tokio-comp", "connection-manager"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1.0.55"
Expand All @@ -23,6 +24,10 @@ tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
tokio = { version = "1.28.0", features = ["rt-multi-thread"]}
tokio-util = "0.7.10"
tokio-stream = "0.1.14"
tonic = "0.10.2"
wildmatch = "2.1.1"

near-lake-framework = "0.7.1"

[build-dependencies]
tonic-build = "0.10"
5 changes: 5 additions & 0 deletions block-streamer/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generates the Rust types from the proto file on cargo build

tonic_build::compile_protos("proto/block_streamer.proto")?;

Ok(())
}
17 changes: 17 additions & 0 deletions block-streamer/examples/list_streams.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use tonic::Request;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These examples use the exposed Rust types to interact with the gRPC service - these not only serve as examples, but also make it easy to quickly debug by running them


use block_streamer::block_streamer_client::BlockStreamerClient;
use block_streamer::{start_stream_request::Rule, ActionAnyRule, ListStreamsRequest, Status};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = BlockStreamerClient::connect("http://[::1]:10000").await?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to create a client for each request? Can we just use one client instead?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a cargo example, a standalone binary which can be executed via cargo --example list_streams. I created it so we can easily send requests to the server, rather than creating verbose cli commands.


let response = client
.list_streams(Request::new(ListStreamsRequest {}))
.await?;

println!("RESPONSE = {:#?}", response);

Ok(())
}
25 changes: 25 additions & 0 deletions block-streamer/examples/start_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use tonic::Request;

use block_streamer::block_streamer_client::BlockStreamerClient;
use block_streamer::{start_stream_request::Rule, ActionAnyRule, StartStreamRequest, Status};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = BlockStreamerClient::connect("http://[::1]:10000").await?;

let response = client
.start_stream(Request::new(StartStreamRequest {
start_block_height: 106700000,
account_id: "morgs.near".to_string(),
function_name: "test".to_string(),
rule: Some(Rule::ActionAnyRule(ActionAnyRule {
affected_account_id: "social.near".to_string(),
status: Status::Success.into(),
})),
}))
.await?;

println!("RESPONSE = {:?}", response);

Ok(())
}
20 changes: 20 additions & 0 deletions block-streamer/examples/stop_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use tonic::Request;

use block_streamer::block_streamer_client::BlockStreamerClient;
use block_streamer::StopStreamRequest;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = BlockStreamerClient::connect("http://[::1]:10000").await?;

let response = client
.stop_stream(Request::new(StopStreamRequest {
// ID for indexer morgs.near/test
stream_id: "16210176318434468568".to_string(),
}))
.await?;

println!("RESPONSE = {:?}", response);

Ok(())
}
93 changes: 93 additions & 0 deletions block-streamer/proto/block_streamer.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
syntax = "proto3";

package blockstreamer;

// The BlockStreamer service provides RPCs to manage BlockStream instances.
service BlockStreamer {
// Starts a new BlockStream process.
rpc StartStream (StartStreamRequest) returns (StartStreamResponse);

// Stops an existing BlockStream process.
rpc StopStream (StopStreamRequest) returns (StopStreamResponse);

// Lists all current BlockStream processes.
rpc ListStreams (ListStreamsRequest) returns (ListStreamsResponse);
}

// Request message for starting a BlockStream.
message StartStreamRequest {
// Which block height to start from.
uint64 start_block_height = 1;
// The account ID which the indexer is defined under
string account_id = 2;
// The name of the indexer
string function_name = 3;
// The filter rule to apply to incoming blocks
oneof rule {
ActionAnyRule action_any_rule = 4;
ActionFunctionCallRule action_function_call_rule = 5;
}
}

// Match any action against the specified account
message ActionAnyRule {
// The account ID pattern to match against
string affected_account_id = 1;
// The status of the action to match against
Status status = 2;
}

// Match a specific function call against the specified account
message ActionFunctionCallRule {
// The account ID pattern to match against
string affected_account_id = 1;
// The function name to match against
string function_name = 2;
// The status of the action to match against
Status status = 3;
}

enum Status {
STATUS_UNSPECIFIED = 0;
STATUS_SUCCESS = 1;
STATUS_FAILURE = 2;
STATUS_ANY = 3;
}

// Response message for starting a BlockStream.
message StartStreamResponse {
// ID or handle of the started BlockStream.
string stream_id = 1;
}

// Request message for stopping a BlockStream.
message StopStreamRequest {
// ID or handle of the BlockStream to stop.
string stream_id = 1;
}

// Response message for stopping a BlockStream.
message StopStreamResponse {
// Confirmation message or status.
string status = 1;
}

// Request message for listing BlockStreams.
message ListStreamsRequest {
// Optional filters or parameters for listing streams.
}

// Response message for listing BlockStreams.
message ListStreamsResponse {
// List of active BlockStreams.
repeated StreamInfo streams = 1;
}

// Information about a single BlockStream instance.
message StreamInfo {
string stream_id = 1;
int64 start_block_height = 2;
string indexer_name = 3;
string chain_id = 4;
string status = 5;
}
25 changes: 16 additions & 9 deletions block-streamer/src/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,24 @@ pub struct Task {

pub struct BlockStream {
task: Option<Task>,
pub indexer_config: IndexerConfig,
pub chain_id: ChainId,
}

impl BlockStream {
pub fn new() -> Self {
Self { task: None }
pub fn new(indexer_config: IndexerConfig, chain_id: ChainId) -> Self {
Self {
task: None,
indexer_config,
chain_id,
}
}

pub fn start(
&mut self,
start_block_height: near_indexer_primitives::types::BlockHeight,
indexer: IndexerConfig,
redis_connection_manager: crate::redis::ConnectionManager,
delta_lake_client: crate::delta_lake_client::DeltaLakeClient<crate::s3_client::S3Client>,
chain_id: ChainId,
) -> anyhow::Result<()> {
if self.task.is_some() {
return Err(anyhow::anyhow!("BlockStreamer has already been started",));
Expand All @@ -34,27 +38,30 @@ impl BlockStream {
let cancellation_token = tokio_util::sync::CancellationToken::new();
let cancellation_token_clone = cancellation_token.clone();

let indexer_config = self.indexer_config.clone();
let chain_id = self.chain_id.clone();

let handle = tokio::spawn(async move {
tokio::select! {
_ = cancellation_token_clone.cancelled() => {
tracing::info!(
"Cancelling existing block stream task for indexer: {}",
indexer.get_full_name(),
"Cancelling block stream task for indexer: {}",
indexer_config.get_full_name(),
);

Ok(())
},
result = start_block_stream(
start_block_height,
indexer.clone(),
&indexer_config,
&redis_connection_manager,
&delta_lake_client,
&chain_id,
) => {
result.map_err(|err| {
tracing::error!(
"Block stream task for indexer: {} stopped due to error: {:?}",
indexer.get_full_name(),
indexer_config.get_full_name(),
err,
);
err
Expand Down Expand Up @@ -91,7 +98,7 @@ impl BlockStream {

pub(crate) async fn start_block_stream(
start_block_height: near_indexer_primitives::types::BlockHeight,
indexer: IndexerConfig,
indexer: &IndexerConfig,
redis_connection_manager: &crate::redis::ConnectionManager,
delta_lake_client: &crate::delta_lake_client::DeltaLakeClient<crate::s3_client::S3Client>,
chain_id: &ChainId,
Expand Down
3 changes: 2 additions & 1 deletion block-streamer/src/delta_lake_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub struct IndexFile {
pub actions: Vec<IndexFileAction>,
}

#[derive(Clone)]
pub struct DeltaLakeClient<T>
where
T: crate::s3_client::S3ClientTrait,
Expand All @@ -46,7 +47,7 @@ where
pub fn new(s3_client: T) -> Self {
DeltaLakeClient {
s3_client,
// hardcode to mainnet for
// hardcode to mainnet for now
chain_id: ChainId::Mainnet,
}
}
Expand Down
19 changes: 14 additions & 5 deletions block-streamer/src/indexer_config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use crate::rules::IndexerRule;
use near_lake_framework::near_indexer_primitives::types::AccountId;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

use crate::rules::IndexerRule;

#[derive(
borsh::BorshSerialize,
Expand All @@ -12,15 +15,21 @@ use near_lake_framework::near_indexer_primitives::types::AccountId;
pub struct IndexerConfig {
pub account_id: AccountId,
pub function_name: String,
pub code: String,
pub start_block_height: Option<u64>,
pub schema: Option<String>,
pub provisioned: bool,
// pub code: String,
// pub start_block_height: Option<u64>,
// pub schema: Option<String>,
// pub provisioned: bool,
Comment on lines +18 to +21
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No longer used/needed - this is more for the Runner side of things which will be passed from the Control Plane directly

pub indexer_rule: IndexerRule,
}

impl IndexerConfig {
pub fn get_full_name(&self) -> String {
format!("{}/{}", self.account_id, self.function_name)
}

pub fn get_hash_id(&self) -> String {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consistent/deterministic ID to use for the BlockStream so it can be persisted across restarts

let mut hasher = DefaultHasher::new();
self.get_full_name().hash(&mut hasher);
hasher.finish().to_string()
}
}
5 changes: 5 additions & 0 deletions block-streamer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
mod blockstreamer {
tonic::include_proto!("blockstreamer");
}

pub use blockstreamer::*;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exposes the Rust Client types

44 changes: 3 additions & 41 deletions block-streamer/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
use tracing_subscriber::prelude::*;

use crate::indexer_config::IndexerConfig;
use crate::rules::types::indexer_rule_match::ChainId;
use crate::rules::{IndexerRule, IndexerRuleKind, MatchingRule, Status};

mod block_stream;
mod delta_lake_client;
mod indexer_config;
mod redis;
mod rules;
mod s3_client;

pub(crate) const LOG_TARGET: &str = "block_streamer";
mod server;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
Expand All @@ -20,7 +15,7 @@ async fn main() -> anyhow::Result<()> {
.with(tracing_subscriber::EnvFilter::from_default_env())
.init();

tracing::info!("Starting {}", crate::LOG_TARGET);
tracing::info!("Starting Block Streamer Service...");

let redis_connection_manager = redis::connect("redis://127.0.0.1").await?;

Expand All @@ -29,40 +24,7 @@ async fn main() -> anyhow::Result<()> {

let delta_lake_client = crate::delta_lake_client::DeltaLakeClient::new(s3_client);

let contract = "queryapi.dataplatform.near";
let matching_rule = MatchingRule::ActionAny {
affected_account_id: contract.to_string(),
status: Status::Any,
};
let filter_rule = IndexerRule {
indexer_rule_kind: IndexerRuleKind::Action,
matching_rule,
id: None,
name: None,
};
let indexer = IndexerConfig {
account_id: "buildnear.testnet".to_string().parse().unwrap(),
function_name: "index_stuff".to_string(),
code: "".to_string(),
start_block_height: Some(85376002),
schema: None,
provisioned: false,
indexer_rule: filter_rule,
};

let mut streamer = block_stream::BlockStream::new();

streamer.start(
106000000,
indexer,
redis_connection_manager,
delta_lake_client,
ChainId::Mainnet,
)?;

streamer.take_handle().unwrap().await??;

println!("done");
server::init(redis_connection_manager, delta_lake_client).await?;

Ok(())
}
Loading