Skip to content

Commit

Permalink
different message generators for perftest
Browse files Browse the repository at this point in the history
  • Loading branch information
suurkivi committed Dec 6, 2024
1 parent ff99cf1 commit 87a9cd5
Show file tree
Hide file tree
Showing 7 changed files with 215 additions and 65 deletions.
36 changes: 34 additions & 2 deletions src/bin/follow_blocks.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use clap::Parser;
use hex;
use snapchain::proto::{message_data, Message, MessageType};
use snapchain::utils::cli::follow_blocks;
use tokio::sync::mpsc;

Expand Down Expand Up @@ -28,10 +29,41 @@ async fn main() {
for chunk in &block.shard_chunks {
for tx in &chunk.transactions {
for msg in &tx.user_messages {
let hash = hex::encode(&msg.hash);
println!(" - {}", hash);
show_msg(msg);
}
}
}
}
}

fn show_msg(msg: &Message) {
let hash = hex::encode(&msg.hash);

let data = match msg.data.as_ref() {
Some(data) => data,
None => {
println!("- {} ** {} **", hash, "no data");
return;
}
};

let mt = match MessageType::try_from(data.r#type) {
Ok(mt) => mt,
Err(_) => {
println!("- {} ** {} **", hash, "invalid message type");
return;
}
};

match mt {
MessageType::CastAdd => {
if let Some(message_data::Body::CastAddBody(body)) = data.body.as_ref() {
println!(" - {} {} {}", hash, msg.fid(), body.text);
} else {
panic!("Body is not CastAddBody");
}
}

_ => println!(" - {}", hash),
}
}
55 changes: 55 additions & 0 deletions src/perf/gen_multi.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use crate::perf::generate::{MessageGenerator, NextMessage};
use crate::proto;
use crate::storage::store::test_helper;
use crate::utils::cli;
use crate::utils::factory::events_factory;
use ed25519_dalek::SigningKey;
use rand::Rng;
use std::collections::HashSet;

pub struct MultiUser {
initialized_fids: HashSet<u32>,
private_key: SigningKey,
thread_id: u32,
}

impl MultiUser {
pub fn new(thread_id: u32) -> Self {
Self {
initialized_fids: HashSet::new(),
private_key: test_helper::default_signer(),
thread_id,
}
}
}

impl MessageGenerator for MultiUser {
fn next(&mut self, seq: u64) -> Vec<NextMessage> {
let mut rng = rand::thread_rng();

let fid: u32 = rng.gen_range(1..=5000) + 1_000_000 * self.thread_id;
let mut messages = Vec::new();

// If the FID has not been initialized, return initial messages
if !self.initialized_fids.contains(&fid) {
let private_key = self.private_key.clone();

messages.push(NextMessage::OnChainEvent(cli::compose_rent_event(fid)));
messages.push(NextMessage::OnChainEvent(
events_factory::create_id_register_event(fid, proto::IdRegisterEventType::Register),
));
messages.push(NextMessage::OnChainEvent(
events_factory::create_signer_event(fid, private_key, proto::SignerEventType::Add),
));

self.initialized_fids.insert(fid);
}

// Add a benchmarking message
let text = format!("For benchmarking {}", seq);
let msg = cli::compose_message(fid, &text, None, Some(&self.private_key));
messages.push(NextMessage::Message(msg));

messages
}
}
53 changes: 53 additions & 0 deletions src/perf/gen_single.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use crate::perf::generate::{MessageGenerator, NextMessage};
use crate::proto;
use crate::storage::store::test_helper;
use crate::utils::cli;
use crate::utils::factory::events_factory;
use ed25519_dalek::SigningKey;

pub struct SingleUser {
private_key: SigningKey,
initialized: bool,
thread_id: u32,
}

impl SingleUser {
pub fn new(thread_id: u32) -> Self {
Self {
initialized: false,
private_key: test_helper::default_signer(),
thread_id,
}
}
}

