diff --git a/config/sample.toml b/config/sample.toml index ada0d902..d35b0c73 100644 --- a/config/sample.toml +++ b/config/sample.toml @@ -1,8 +1,11 @@ log_format = "json" +[statsd] +prefix = "snaptest" + [fnames] disable = false -stop_at = 9999999999 +stop_at = 200 start_from = 0 url = "https://fnames.farcaster.xyz/transfers" diff --git a/src/bin/follow_blocks.rs b/src/bin/follow_blocks.rs index 45252e36..6acb8bd5 100644 --- a/src/bin/follow_blocks.rs +++ b/src/bin/follow_blocks.rs @@ -27,16 +27,20 @@ async fn main() { println!("Block {}", block_number); for chunk in &block.shard_chunks { + let height = chunk.header.as_ref().unwrap().height.unwrap(); + let shard_id = height.shard_index; + let shard_height = height.block_number; + for tx in &chunk.transactions { for msg in &tx.user_messages { - show_msg(msg); + show_msg(shard_id, shard_height, msg); } } } } } -fn show_msg(msg: &Message) { +fn show_msg(shard_id: u32, _shard_height: u64, msg: &Message) { let hash = hex::encode(&msg.hash); let data = match msg.data.as_ref() { @@ -58,7 +62,7 @@ fn show_msg(msg: &Message) { match mt { MessageType::CastAdd => { if let Some(message_data::Body::CastAddBody(body)) = data.body.as_ref() { - println!(" - {} {} {}", hash, msg.fid(), body.text); + println!(" - {} {} {} {}", hash, shard_id, msg.fid(), body.text); } else { panic!("Body is not CastAddBody"); } diff --git a/src/lib.rs b/src/lib.rs index 9d13e8c9..c00ab6be 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,7 @@ pub mod cfg; pub mod connectors; pub mod consensus; pub mod core; +pub mod mempool; pub mod network; pub mod node; pub mod perf; diff --git a/src/main.rs b/src/main.rs index 5b50b8e4..e4ad47d8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -154,7 +154,11 @@ async fn main() -> Result<(), Box> { ) .await; - let admin_service = MyAdminService::new(db_manager, node.shard_senders.clone()); + let admin_service = MyAdminService::new( + db_manager, + node.shard_senders.clone(), + app_config.consensus.num_shards, + ); let rpc_shard_stores = node.shard_stores.clone(); let rpc_shard_senders = node.shard_senders.clone(); @@ -166,6 +170,7 @@ async fn main() -> Result<(), Box> { rpc_shard_stores, rpc_shard_senders, statsd_client.clone(), + app_config.consensus.num_shards, ); let resp = Server::builder() diff --git a/src/mempool/mod.rs b/src/mempool/mod.rs new file mode 100644 index 00000000..e78cad06 --- /dev/null +++ b/src/mempool/mod.rs @@ -0,0 +1 @@ +pub mod routing; diff --git a/src/mempool/routing.rs b/src/mempool/routing.rs new file mode 100644 index 00000000..66c2a8a9 --- /dev/null +++ b/src/mempool/routing.rs @@ -0,0 +1,7 @@ +use sha2::{Digest, Sha256}; + +pub fn route_message(fid: u32, num_shards: u32) -> u32 { + let hash = Sha256::digest(fid.to_be_bytes()); + let hash_u32 = u32::from_be_bytes(hash[..4].try_into().unwrap()); + (hash_u32 % num_shards) + 1 +} diff --git a/src/network/admin_server.rs b/src/network/admin_server.rs index 1088e441..181d6cea 100644 --- a/src/network/admin_server.rs +++ b/src/network/admin_server.rs @@ -1,3 +1,4 @@ +use crate::mempool::routing; use crate::proto::admin_service_server::AdminService; use crate::proto::ValidatorMessage; use crate::proto::{self, OnChainEvent}; @@ -6,7 +7,6 @@ use rocksdb; use std::collections::HashMap; use std::{io, path, process}; use thiserror::Error; -use tokio::sync::mpsc; use tonic::{Request, Response, Status}; use tracing::{info, warn}; @@ -62,7 +62,8 @@ impl DbManager { pub struct MyAdminService { db_manager: DbManager, - message_tx: mpsc::Sender, + num_shards: u32, + pub shard_senders: HashMap, } #[derive(Debug, Error)] @@ -77,12 +78,15 @@ pub enum AdminServiceError { const DB_DESTROY_KEY: &[u8] = b"__destroy_all_databases_on_start__"; impl MyAdminService { - pub fn new(db_manager: DbManager, shard_senders: HashMap) -> Self { - // TODO(aditi): This logic will change once a mempool exists - let message_tx = shard_senders.get(&1u32).unwrap().messages_tx.clone(); + pub fn new( + db_manager: DbManager, + shard_senders: HashMap, + num_shards: u32, + ) -> Self { Self { db_manager, - message_tx, + shard_senders, + num_shards, } } } @@ -122,13 +126,32 @@ impl AdminService for MyAdminService { let onchain_event = request.into_inner(); - let result = self - .message_tx + let fid = onchain_event.fid as u32; + if fid == 0 { + return Err(Status::invalid_argument( + "no fid or invalid fid".to_string(), + )); + } + + let dst_shard = routing::route_message(fid, self.num_shards); + + let sender = match self.shard_senders.get(&dst_shard) { + Some(sender) => sender, + None => { + return Err(Status::invalid_argument( + "no shard sender for fid".to_string(), + )) + } + }; + + let result = sender + .messages_tx .send(MempoolMessage::ValidatorMessage(ValidatorMessage { on_chain_event: Some(onchain_event.clone()), fname_transfer: None, })) .await; + match result { Ok(()) => { let response = Response::new(onchain_event); diff --git a/src/network/server.rs b/src/network/server.rs index 00ff7148..034de45e 100644 --- a/src/network/server.rs +++ b/src/network/server.rs @@ -1,6 +1,5 @@ -use std::collections::HashMap; - use crate::core::error::HubError; +use crate::mempool::routing; use crate::proto; use crate::proto::hub_service_server::HubService; use crate::proto::Block; @@ -12,6 +11,7 @@ use crate::storage::store::stores::{StoreLimits, Stores}; use crate::storage::store::BlockStore; use crate::utils::statsd_wrapper::StatsdClientWrapper; use hex::ToHex; +use std::collections::HashMap; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tonic::{Request, Response, Status}; @@ -21,7 +21,7 @@ pub struct MyHubService { block_store: BlockStore, shard_stores: HashMap, shard_senders: HashMap, - message_tx: mpsc::Sender, + num_shards: u32, statsd_client: StatsdClientWrapper, } @@ -31,54 +31,71 @@ impl MyHubService { shard_stores: HashMap, shard_senders: HashMap, statsd_client: StatsdClientWrapper, + num_shards: u32, ) -> Self { - // TODO(aditi): This logic will change once a mempool exists - let message_tx = shard_senders.get(&1u32).unwrap().messages_tx.clone(); - Self { block_store, shard_senders, shard_stores, - message_tx, statsd_client, + num_shards, } } -} -#[tonic::async_trait] -impl HubService for MyHubService { - async fn submit_message( + async fn submit_message_internal( &self, - request: Request, - ) -> Result, Status> { - let start_time = std::time::Instant::now(); + message: proto::Message, + bypass_validation: bool, + ) -> Result { + let fid = message.fid(); + if fid == 0 { + return Err(Status::invalid_argument( + "no fid or invalid fid".to_string(), + )); + } - let hash = request.get_ref().hash.encode_hex::(); - info!(hash, "Received call to [submit_message] RPC"); + let dst_shard = routing::route_message(fid, self.num_shards); - let message = request.into_inner(); + let sender = match self.shard_senders.get(&dst_shard) { + Some(sender) => sender, + None => { + return Err(Status::invalid_argument( + "no shard sender for fid".to_string(), + )) + } + }; - let stores = self.shard_stores.get(&1u32).unwrap(); - // TODO: This is a hack to get around the fact that self cannot be made mutable - let mut readonly_engine = ShardEngine::new( - stores.db.clone(), - stores.trie.clone(), - 1, - StoreLimits::default(), - self.statsd_client.clone(), - 100, - ); - let result = readonly_engine.simulate_message(&message); + let stores = match self.shard_stores.get(&dst_shard) { + Some(sender) => sender, + None => { + return Err(Status::invalid_argument( + "no shard store for fid".to_string(), + )) + } + }; - if let Err(err) = result { - return Err(Status::invalid_argument(format!( - "Invalid message: {}", - err.to_string() - ))); + if !bypass_validation { + // TODO: This is a hack to get around the fact that self cannot be made mutable + let mut readonly_engine = ShardEngine::new( + stores.db.clone(), + stores.trie.clone(), + 1, + StoreLimits::default(), + self.statsd_client.clone(), + 100, + ); + let result = readonly_engine.simulate_message(&message); + + if let Err(err) = result { + return Err(Status::invalid_argument(format!( + "Invalid message: {}", + err.to_string() + ))); + } } - let result = self - .message_tx + let result = sender + .messages_tx .send(MempoolMessage::UserMessage(message.clone())) .await; @@ -94,14 +111,70 @@ impl HubService for MyHubService { } } - let elapsed = start_time.elapsed().as_millis(); + Ok(message) + } +} + +#[tonic::async_trait] +impl HubService for MyHubService { + async fn submit_message_with_options( + &self, + request: Request, + ) -> Result, Status> { + let start_time = std::time::Instant::now(); + + let hash = request + .get_ref() + .message + .as_ref() + .map(|msg| msg.hash.encode_hex::()) + .unwrap_or_default(); + info!(%hash, "Received call to [submit_message_with_options] RPC"); + + let proto::SubmitMessageRequest { + message, + bypass_validation, + } = request.into_inner(); + + let message = match message { + Some(msg) => msg, + None => return Err(Status::invalid_argument("Message is required")), + }; + + let response_message = self + .submit_message_internal(message, bypass_validation.unwrap_or(false)) + .await?; + + let response = proto::SubmitMessageResponse { + message: Some(response_message), + }; + + self.statsd_client.time( + "rpc.submit_message_with_options.duration", + start_time.elapsed().as_millis() as u64, + ); + + Ok(Response::new(response)) + } + + async fn submit_message( + &self, + request: Request, + ) -> Result, Status> { + let start_time = std::time::Instant::now(); + + let hash = request.get_ref().hash.encode_hex::(); + info!(hash, "Received call to [submit_message] RPC"); - let response = Response::new(message); + let message = request.into_inner(); + let response_message = self.submit_message_internal(message, false).await?; - self.statsd_client - .time("rpc.submit_message.duration", elapsed as u64); + self.statsd_client.time( + "rpc.submit_message.duration", + start_time.elapsed().as_millis() as u64, + ); - Ok(response) + Ok(Response::new(response_message)) } type GetBlocksStream = ReceiverStream>; @@ -198,7 +271,7 @@ impl HubService for MyHubService { // TODO(aditi): Rethink the channel size let (server_tx, client_rx) = mpsc::channel::>(100); let events_txs = match request.get_ref().shard_index { - Some(shard_id) => match self.shard_senders.get(&(shard_id as u32)) { + Some(shard_id) => match self.shard_senders.get(&(shard_id)) { None => { return Err(Status::from_error(Box::new( HubError::invalid_internal_state("Missing shard event tx"), diff --git a/src/network/server_tests.rs b/src/network/server_tests.rs index bf7b74d9..edb52386 100644 --- a/src/network/server_tests.rs +++ b/src/network/server_tests.rs @@ -109,6 +109,7 @@ mod tests { let shard2_senders = Senders::new(msgs_tx.clone()); let stores = HashMap::from([(1, shard1_stores), (2, shard2_stores)]); let senders = HashMap::from([(1, shard1_senders), (2, shard2_senders)]); + let num_shards = senders.len() as u32; ( stores.clone(), @@ -118,6 +119,7 @@ mod tests { stores, senders, statsd_client, + num_shards, ), ) } diff --git a/src/perf/perftest.rs b/src/perf/perftest.rs index 3f8fe6e7..95d537ee 100644 --- a/src/perf/perftest.rs +++ b/src/perf/perftest.rs @@ -4,8 +4,8 @@ use crate::perf::{gen_single, generate}; use crate::proto; use crate::proto::hub_service_client::HubServiceClient; use crate::proto::Block; +use crate::utils::cli::follow_blocks; use crate::utils::cli::send_on_chain_event; -use crate::utils::cli::{follow_blocks, send_message}; use crate::{consensus::proposer::current_time, proto::admin_service_client::AdminServiceClient}; use clap::Parser; use figment::{ @@ -113,8 +113,15 @@ fn start_submit_messages( match msg { generate::NextMessage::Message(message) => { - let response = send_message(&mut client, &message).await.unwrap(); - messages_tx.send(response).await.unwrap(); + let response = client + .submit_message_with_options(proto::SubmitMessageRequest { + message: Some(message), + bypass_validation: Some(true), + }) + .await + .unwrap(); + let sent = response.into_inner().message.unwrap(); + messages_tx.send(sent).await.unwrap(); } generate::NextMessage::OnChainEvent(event) => { send_on_chain_event(&mut admin_client, &event) diff --git a/src/proto/rpc.proto b/src/proto/rpc.proto index 5fc90bb3..17d09b26 100644 --- a/src/proto/rpc.proto +++ b/src/proto/rpc.proto @@ -28,8 +28,19 @@ message SubscribeRequest { optional uint32 shard_index = 5; } +message SubmitMessageRequest { + Message message = 1; + optional bool bypass_validation = 99; +} + +message SubmitMessageResponse { + Message message = 1; +} + + service HubService { rpc SubmitMessage(Message) returns (Message); + rpc SubmitMessageWithOptions(SubmitMessageRequest) returns (SubmitMessageResponse); rpc GetBlocks(BlocksRequest) returns (stream Block); rpc GetShardChunks(ShardChunksRequest) returns (ShardChunksResponse); rpc Subscribe(SubscribeRequest) returns (stream HubEvent); diff --git a/tests/consensus_test.rs b/tests/consensus_test.rs index cb121705..be04a1fc 100644 --- a/tests/consensus_test.rs +++ b/tests/consensus_test.rs @@ -116,6 +116,7 @@ impl NodeForTest { grpc_shard_stores, grpc_shard_senders, statsd_client.clone(), + num_shards, ); let grpc_socket_addr: SocketAddr = addr.parse().unwrap();