diff --git a/block-streamer/Cargo.lock b/block-streamer/Cargo.lock index b96492ead..0035120f2 100644 --- a/block-streamer/Cargo.lock +++ b/block-streamer/Cargo.lock @@ -836,6 +836,51 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum" +version = "0.6.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" +dependencies = [ + "async-trait", + "axum-core", + "bitflags 1.3.2", + "bytes", + "futures-util", + "http", + "http-body", + "hyper", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + [[package]] name = "base16ct" version = "0.1.1" @@ -879,6 +924,12 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "bitflags" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "327762f6e5a765692301e5bb513e0d9fef63be86bbc14528052b1cd3e6f03e07" + [[package]] name = "blake2" version = "0.9.2" @@ -924,12 +975,15 @@ dependencies = [ "futures", "mockall", "near-lake-framework", + "prost", "redis", "serde", "serde_json", "tokio", "tokio-stream", "tokio-util", + "tonic", + "tonic-build", "tracing", "tracing-subscriber", "wildmatch", @@ -1472,6 +1526,16 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" +[[package]] +name = "errno" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f258a7194e7f7c2a7837a8913aeab7fd8c383457034fa20ce4dd3dcb813e8eb8" +dependencies = [ + "libc", + "windows-sys", +] + [[package]] name = "fastrand" version = "1.9.0" @@ -1506,6 +1570,12 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "fixedbitset" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" + [[package]] name = "float-cmp" version = "0.9.0" @@ -1748,6 +1818,15 @@ dependencies = [ "digest 0.10.7", ] +[[package]] +name = "home" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5444c27eef6923071f7ebcc33e3444508466a76f7a2b93da00ed6e19f30c1ddb" +dependencies = [ + "windows-sys", +] + [[package]] name = "http" version = "0.2.11" @@ -1837,6 +1916,18 @@ dependencies = [ "tokio-rustls 0.24.1", ] +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper", + "pin-project-lite", + "tokio", + "tokio-io-timeout", +] + [[package]] name = "iana-time-zone" version = "0.1.58" @@ -1949,6 +2040,12 @@ version = "0.2.150" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c" +[[package]] +name = "linux-raw-sys" +version = "0.4.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "969488b55f8ac402214f3f5fd243ebb7206cf82de60d3172994707a4bcc2b829" + [[package]] name = "log" version = "0.4.20" @@ -1964,6 +2061,12 @@ dependencies = [ "regex-automata 0.1.10", ] +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "md-5" version = "0.10.6" @@ -1980,6 +2083,12 @@ version = "2.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167" +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + [[package]] name = "mio" version = "0.8.9" @@ -2018,6 +2127,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "multimap" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" + [[package]] name = "near-account-id" version = "0.17.0" @@ -2335,6 +2450,16 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "petgraph" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1d3afd2628e69da2be385eb6f2fd57c8ac7977ceeff6dc166ff1657b0e386a9" +dependencies = [ + "fixedbitset", + "indexmap 2.1.0", +] + [[package]] name = "pin-project" version = "1.1.3" @@ -2419,6 +2544,16 @@ dependencies = [ "termtree", ] +[[package]] +name = "prettyplease" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae005bd773ab59b4725093fd7df83fd7892f7d8eafb48dbd7de6e024e4215f9d" +dependencies = [ + "proc-macro2", + "syn 2.0.39", +] + [[package]] name = "primitive-types" version = "0.10.1" @@ -2447,6 +2582,60 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "146c289cda302b98a28d40c8b3b90498d6e526dd24ac2ecea73e4e491685b94a" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-build" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c55e02e35260070b6f716a2423c2ff1c3bb1642ddca6f99e1f26d06268a0e2d2" +dependencies = [ + "bytes", + "heck", + "itertools", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease", + "prost", + "prost-types", + "regex", + "syn 2.0.39", + "tempfile", + "which", +] + +[[package]] +name = "prost-derive" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "efb6c9a1dd1def8e2124d17e83a20af56f1570d6c2d2bd9e266ccb768df3840e" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn 2.0.39", +] + +[[package]] +name = "prost-types" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "193898f59edcf43c26227dcd4c8427f00d99d61e95dcde58dabd49fa291d470e" +dependencies = [ + "prost", +] + [[package]] name = "quote" version = "1.0.33" @@ -2549,6 +2738,15 @@ dependencies = [ "url", ] +[[package]] +name = "redox_syscall" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "reed-solomon-erasure" version = "4.0.2" @@ -2651,6 +2849,19 @@ dependencies = [ "semver", ] +[[package]] +name = "rustix" +version = "0.38.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc99bc2d4f1fed22595588a013687477aedf3cdcfb26558c559edb67b4d9b22e" +dependencies = [ + "bitflags 2.4.1", + "errno", + "libc", + "linux-raw-sys", + "windows-sys", +] + [[package]] name = "rustls" version = "0.20.9" @@ -2776,7 +2987,7 @@ version = "2.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05b64fb303737d99b81884b2c63433e9ae28abebe5eb5045dcdd175dc2ecf4de" dependencies = [ - "bitflags", + "bitflags 1.3.2", "core-foundation", "core-foundation-sys", "libc", @@ -3090,6 +3301,25 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + +[[package]] +name = "tempfile" +version = "3.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ef1adac450ad7f4b3c28589471ade84f25f731a7a0fe30d71dfa9f60fd808e5" +dependencies = [ + "cfg-if", + "fastrand 2.0.1", + "redox_syscall", + "rustix", + "windows-sys", +] + [[package]] name = "termtree" version = "0.4.1" @@ -3188,6 +3418,16 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "tokio-io-timeout" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-macros" version = "2.1.0" @@ -3254,6 +3494,46 @@ dependencies = [ "serde", ] +[[package]] +name = "tonic" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d560933a0de61cf715926b9cac824d4c883c2c43142f787595e48280c40a1d0e" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64", + "bytes", + "h2", + "http", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tonic-build" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d021fc044c18582b9a2408cd0dd05b1596e3ecdb5c4df822bb0183545683889" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build", + "quote", + "syn 2.0.39", +] + [[package]] name = "tower" version = "0.4.13" @@ -3262,9 +3542,13 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" dependencies = [ "futures-core", "futures-util", + "indexmap 1.9.3", "pin-project", "pin-project-lite", + "rand 0.8.5", + "slab", "tokio", + "tokio-util", "tower-layer", "tower-service", "tracing", @@ -3543,6 +3827,18 @@ dependencies = [ "untrusted 0.9.0", ] +[[package]] +name = "which" +version = "4.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7" +dependencies = [ + "either", + "home", + "once_cell", + "rustix", +] + [[package]] name = "wildmatch" version = "2.1.1" diff --git a/block-streamer/Cargo.toml b/block-streamer/Cargo.toml index 2335b81f5..73db949bb 100644 --- a/block-streamer/Cargo.toml +++ b/block-streamer/Cargo.toml @@ -15,6 +15,7 @@ borsh = "0.10.2" chrono = "0.4.25" futures = "0.3.5" mockall = "0.11.4" +prost = "0.12.3" redis = { version = "0.21.5", features = ["tokio-comp", "connection-manager"] } serde = { version = "1", features = ["derive"] } serde_json = "1.0.55" @@ -23,6 +24,10 @@ tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } tokio = { version = "1.28.0", features = ["rt-multi-thread"]} tokio-util = "0.7.10" tokio-stream = "0.1.14" +tonic = "0.10.2" wildmatch = "2.1.1" near-lake-framework = "0.7.1" + +[build-dependencies] +tonic-build = "0.10" diff --git a/block-streamer/build.rs b/block-streamer/build.rs new file mode 100644 index 000000000..4683c5353 --- /dev/null +++ b/block-streamer/build.rs @@ -0,0 +1,5 @@ +fn main() -> Result<(), Box> { + tonic_build::compile_protos("proto/block_streamer.proto")?; + + Ok(()) +} diff --git a/block-streamer/examples/list_streams.rs b/block-streamer/examples/list_streams.rs new file mode 100644 index 000000000..16cde7d0f --- /dev/null +++ b/block-streamer/examples/list_streams.rs @@ -0,0 +1,17 @@ +use tonic::Request; + +use block_streamer::block_streamer_client::BlockStreamerClient; +use block_streamer::{start_stream_request::Rule, ActionAnyRule, ListStreamsRequest, Status}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let mut client = BlockStreamerClient::connect("http://[::1]:10000").await?; + + let response = client + .list_streams(Request::new(ListStreamsRequest {})) + .await?; + + println!("RESPONSE = {:#?}", response); + + Ok(()) +} diff --git a/block-streamer/examples/start_stream.rs b/block-streamer/examples/start_stream.rs new file mode 100644 index 000000000..108f4e799 --- /dev/null +++ b/block-streamer/examples/start_stream.rs @@ -0,0 +1,25 @@ +use tonic::Request; + +use block_streamer::block_streamer_client::BlockStreamerClient; +use block_streamer::{start_stream_request::Rule, ActionAnyRule, StartStreamRequest, Status}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let mut client = BlockStreamerClient::connect("http://[::1]:10000").await?; + + let response = client + .start_stream(Request::new(StartStreamRequest { + start_block_height: 106700000, + account_id: "morgs.near".to_string(), + function_name: "test".to_string(), + rule: Some(Rule::ActionAnyRule(ActionAnyRule { + affected_account_id: "social.near".to_string(), + status: Status::Success.into(), + })), + })) + .await?; + + println!("RESPONSE = {:?}", response); + + Ok(()) +} diff --git a/block-streamer/examples/stop_stream.rs b/block-streamer/examples/stop_stream.rs new file mode 100644 index 000000000..e451f2ae3 --- /dev/null +++ b/block-streamer/examples/stop_stream.rs @@ -0,0 +1,20 @@ +use tonic::Request; + +use block_streamer::block_streamer_client::BlockStreamerClient; +use block_streamer::StopStreamRequest; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let mut client = BlockStreamerClient::connect("http://[::1]:10000").await?; + + let response = client + .stop_stream(Request::new(StopStreamRequest { + // ID for indexer morgs.near/test + stream_id: "16210176318434468568".to_string(), + })) + .await?; + + println!("RESPONSE = {:?}", response); + + Ok(()) +} diff --git a/block-streamer/proto/block_streamer.proto b/block-streamer/proto/block_streamer.proto new file mode 100644 index 000000000..6ad4b35f0 --- /dev/null +++ b/block-streamer/proto/block_streamer.proto @@ -0,0 +1,93 @@ +syntax = "proto3"; + +package blockstreamer; + +// The BlockStreamer service provides RPCs to manage BlockStream instances. +service BlockStreamer { + // Starts a new BlockStream process. + rpc StartStream (StartStreamRequest) returns (StartStreamResponse); + + // Stops an existing BlockStream process. + rpc StopStream (StopStreamRequest) returns (StopStreamResponse); + + // Lists all current BlockStream processes. + rpc ListStreams (ListStreamsRequest) returns (ListStreamsResponse); +} + +// Request message for starting a BlockStream. +message StartStreamRequest { + // Which block height to start from. + uint64 start_block_height = 1; + // The account ID which the indexer is defined under + string account_id = 2; + // The name of the indexer + string function_name = 3; + // The filter rule to apply to incoming blocks + oneof rule { + ActionAnyRule action_any_rule = 4; + ActionFunctionCallRule action_function_call_rule = 5; + } +} + +// Match any action against the specified account +message ActionAnyRule { + // The account ID pattern to match against + string affected_account_id = 1; + // The status of the action to match against + Status status = 2; +} + +// Match a specific function call against the specified account +message ActionFunctionCallRule { + // The account ID pattern to match against + string affected_account_id = 1; + // The function name to match against + string function_name = 2; + // The status of the action to match against + Status status = 3; +} + +enum Status { + STATUS_UNSPECIFIED = 0; + STATUS_SUCCESS = 1; + STATUS_FAILURE = 2; + STATUS_ANY = 3; +} + +// Response message for starting a BlockStream. +message StartStreamResponse { + // ID or handle of the started BlockStream. + string stream_id = 1; +} + +// Request message for stopping a BlockStream. +message StopStreamRequest { + // ID or handle of the BlockStream to stop. + string stream_id = 1; +} + +// Response message for stopping a BlockStream. +message StopStreamResponse { + // Confirmation message or status. + string status = 1; +} + +// Request message for listing BlockStreams. +message ListStreamsRequest { + // Optional filters or parameters for listing streams. +} + +// Response message for listing BlockStreams. +message ListStreamsResponse { + // List of active BlockStreams. + repeated StreamInfo streams = 1; +} + +// Information about a single BlockStream instance. +message StreamInfo { + string stream_id = 1; + int64 start_block_height = 2; + string indexer_name = 3; + string chain_id = 4; + string status = 5; +} diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index 39bb2b4e6..3942888d7 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -12,20 +12,24 @@ pub struct Task { pub struct BlockStream { task: Option, + pub indexer_config: IndexerConfig, + pub chain_id: ChainId, } impl BlockStream { - pub fn new() -> Self { - Self { task: None } + pub fn new(indexer_config: IndexerConfig, chain_id: ChainId) -> Self { + Self { + task: None, + indexer_config, + chain_id, + } } pub fn start( &mut self, start_block_height: near_indexer_primitives::types::BlockHeight, - indexer: IndexerConfig, redis_connection_manager: crate::redis::ConnectionManager, delta_lake_client: crate::delta_lake_client::DeltaLakeClient, - chain_id: ChainId, ) -> anyhow::Result<()> { if self.task.is_some() { return Err(anyhow::anyhow!("BlockStreamer has already been started",)); @@ -34,19 +38,22 @@ impl BlockStream { let cancellation_token = tokio_util::sync::CancellationToken::new(); let cancellation_token_clone = cancellation_token.clone(); + let indexer_config = self.indexer_config.clone(); + let chain_id = self.chain_id.clone(); + let handle = tokio::spawn(async move { tokio::select! { _ = cancellation_token_clone.cancelled() => { tracing::info!( - "Cancelling existing block stream task for indexer: {}", - indexer.get_full_name(), + "Cancelling block stream task for indexer: {}", + indexer_config.get_full_name(), ); Ok(()) }, result = start_block_stream( start_block_height, - indexer.clone(), + &indexer_config, &redis_connection_manager, &delta_lake_client, &chain_id, @@ -54,7 +61,7 @@ impl BlockStream { result.map_err(|err| { tracing::error!( "Block stream task for indexer: {} stopped due to error: {:?}", - indexer.get_full_name(), + indexer_config.get_full_name(), err, ); err @@ -91,7 +98,7 @@ impl BlockStream { pub(crate) async fn start_block_stream( start_block_height: near_indexer_primitives::types::BlockHeight, - indexer: IndexerConfig, + indexer: &IndexerConfig, redis_connection_manager: &crate::redis::ConnectionManager, delta_lake_client: &crate::delta_lake_client::DeltaLakeClient, chain_id: &ChainId, diff --git a/block-streamer/src/delta_lake_client.rs b/block-streamer/src/delta_lake_client.rs index 26fd2b144..bff283a35 100644 --- a/block-streamer/src/delta_lake_client.rs +++ b/block-streamer/src/delta_lake_client.rs @@ -31,6 +31,7 @@ pub struct IndexFile { pub actions: Vec, } +#[derive(Clone)] pub struct DeltaLakeClient where T: crate::s3_client::S3ClientTrait, @@ -46,7 +47,7 @@ where pub fn new(s3_client: T) -> Self { DeltaLakeClient { s3_client, - // hardcode to mainnet for + // hardcode to mainnet for now chain_id: ChainId::Mainnet, } } diff --git a/block-streamer/src/indexer_config.rs b/block-streamer/src/indexer_config.rs index 06328d69d..eba4485b0 100644 --- a/block-streamer/src/indexer_config.rs +++ b/block-streamer/src/indexer_config.rs @@ -1,5 +1,8 @@ -use crate::rules::IndexerRule; use near_lake_framework::near_indexer_primitives::types::AccountId; +use std::collections::hash_map::DefaultHasher; +use std::hash::{Hash, Hasher}; + +use crate::rules::IndexerRule; #[derive( borsh::BorshSerialize, @@ -12,10 +15,10 @@ use near_lake_framework::near_indexer_primitives::types::AccountId; pub struct IndexerConfig { pub account_id: AccountId, pub function_name: String, - pub code: String, - pub start_block_height: Option, - pub schema: Option, - pub provisioned: bool, + // pub code: String, + // pub start_block_height: Option, + // pub schema: Option, + // pub provisioned: bool, pub indexer_rule: IndexerRule, } @@ -23,4 +26,10 @@ impl IndexerConfig { pub fn get_full_name(&self) -> String { format!("{}/{}", self.account_id, self.function_name) } + + pub fn get_hash_id(&self) -> String { + let mut hasher = DefaultHasher::new(); + self.get_full_name().hash(&mut hasher); + hasher.finish().to_string() + } } diff --git a/block-streamer/src/lib.rs b/block-streamer/src/lib.rs new file mode 100644 index 000000000..a91817f41 --- /dev/null +++ b/block-streamer/src/lib.rs @@ -0,0 +1,5 @@ +mod blockstreamer { + tonic::include_proto!("blockstreamer"); +} + +pub use blockstreamer::*; diff --git a/block-streamer/src/main.rs b/block-streamer/src/main.rs index c6efd3b51..50e87ac06 100644 --- a/block-streamer/src/main.rs +++ b/block-streamer/src/main.rs @@ -1,17 +1,12 @@ use tracing_subscriber::prelude::*; -use crate::indexer_config::IndexerConfig; -use crate::rules::types::indexer_rule_match::ChainId; -use crate::rules::{IndexerRule, IndexerRuleKind, MatchingRule, Status}; - mod block_stream; mod delta_lake_client; mod indexer_config; mod redis; mod rules; mod s3_client; - -pub(crate) const LOG_TARGET: &str = "block_streamer"; +mod server; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -20,7 +15,7 @@ async fn main() -> anyhow::Result<()> { .with(tracing_subscriber::EnvFilter::from_default_env()) .init(); - tracing::info!("Starting {}", crate::LOG_TARGET); + tracing::info!("Starting Block Streamer Service..."); let redis_connection_manager = redis::connect("redis://127.0.0.1").await?; @@ -29,40 +24,7 @@ async fn main() -> anyhow::Result<()> { let delta_lake_client = crate::delta_lake_client::DeltaLakeClient::new(s3_client); - let contract = "queryapi.dataplatform.near"; - let matching_rule = MatchingRule::ActionAny { - affected_account_id: contract.to_string(), - status: Status::Any, - }; - let filter_rule = IndexerRule { - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule, - id: None, - name: None, - }; - let indexer = IndexerConfig { - account_id: "buildnear.testnet".to_string().parse().unwrap(), - function_name: "index_stuff".to_string(), - code: "".to_string(), - start_block_height: Some(85376002), - schema: None, - provisioned: false, - indexer_rule: filter_rule, - }; - - let mut streamer = block_stream::BlockStream::new(); - - streamer.start( - 106000000, - indexer, - redis_connection_manager, - delta_lake_client, - ChainId::Mainnet, - )?; - - streamer.take_handle().unwrap().await??; - - println!("done"); + server::init(redis_connection_manager, delta_lake_client).await?; Ok(()) } diff --git a/block-streamer/src/server/block_streamer_service.rs b/block-streamer/src/server/block_streamer_service.rs new file mode 100644 index 000000000..79fd1b541 --- /dev/null +++ b/block-streamer/src/server/block_streamer_service.rs @@ -0,0 +1,190 @@ +use std::collections::HashMap; +use std::sync::Mutex; + +use near_lake_framework::near_indexer_primitives; +use tonic::{Request, Response, Status}; + +use crate::indexer_config::IndexerConfig; +use crate::rules::types::indexer_rule_match::ChainId; +use crate::rules::{IndexerRule, IndexerRuleKind, MatchingRule}; + +use crate::block_stream; +use crate::server::blockstreamer; + +use blockstreamer::*; + +impl TryFrom for crate::rules::Status { + type Error = (); + + fn try_from(status: i32) -> Result { + match status { + 0 => Ok(crate::rules::Status::Success), + 1 => Ok(crate::rules::Status::Fail), + 2 => Ok(crate::rules::Status::Any), + _ => Err(()), + } + } +} + +pub struct BlockStreamerService { + redis_connection_manager: crate::redis::ConnectionManager, + delta_lake_client: crate::delta_lake_client::DeltaLakeClient, + chain_id: ChainId, + block_streams: Mutex>, +} + +impl BlockStreamerService { + pub fn new( + redis_connection_manager: crate::redis::ConnectionManager, + delta_lake_client: crate::delta_lake_client::DeltaLakeClient, + ) -> Self { + Self { + redis_connection_manager, + delta_lake_client, + chain_id: ChainId::Mainnet, + block_streams: Mutex::new(HashMap::new()), + } + } + + fn get_block_streams_lock( + &self, + ) -> Result>, Status> { + self.block_streams + .lock() + .map_err(|err| Status::internal(format!("Failed to acquire lock: {}", err))) + } +} + +#[tonic::async_trait] +impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerService { + async fn start_stream( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + + let rule = request + .rule + .ok_or(Status::invalid_argument("Rule must be provided"))?; + + let matching_rule = match rule { + start_stream_request::Rule::ActionAnyRule(action_any_rule) => { + let affected_account_id = action_any_rule.affected_account_id; + let status = action_any_rule.status.try_into().map_err(|_| { + Status::invalid_argument("Invalid status value for ActionAnyRule") + })?; + + MatchingRule::ActionAny { + affected_account_id, + status, + } + } + _ => { + return Err(Status::unimplemented( + "Rules other than ActionAny are not supported yet", + )) + } + }; + let filter_rule = IndexerRule { + // TODO: Remove kind as it is unused + indexer_rule_kind: IndexerRuleKind::Action, + matching_rule, + id: None, + name: None, + }; + + let account_id = near_indexer_primitives::types::AccountId::try_from(request.account_id) + .map_err(|err| { + Status::invalid_argument(format!( + "Invalid account_id value for StartStreamRequest: {}", + err + )) + })?; + let indexer_config = IndexerConfig { + account_id, + function_name: request.function_name, + indexer_rule: filter_rule, + }; + + let lock = self.get_block_streams_lock()?; + match lock.get(&indexer_config.get_hash_id()) { + Some(_) => return Err(Status::already_exists("Block stream already exists")), + None => drop(lock), + } + + let mut block_stream = + block_stream::BlockStream::new(indexer_config.clone(), self.chain_id.clone()); + + block_stream + .start( + request.start_block_height, + self.redis_connection_manager.clone(), + self.delta_lake_client.clone(), + ) + .map_err(|_| Status::internal("Failed to start block stream"))?; + + let mut lock = self.get_block_streams_lock()?; + lock.insert(indexer_config.get_hash_id(), block_stream); + + Ok(Response::new(blockstreamer::StartStreamResponse { + stream_id: indexer_config.get_hash_id(), + })) + } + + async fn stop_stream( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + + let stream_id = request.stream_id; + + let exising_block_stream = { + let mut lock = self.get_block_streams_lock()?; + lock.remove(&stream_id) + }; + + match exising_block_stream { + None => { + return Err(Status::not_found(format!( + "Block stream with id {} not found", + stream_id + ))) + } + Some(mut block_stream) => { + block_stream + .cancel() + .await + .map_err(|_| Status::internal("Failed to cancel block stream"))?; + } + } + + Ok(Response::new(blockstreamer::StopStreamResponse { + status: "ok".to_string(), + })) + } + + async fn list_streams( + &self, + _request: Request, + ) -> Result, Status> { + let lock = self.block_streams.lock().unwrap(); + let block_streams: Vec = lock + .values() + .map(|block_stream| StreamInfo { + stream_id: block_stream.indexer_config.get_hash_id(), + chain_id: self.chain_id.to_string(), + indexer_name: block_stream.indexer_config.get_full_name(), + start_block_height: 0, + status: "OK".to_string(), + // last_indexed_block + }) + .collect(); + + let response = blockstreamer::ListStreamsResponse { + streams: block_streams, + }; + + Ok(Response::new(response)) + } +} diff --git a/block-streamer/src/server/mod.rs b/block-streamer/src/server/mod.rs new file mode 100644 index 000000000..544482dda --- /dev/null +++ b/block-streamer/src/server/mod.rs @@ -0,0 +1,29 @@ +mod block_streamer_service; + +pub mod blockstreamer { + tonic::include_proto!("blockstreamer"); +} + +pub async fn init( + redis_connection_manager: crate::redis::ConnectionManager, + delta_lake_client: crate::delta_lake_client::DeltaLakeClient, +) -> anyhow::Result<()> { + let addr = "[::1]:10000" + .parse() + .expect("Failed to parse RPC socket address"); + + tracing::info!("Starting RPC server at {}", addr); + + let block_streamer_service = block_streamer_service::BlockStreamerService::new( + redis_connection_manager, + delta_lake_client, + ); + let block_streamer_server = + blockstreamer::block_streamer_server::BlockStreamerServer::new(block_streamer_service); + + tonic::transport::Server::builder() + .add_service(block_streamer_server) + .serve(addr) + .await + .map_err(|err| err.into()) +}