impl MessageGenerator for SingleUser {
fn next(&mut self, seq: u64) -> Vec<NextMessage> {
let mut messages = Vec::new();

let fid = self.thread_id * 1_000_000;

if !self.initialized {
self.initialized = true;
messages.push(NextMessage::OnChainEvent(cli::compose_rent_event(fid)));
messages.push(NextMessage::OnChainEvent(
events_factory::create_id_register_event(fid, proto::IdRegisterEventType::Register),
));
messages.push(NextMessage::OnChainEvent(
events_factory::create_signer_event(
fid,
self.private_key.clone(),
proto::SignerEventType::Add,
),
));
}

let text = format!("For benchmarking {}", seq);
let msg = cli::compose_message(fid, &text, None, Some(&self.private_key));
messages.push(NextMessage::Message(msg));

messages
}
}

pub fn assert_send<T: Send>() {}
25 changes: 25 additions & 0 deletions src/perf/generate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use crate::perf::gen_multi::MultiUser;
use crate::perf::gen_single::SingleUser;
use crate::proto;

pub enum NextMessage {
OnChainEvent(proto::OnChainEvent),
Message(proto::Message),
}

pub trait MessageGenerator: Send {
fn next(&mut self, seq: u64) -> Vec<NextMessage>;
}

#[derive(Clone)]
pub enum GeneratorTypes {
SingleUser,
MultiUser,
}

