Skip to content

Commit

Permalink
feat: Add version to block streams
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley committed Dec 18, 2023
1 parent cba8500 commit 89f966e
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 39 deletions.
1 change: 1 addition & 0 deletions block-streamer/examples/start_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
start_block_height: 106700000,
account_id: "morgs.near".to_string(),
function_name: "test".to_string(),
version: 0,
rule: Some(Rule::ActionAnyRule(ActionAnyRule {
affected_account_id: "social.near".to_string(),
status: Status::Success.into(),
Expand Down
67 changes: 36 additions & 31 deletions block-streamer/proto/block_streamer.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,48 +2,50 @@ syntax = "proto3";

package blockstreamer;

// The BlockStreamer service provides RPCs to manage BlockStream instances.
// The BlockStreamer service provides RPCs to manage BlockStream instances
service BlockStreamer {
// Starts a new BlockStream process.
// Starts a new BlockStream process
rpc StartStream (StartStreamRequest) returns (StartStreamResponse);

// Stops an existing BlockStream process.
// Stops an existing BlockStream process
rpc StopStream (StopStreamRequest) returns (StopStreamResponse);

// Lists all current BlockStream processes.
// Lists all current BlockStream processes
rpc ListStreams (ListStreamsRequest) returns (ListStreamsResponse);
}

// Request message for starting a BlockStream.
// Request message for starting a BlockStream
message StartStreamRequest {
// Which block height to start from.
// Which block height to start from
uint64 start_block_height = 1;
// The account ID which the indexer is defined under
// Account ID which the indexer is defined under
string account_id = 2;
// The name of the indexer
// Name of the indexer
string function_name = 3;
// The filter rule to apply to incoming blocks
// Block height corresponding to the created/updated height of the indexer
uint64 version = 4;
// Filter rule to apply to incoming blocks
oneof rule {
ActionAnyRule action_any_rule = 4;
ActionFunctionCallRule action_function_call_rule = 5;
ActionAnyRule action_any_rule = 5;
ActionFunctionCallRule action_function_call_rule = 6;
}
}

// Match any action against the specified account
message ActionAnyRule {
// The account ID pattern to match against
// Account ID pattern to match against
string affected_account_id = 1;
// The status of the action to match against
// Status of the action to match against
Status status = 2;
}

// Match a specific function call against the specified account
message ActionFunctionCallRule {
// The account ID pattern to match against
// Account ID pattern to match against
string affected_account_id = 1;
// The function name to match against
// Function name to match against
string function_name = 2;
// The status of the action to match against
// Status of the action to match against
Status status = 3;
}

Expand All @@ -54,40 +56,43 @@ enum Status {
STATUS_ANY = 3;
}

// Response message for starting a BlockStream.
// Response message for starting a BlockStream
message StartStreamResponse {
// ID or handle of the started BlockStream.
// ID or handle of the started BlockStream
string stream_id = 1;
}

// Request message for stopping a BlockStream.
// Request message for stopping a BlockStream
message StopStreamRequest {
// ID or handle of the BlockStream to stop.
// ID or handle of the BlockStream to stop
string stream_id = 1;
}

// Response message for stopping a BlockStream.
// Response message for stopping a BlockStream
message StopStreamResponse {
// Confirmation message or status.
// Confirmation message or status
string status = 1;
}

// Request message for listing BlockStreams.
// Request message for listing BlockStreams
message ListStreamsRequest {
// Optional filters or parameters for listing streams.
// Optional filters or parameters for listing streams
}

// Response message for listing BlockStreams.
// Response message for listing BlockStreams
message ListStreamsResponse {
// List of active BlockStreams.
// List of active BlockStreams
repeated StreamInfo streams = 1;
}

// Information about a single BlockStream instance.
// Information about a single BlockStream instance
message StreamInfo {
// ID or handle of the BlockStream
string stream_id = 1;
int64 start_block_height = 2;
string indexer_name = 3;
string chain_id = 4;
string status = 5;
// Account ID of the indexer
string account_id = 3;
// Function name of the indexer
string function_name = 4;
// Block height corresponding to the created/updated height of the indexer
uint64 version = 5;
}
4 changes: 3 additions & 1 deletion block-streamer/src/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@ pub struct BlockStream {
task: Option<Task>,
pub indexer_config: IndexerConfig,
pub chain_id: ChainId,
pub version: u64,
}

impl BlockStream {
pub fn new(indexer_config: IndexerConfig, chain_id: ChainId) -> Self {
pub fn new(indexer_config: IndexerConfig, chain_id: ChainId, version: u64) -> Self {
Self {
task: None,
indexer_config,
chain_id,
version,
}
}

Expand Down
17 changes: 10 additions & 7 deletions block-streamer/src/server/block_streamer_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,11 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic
None => drop(lock),
}

let mut block_stream =
block_stream::BlockStream::new(indexer_config.clone(), self.chain_id.clone());
let mut block_stream = block_stream::BlockStream::new(
indexer_config.clone(),
self.chain_id.clone(),
request.version,
);

block_stream
.start(
Expand Down Expand Up @@ -169,11 +172,9 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic
.values()
.map(|block_stream| StreamInfo {
stream_id: block_stream.indexer_config.get_hash_id(),
chain_id: self.chain_id.to_string(),
indexer_name: block_stream.indexer_config.get_full_name(),
start_block_height: 0,
status: "OK".to_string(),
// last_indexed_block
account_id: block_stream.indexer_config.account_id.to_string(),
function_name: block_stream.indexer_config.function_name.clone(),
version: block_stream.version,
})
.collect();

Expand Down Expand Up @@ -236,6 +237,7 @@ mod tests {
start_block_height: 0,
account_id: "morgs.near".to_string(),
function_name: "test".to_string(),
version: 0,
rule: Some(start_stream_request::Rule::ActionAnyRule(ActionAnyRule {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: 0,
Expand Down Expand Up @@ -268,6 +270,7 @@ mod tests {
start_block_height: 0,
account_id: "morgs.near".to_string(),
function_name: "test".to_string(),
version: 0,
rule: Some(start_stream_request::Rule::ActionAnyRule(ActionAnyRule {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: 0,
Expand Down

0 comments on commit 89f966e

Please sign in to comment.