diff --git a/src/bin/follow_blocks.rs b/src/bin/follow_blocks.rs index 861d9767..45252e36 100644 --- a/src/bin/follow_blocks.rs +++ b/src/bin/follow_blocks.rs @@ -1,5 +1,6 @@ use clap::Parser; use hex; +use snapchain::proto::{message_data, Message, MessageType}; use snapchain::utils::cli::follow_blocks; use tokio::sync::mpsc; @@ -28,10 +29,41 @@ async fn main() { for chunk in &block.shard_chunks { for tx in &chunk.transactions { for msg in &tx.user_messages { - let hash = hex::encode(&msg.hash); - println!(" - {}", hash); + show_msg(msg); } } } } } + +fn show_msg(msg: &Message) { + let hash = hex::encode(&msg.hash); + + let data = match msg.data.as_ref() { + Some(data) => data, + None => { + println!("- {} ** {} **", hash, "no data"); + return; + } + }; + + let mt = match MessageType::try_from(data.r#type) { + Ok(mt) => mt, + Err(_) => { + println!("- {} ** {} **", hash, "invalid message type"); + return; + } + }; + + match mt { + MessageType::CastAdd => { + if let Some(message_data::Body::CastAddBody(body)) = data.body.as_ref() { + println!(" - {} {} {}", hash, msg.fid(), body.text); + } else { + panic!("Body is not CastAddBody"); + } + } + + _ => println!(" - {}", hash), + } +} diff --git a/src/perf/gen_multi.rs b/src/perf/gen_multi.rs new file mode 100644 index 00000000..d872bfde --- /dev/null +++ b/src/perf/gen_multi.rs @@ -0,0 +1,55 @@ +use crate::perf::generate::{MessageGenerator, NextMessage}; +use crate::proto; +use crate::storage::store::test_helper; +use crate::utils::cli; +use crate::utils::factory::events_factory; +use ed25519_dalek::SigningKey; +use rand::Rng; +use std::collections::HashSet; + +pub struct MultiUser { + initialized_fids: HashSet, + private_key: SigningKey, + thread_id: u32, +} + +impl MultiUser { + pub fn new(thread_id: u32) -> Self { + Self { + initialized_fids: HashSet::new(), + private_key: test_helper::default_signer(), + thread_id, + } + } +} + +impl MessageGenerator for MultiUser { + fn next(&mut self, seq: u64) -> Vec { + let mut rng = rand::thread_rng(); + + let fid: u32 = rng.gen_range(1..=5000) + 1_000_000 * self.thread_id; + let mut messages = Vec::new(); + + // If the FID has not been initialized, return initial messages + if !self.initialized_fids.contains(&fid) { + let private_key = self.private_key.clone(); + + messages.push(NextMessage::OnChainEvent(cli::compose_rent_event(fid))); + messages.push(NextMessage::OnChainEvent( + events_factory::create_id_register_event(fid, proto::IdRegisterEventType::Register), + )); + messages.push(NextMessage::OnChainEvent( + events_factory::create_signer_event(fid, private_key, proto::SignerEventType::Add), + )); + + self.initialized_fids.insert(fid); + } + + // Add a benchmarking message + let text = format!("For benchmarking {}", seq); + let msg = cli::compose_message(fid, &text, None, Some(&self.private_key)); + messages.push(NextMessage::Message(msg)); + + messages + } +} diff --git a/src/perf/gen_single.rs b/src/perf/gen_single.rs new file mode 100644 index 00000000..680543a0 --- /dev/null +++ b/src/perf/gen_single.rs @@ -0,0 +1,53 @@ +use crate::perf::generate::{MessageGenerator, NextMessage}; +use crate::proto; +use crate::storage::store::test_helper; +use crate::utils::cli; +use crate::utils::factory::events_factory; +use ed25519_dalek::SigningKey; + +pub struct SingleUser { + private_key: SigningKey, + initialized: bool, + thread_id: u32, +} + +impl SingleUser { + pub fn new(thread_id: u32) -> Self { + Self { + initialized: false, + private_key: test_helper::default_signer(), + thread_id, + } + } +} + +impl MessageGenerator for SingleUser { + fn next(&mut self, seq: u64) -> Vec { + let mut messages = Vec::new(); + + let fid = self.thread_id * 1_000_000; + + if !self.initialized { + self.initialized = true; + messages.push(NextMessage::OnChainEvent(cli::compose_rent_event(fid))); + messages.push(NextMessage::OnChainEvent( + events_factory::create_id_register_event(fid, proto::IdRegisterEventType::Register), + )); + messages.push(NextMessage::OnChainEvent( + events_factory::create_signer_event( + fid, + self.private_key.clone(), + proto::SignerEventType::Add, + ), + )); + } + + let text = format!("For benchmarking {}", seq); + let msg = cli::compose_message(fid, &text, None, Some(&self.private_key)); + messages.push(NextMessage::Message(msg)); + + messages + } +} + +pub fn assert_send() {} diff --git a/src/perf/generate.rs b/src/perf/generate.rs new file mode 100644 index 00000000..980c85a4 --- /dev/null +++ b/src/perf/generate.rs @@ -0,0 +1,25 @@ +use crate::perf::gen_multi::MultiUser; +use crate::perf::gen_single::SingleUser; +use crate::proto; + +pub enum NextMessage { + OnChainEvent(proto::OnChainEvent), + Message(proto::Message), +} + +pub trait MessageGenerator: Send { + fn next(&mut self, seq: u64) -> Vec; +} + +#[derive(Clone)] +pub enum GeneratorTypes { + SingleUser, + MultiUser, +} + +pub fn new_generator(typ: GeneratorTypes, thread_id: u32) -> Box { + match typ { + GeneratorTypes::SingleUser => Box::new(SingleUser::new(thread_id)), + GeneratorTypes::MultiUser => Box::new(MultiUser::new(thread_id)), + } +} diff --git a/src/perf/mod.rs b/src/perf/mod.rs index 8ff28ffa..508aeaa3 100644 --- a/src/perf/mod.rs +++ b/src/perf/mod.rs @@ -1,2 +1,5 @@ pub mod engine_only_perftest; +mod gen_multi; +mod gen_single; +pub mod generate; pub mod perftest; diff --git a/src/perf/perftest.rs b/src/perf/perftest.rs index c4d8973a..3f8fe6e7 100644 --- a/src/perf/perftest.rs +++ b/src/perf/perftest.rs @@ -1,24 +1,20 @@ +use crate::perf::gen_single::SingleUser; +use crate::perf::generate::{new_generator, GeneratorTypes}; +use crate::perf::{gen_single, generate}; use crate::proto; use crate::proto::hub_service_client::HubServiceClient; use crate::proto::Block; -use crate::storage::store::test_helper; use crate::utils::cli::send_on_chain_event; -use crate::utils::cli::{compose_message, follow_blocks, send_message}; -use crate::utils::factory::events_factory; -use crate::{ - consensus::proposer::current_time, proto::admin_service_client::AdminServiceClient, - utils::cli::compose_rent_event, -}; +use crate::utils::cli::{follow_blocks, send_message}; +use crate::{consensus::proposer::current_time, proto::admin_service_client::AdminServiceClient}; use clap::Parser; -use ed25519_dalek::{SecretKey, SigningKey}; use figment::{ providers::{Env, Format, Toml}, Figment, }; use hex; -use hex::FromHex; use serde::{Deserialize, Serialize}; -use std::collections::HashSet; +use std::collections::{HashSet, VecDeque}; use std::error::Error; use std::path::Path; use std::sync::{atomic, Arc}; @@ -75,76 +71,59 @@ pub fn load_and_merge_config(config_path: &str) -> Result fn start_submit_messages( messages_tx: mpsc::Sender, config: Config, + gen_type: GeneratorTypes, ) -> Vec> { + gen_single::assert_send::(); + let mut submit_message_handles = vec![]; - const FID: u32 = 6833; - let i = Arc::new(atomic::AtomicU64::new(1)); + let seq = Arc::new(atomic::AtomicU64::new(0)); - for rpc_addr in config.submit_message.rpc_addrs { + for (index, rpc_addr) in config.submit_message.rpc_addrs.into_iter().enumerate() { + let thread_id = (index + 1) as u32; let messages_tx = messages_tx.clone(); - let private_key = SigningKey::from_bytes( - &SecretKey::from_hex( - "1000000000000000000000000000000000000000000000000000000000000000", - ) - .unwrap(), - ); + let seq = seq.clone(); + let gen_type = gen_type.clone(); - let i = i.clone(); + submit_message_handles.push(tokio::spawn(async move { + let mut generator = new_generator(gen_type, thread_id); - let submit_message_handle = tokio::spawn(async move { let mut submit_message_timer = time::interval(config.submit_message.interval); println!("connecting to {}", &rpc_addr); - let mut client = match HubServiceClient::connect(rpc_addr.clone()).await { - Ok(client) => client, - Err(e) => { - panic!("Error connecting to {}: {}", &rpc_addr, e); - } - }; - - let mut admin_client = match AdminServiceClient::connect(rpc_addr.clone()).await { - Ok(client) => client, - Err(e) => { - panic!("Error connecting to {}: {}", &rpc_addr, e); - } - }; - - let rent_event = compose_rent_event(FID); - send_on_chain_event(&mut admin_client, rent_event) - .await - .unwrap(); - - let id_register_event = - events_factory::create_id_register_event(FID, proto::IdRegisterEventType::Register); - - send_on_chain_event(&mut admin_client, id_register_event) + let mut client = HubServiceClient::connect(rpc_addr.clone()) .await - .unwrap(); + .unwrap_or_else(|e| panic!("Error connecting to {}: {}", &rpc_addr, e)); - let signer_event = events_factory::create_signer_event( - FID, - test_helper::default_signer(), - proto::SignerEventType::Add, - ); - - send_on_chain_event(&mut admin_client, signer_event) + let mut admin_client = AdminServiceClient::connect(rpc_addr.clone()) .await - .unwrap(); + .unwrap_or_else(|e| panic!("Error connecting to {}: {}", &rpc_addr, e)); + let mut message_queue: VecDeque = VecDeque::new(); loop { submit_message_timer.tick().await; + let seq = seq.fetch_add(1, atomic::Ordering::SeqCst); - let current_i = i.fetch_add(1, atomic::Ordering::SeqCst); - let text = format!("For benchmarking {}", current_i); + if message_queue.is_empty() { + let msgs = generator.next(seq); + message_queue.extend(msgs); + } - let msg = compose_message(FID, text.as_str(), None, Some(&private_key)); - let message = send_message(&mut client, &msg).await.unwrap(); - messages_tx.send(message).await.unwrap(); - } - }); + let msg = message_queue.pop_front().expect("message queue was empty"); - submit_message_handles.push(submit_message_handle); + match msg { + generate::NextMessage::Message(message) => { + let response = send_message(&mut client, &message).await.unwrap(); + messages_tx.send(response).await.unwrap(); + } + generate::NextMessage::OnChainEvent(event) => { + send_on_chain_event(&mut admin_client, &event) + .await + .expect("Failed to send on-chain event"); + } + } + } + })); } submit_message_handles @@ -159,6 +138,8 @@ pub async fn run() -> Result<(), Box> { process::exit(1); })); + let gen_type = GeneratorTypes::MultiUser; + let cli_args = CliArgs::parse(); let cfg = load_and_merge_config(&cli_args.config_path)?; @@ -166,7 +147,8 @@ pub async fn run() -> Result<(), Box> { let (blocks_tx, mut blocks_rx) = mpsc::channel::(10_000_000); let (messages_tx, mut messages_rx) = mpsc::channel::(10_000_000); - let submit_message_handles = start_submit_messages(messages_tx, cfg.clone()); + + let submit_message_handles = start_submit_messages(messages_tx, cfg.clone(), gen_type); let follow_blocks_handle = tokio::spawn(async move { if !cfg.follow_blocks.enable { diff --git a/src/utils/cli.rs b/src/utils/cli.rs index ecb351d8..872c7329 100644 --- a/src/utils/cli.rs +++ b/src/utils/cli.rs @@ -27,7 +27,7 @@ pub async fn send_message( pub async fn send_on_chain_event( client: &mut AdminServiceClient, - onchain_event: OnChainEvent, + onchain_event: &OnChainEvent, ) -> Result> { let request = tonic::Request::new(onchain_event.clone()); let response = client.submit_on_chain_event(request).await?;