pub fn new_generator(typ: GeneratorTypes, thread_id: u32) -> Box<dyn MessageGenerator> {
match typ {
GeneratorTypes::SingleUser => Box::new(SingleUser::new(thread_id)),
GeneratorTypes::MultiUser => Box::new(MultiUser::new(thread_id)),
}
}
3 changes: 3 additions & 0 deletions src/perf/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
pub mod engine_only_perftest;
mod gen_multi;
mod gen_single;
pub mod generate;
pub mod perftest;
106 changes: 44 additions & 62 deletions src/perf/perftest.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,20 @@
use crate::perf::gen_single::SingleUser;
use crate::perf::generate::{new_generator, GeneratorTypes};
use crate::perf::{gen_single, generate};
use crate::proto;
use crate::proto::hub_service_client::HubServiceClient;
use crate::proto::Block;
use crate::storage::store::test_helper;
use crate::utils::cli::send_on_chain_event;
use crate::utils::cli::{compose_message, follow_blocks, send_message};
use crate::utils::factory::events_factory;
use crate::{
consensus::proposer::current_time, proto::admin_service_client::AdminServiceClient,
utils::cli::compose_rent_event,
};
use crate::utils::cli::{follow_blocks, send_message};
use crate::{consensus::proposer::current_time, proto::admin_service_client::AdminServiceClient};
use clap::Parser;
use ed25519_dalek::{SecretKey, SigningKey};
use figment::{
providers::{Env, Format, Toml},
Figment,
};
use hex;
use hex::FromHex;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::collections::{HashSet, VecDeque};
use std::error::Error;
use std::path::Path;
use std::sync::{atomic, Arc};
Expand Down Expand Up @@ -75,76 +71,59 @@ pub fn load_and_merge_config(config_path: &str) -> Result<Config, Box<dyn Error>
fn start_submit_messages(
messages_tx: mpsc::Sender<proto::Message>,
config: Config,
gen_type: GeneratorTypes,
) -> Vec<tokio::task::JoinHandle<()>> {
gen_single::assert_send::<SingleUser>();

let mut submit_message_handles = vec![];
const FID: u32 = 6833;

let i = Arc::new(atomic::AtomicU64::new(1));
let seq = Arc::new(atomic::AtomicU64::new(0));

for rpc_addr in config.submit_message.rpc_addrs {
for (index, rpc_addr) in config.submit_message.rpc_addrs.into_iter().enumerate() {
let thread_id = (index + 1) as u32;
let messages_tx = messages_tx.clone();
let private_key = SigningKey::from_bytes(
&SecretKey::from_hex(
"1000000000000000000000000000000000000000000000000000000000000000",
)
.unwrap(),
);
let seq = seq.clone();
let gen_type = gen_type.clone();

let i = i.clone();
submit_message_handles.push(tokio::spawn(async move {
let mut generator = new_generator(gen_type, thread_id);

let submit_message_handle = tokio::spawn(async move {
let mut submit_message_timer = time::interval(config.submit_message.interval);

println!("connecting to {}", &rpc_addr);
let mut client = match HubServiceClient::connect(rpc_addr.clone()).await {
Ok(client) => client,
Err(e) => {
panic!("Error connecting to {}: {}", &rpc_addr, e);
}
};

let mut admin_client = match AdminServiceClient::connect(rpc_addr.clone()).await {
Ok(client) => client,
Err(e) => {
panic!("Error connecting to {}: {}", &rpc_addr, e);
}
};

let rent_event = compose_rent_event(FID);
send_on_chain_event(&mut admin_client, rent_event)
.await
.unwrap();

let id_register_event =
events_factory::create_id_register_event(FID, proto::IdRegisterEventType::Register);

send_on_chain_event(&mut admin_client, id_register_event)
let mut client = HubServiceClient::connect(rpc_addr.clone())
.await
.unwrap();
.unwrap_or_else(|e| panic!("Error connecting to {}: {}", &rpc_addr, e));

let signer_event = events_factory::create_signer_event(
FID,
test_helper::default_signer(),
proto::SignerEventType::Add,
);

send_on_chain_event(&mut admin_client, signer_event)
let mut admin_client = AdminServiceClient::connect(rpc_addr.clone())
.await
.unwrap();
.unwrap_or_else(|e| panic!("Error connecting to {}: {}", &rpc_addr, e));

let mut message_queue: VecDeque<generate::NextMessage> = VecDeque::new();
loop {
submit_message_timer.tick().await;
let seq = seq.fetch_add(1, atomic::Ordering::SeqCst);

let current_i = i.fetch_add(1, atomic::Ordering::SeqCst);
let text = format!("For benchmarking {}", current_i);
if message_queue.is_empty() {
let msgs = generator.next(seq);
message_queue.extend(msgs);
}

let msg = compose_message(FID, text.as_str(), None, Some(&private_key));
let message = send_message(&mut client, &msg).await.unwrap();
messages_tx.send(message).await.unwrap();
}
});
let msg = message_queue.pop_front().expect("message queue was empty");

submit_message_handles.push(submit_message_handle);
match msg {
generate::NextMessage::Message(message) => {
let response = send_message(&mut client, &message).await.unwrap();
messages_tx.send(response).await.unwrap();
}
generate::NextMessage::OnChainEvent(event) => {
send_on_chain_event(&mut admin_client, &event)
.await
.expect("Failed to send on-chain event");
}
}
}
}));
}

submit_message_handles
Expand All @@ -159,14 +138,17 @@ pub async fn run() -> Result<(), Box<dyn Error>> {
process::exit(1);
}));

let gen_type = GeneratorTypes::MultiUser;

let cli_args = CliArgs::parse();
let cfg = load_and_merge_config(&cli_args.config_path)?;

println!("Starting scenario {:#?}", cfg);
let (blocks_tx, mut blocks_rx) = mpsc::channel::<Block>(10_000_000);

let (messages_tx, mut messages_rx) = mpsc::channel::<proto::Message>(10_000_000);
let submit_message_handles = start_submit_messages(messages_tx, cfg.clone());

let submit_message_handles = start_submit_messages(messages_tx, cfg.clone(), gen_type);

let follow_blocks_handle = tokio::spawn(async move {
if !cfg.follow_blocks.enable {
Expand Down
2 changes: 1 addition & 1 deletion src/utils/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub async fn send_message(

pub async fn send_on_chain_event(
client: &mut AdminServiceClient<Channel>,
onchain_event: OnChainEvent,
onchain_event: &OnChainEvent,
) -> Result<OnChainEvent, Box<dyn Error>> {
let request = tonic::Request::new(onchain_event.clone());
let response = client.submit_on_chain_event(request).await?;
Expand Down

0 comments on commit 87a9cd5

Please sign in to comment.