diff --git a/block-streamer/examples/start_stream.rs b/block-streamer/examples/start_stream.rs index 3802037dd..62dc053c8 100644 --- a/block-streamer/examples/start_stream.rs +++ b/block-streamer/examples/start_stream.rs @@ -12,6 +12,7 @@ async fn main() -> Result<(), Box> { start_block_height: 106700000, account_id: "morgs.near".to_string(), function_name: "test".to_string(), + version: 0, rule: Some(Rule::ActionAnyRule(ActionAnyRule { affected_account_id: "social.near".to_string(), status: Status::Success.into(), diff --git a/block-streamer/proto/block_streamer.proto b/block-streamer/proto/block_streamer.proto index 6ad4b35f0..21393577e 100644 --- a/block-streamer/proto/block_streamer.proto +++ b/block-streamer/proto/block_streamer.proto @@ -2,48 +2,50 @@ syntax = "proto3"; package blockstreamer; -// The BlockStreamer service provides RPCs to manage BlockStream instances. +// The BlockStreamer service provides RPCs to manage BlockStream instances service BlockStreamer { - // Starts a new BlockStream process. + // Starts a new BlockStream process rpc StartStream (StartStreamRequest) returns (StartStreamResponse); - // Stops an existing BlockStream process. + // Stops an existing BlockStream process rpc StopStream (StopStreamRequest) returns (StopStreamResponse); - // Lists all current BlockStream processes. + // Lists all current BlockStream processes rpc ListStreams (ListStreamsRequest) returns (ListStreamsResponse); } -// Request message for starting a BlockStream. +// Request message for starting a BlockStream message StartStreamRequest { - // Which block height to start from. + // Which block height to start from uint64 start_block_height = 1; - // The account ID which the indexer is defined under + // Account ID which the indexer is defined under string account_id = 2; - // The name of the indexer + // Name of the indexer string function_name = 3; - // The filter rule to apply to incoming blocks + // Block height corresponding to the created/updated height of the indexer + uint64 version = 4; + // Filter rule to apply to incoming blocks oneof rule { - ActionAnyRule action_any_rule = 4; - ActionFunctionCallRule action_function_call_rule = 5; + ActionAnyRule action_any_rule = 5; + ActionFunctionCallRule action_function_call_rule = 6; } } // Match any action against the specified account message ActionAnyRule { - // The account ID pattern to match against + // Account ID pattern to match against string affected_account_id = 1; - // The status of the action to match against + // Status of the action to match against Status status = 2; } // Match a specific function call against the specified account message ActionFunctionCallRule { - // The account ID pattern to match against + // Account ID pattern to match against string affected_account_id = 1; - // The function name to match against + // Function name to match against string function_name = 2; - // The status of the action to match against + // Status of the action to match against Status status = 3; } @@ -54,40 +56,43 @@ enum Status { STATUS_ANY = 3; } -// Response message for starting a BlockStream. +// Response message for starting a BlockStream message StartStreamResponse { - // ID or handle of the started BlockStream. + // ID or handle of the started BlockStream string stream_id = 1; } -// Request message for stopping a BlockStream. +// Request message for stopping a BlockStream message StopStreamRequest { - // ID or handle of the BlockStream to stop. + // ID or handle of the BlockStream to stop string stream_id = 1; } -// Response message for stopping a BlockStream. +// Response message for stopping a BlockStream message StopStreamResponse { - // Confirmation message or status. + // Confirmation message or status string status = 1; } -// Request message for listing BlockStreams. +// Request message for listing BlockStreams message ListStreamsRequest { - // Optional filters or parameters for listing streams. + // Optional filters or parameters for listing streams } -// Response message for listing BlockStreams. +// Response message for listing BlockStreams message ListStreamsResponse { - // List of active BlockStreams. + // List of active BlockStreams repeated StreamInfo streams = 1; } -// Information about a single BlockStream instance. +// Information about a single BlockStream instance message StreamInfo { + // ID or handle of the BlockStream string stream_id = 1; - int64 start_block_height = 2; - string indexer_name = 3; - string chain_id = 4; - string status = 5; + // Account ID of the indexer + string account_id = 3; + // Function name of the indexer + string function_name = 4; + // Block height corresponding to the created/updated height of the indexer + uint64 version = 5; } diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index ecef2d632..b60685a2e 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -19,14 +19,16 @@ pub struct BlockStream { task: Option, pub indexer_config: IndexerConfig, pub chain_id: ChainId, + pub version: u64, } impl BlockStream { - pub fn new(indexer_config: IndexerConfig, chain_id: ChainId) -> Self { + pub fn new(indexer_config: IndexerConfig, chain_id: ChainId, version: u64) -> Self { Self { task: None, indexer_config, chain_id, + version, } } diff --git a/block-streamer/src/server/block_streamer_service.rs b/block-streamer/src/server/block_streamer_service.rs index 8cd9ce7b7..ba5cdb8bd 100644 --- a/block-streamer/src/server/block_streamer_service.rs +++ b/block-streamer/src/server/block_streamer_service.rs @@ -107,8 +107,11 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic None => drop(lock), } - let mut block_stream = - block_stream::BlockStream::new(indexer_config.clone(), self.chain_id.clone()); + let mut block_stream = block_stream::BlockStream::new( + indexer_config.clone(), + self.chain_id.clone(), + request.version, + ); block_stream .start( @@ -169,11 +172,9 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic .values() .map(|block_stream| StreamInfo { stream_id: block_stream.indexer_config.get_hash_id(), - chain_id: self.chain_id.to_string(), - indexer_name: block_stream.indexer_config.get_full_name(), - start_block_height: 0, - status: "OK".to_string(), - // last_indexed_block + account_id: block_stream.indexer_config.account_id.to_string(), + function_name: block_stream.indexer_config.function_name.clone(), + version: block_stream.version, }) .collect(); @@ -236,6 +237,7 @@ mod tests { start_block_height: 0, account_id: "morgs.near".to_string(), function_name: "test".to_string(), + version: 0, rule: Some(start_stream_request::Rule::ActionAnyRule(ActionAnyRule { affected_account_id: "queryapi.dataplatform.near".to_string(), status: 0, @@ -268,6 +270,7 @@ mod tests { start_block_height: 0, account_id: "morgs.near".to_string(), function_name: "test".to_string(), + version: 0, rule: Some(start_stream_request::Rule::ActionAnyRule(ActionAnyRule { affected_account_id: "queryapi.dataplatform.near".to_string(), status: 0,