Skip to content

Commit

Permalink
submit_message: apply backpressure (#143)
Browse files Browse the repository at this point in the history
  • Loading branch information
suurkivi authored Dec 9, 2024
1 parent 92d5cd4 commit 01a9ca3
Show file tree
Hide file tree
Showing 12 changed files with 62 additions and 18 deletions.
2 changes: 1 addition & 1 deletion config/perftest-sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ rpc_addrs = [
"http://127.0.0.12:3383",
"http://127.0.0.13:3383"
]
interval = "100ms"
interval = "2500us"

[follow_blocks]
rpc_addr = "http://127.0.0.11:3383"
Expand Down
3 changes: 3 additions & 0 deletions config/sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,6 @@ url = "https://fnames.farcaster.xyz/transfers"
[consensus]
num_shards = 1
shard_ids = [1]

[mempool]
queue_size = 500
4 changes: 3 additions & 1 deletion src/cfg.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{connectors, consensus, network};
use crate::{connectors, consensus, mempool, network};
use clap::Parser;
use figment::{
providers::{Env, Format, Serialized, Toml},
Expand Down Expand Up @@ -32,6 +32,7 @@ pub struct Config {
pub onchain_events: connectors::onchain_events::Config,
pub consensus: consensus::consensus::Config,
pub gossip: network::gossip::Config,
pub mempool: mempool::mempool::Config,
pub rpc_address: String,
pub rocksdb_dir: String,
pub clear_db: bool,
Expand All @@ -47,6 +48,7 @@ impl Default for Config {
onchain_events: connectors::onchain_events::Config::default(),
consensus: consensus::consensus::Config::default(),
gossip: network::gossip::Config::default(),
mempool: mempool::mempool::Config::default(),
rpc_address: "0.0.0.0:3383".to_string(),
rocksdb_dir: ".rocks".to_string(),
clear_db: false,
Expand Down
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
let node = SnapchainNode::create(
keypair.clone(),
app_config.consensus.clone(),
app_config.mempool.clone(),
Some(app_config.rpc_address.clone()),
gossip_tx.clone(),
None,
Expand Down
12 changes: 12 additions & 0 deletions src/mempool/mempool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Config {
pub queue_size: u32,
}

impl Default for Config {
fn default() -> Self {
Self { queue_size: 500 }
}
}
1 change: 1 addition & 0 deletions src/mempool/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod mempool;
pub mod routing;
16 changes: 10 additions & 6 deletions src/network/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl MyHubService {
};

let stores = match self.shard_stores.get(&dst_shard) {
Some(sender) => sender,
Some(store) => store,
None => {
return Err(Status::invalid_argument(
"no shard store for fid".to_string(),
Expand All @@ -83,6 +83,7 @@ impl MyHubService {
StoreLimits::default(),
self.statsd_client.clone(),
100,
200,
);
let result = readonly_engine.simulate_message(&message);

Expand All @@ -94,16 +95,19 @@ impl MyHubService {
}
}

let result = sender
match sender
.messages_tx
.send(MempoolMessage::UserMessage(message.clone()))
.await;

match result {
.try_send(MempoolMessage::UserMessage(message.clone()))
{
Ok(_) => {
self.statsd_client.count("rpc.submit_message.success", 1);
info!("successfully submitted message");
}
Err(mpsc::error::TrySendError::Full(_)) => {
self.statsd_client
.count("rpc.submit_message.channel_full", 1);
return Err(Status::resource_exhausted("channel is full"));
}
Err(e) => {
self.statsd_client.count("rpc.submit_message.failure", 1);
info!("error sending: {:?}", e.to_string());
Expand Down
3 changes: 3 additions & 0 deletions src/node/snapchain_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::core::types::{
Address, Height, ShardId, SnapchainShard, SnapchainValidator, SnapchainValidatorContext,
SnapchainValidatorSet,
};
use crate::mempool::mempool;
use crate::network::gossip::GossipEvent;
use crate::proto::{Block, ShardChunk};
use crate::storage::db::RocksDB;
Expand Down Expand Up @@ -36,6 +37,7 @@ impl SnapchainNode {
pub async fn create(
keypair: Keypair,
config: Config,
mempool_config: mempool::Config,
rpc_address: Option<String>,
gossip_tx: mpsc::Sender<GossipEvent<SnapchainValidatorContext>>,
block_tx: Option<mpsc::Sender<Block>>,
Expand Down Expand Up @@ -92,6 +94,7 @@ impl SnapchainNode {
StoreLimits::default(),
statsd_client.clone(),
config.max_messages_per_block,
mempool_config.queue_size,
);

shard_senders.insert(shard_id, engine.get_senders());
Expand Down
31 changes: 22 additions & 9 deletions src/perf/perftest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,24 +109,37 @@ fn start_submit_messages(
message_queue.extend(msgs);
}

let msg = message_queue.pop_front().expect("message queue was empty");
let msg = message_queue.front().expect("message queue was empty");

match msg {
generate::NextMessage::Message(message) => {
let response = client
.submit_message_with_options(proto::SubmitMessageRequest {
message: Some(message),
message: Some(message.clone()),
bypass_validation: Some(true),
})
.await
.unwrap();
let sent = response.into_inner().message.unwrap();
messages_tx.send(sent).await.unwrap();
.await;

match response {
Ok(resp) => {
let sent = resp.into_inner().message.unwrap();
messages_tx.send(sent).await.unwrap();
message_queue.pop_front(); // Remove message only if successfully sent
}
Err(status) if status.code() == tonic::Code::ResourceExhausted => {
// TODO: emit metrics
}
Err(e) => {
panic!("Unexpected error: {:?}", e); // Handle other errors as needed
}
}
}
generate::NextMessage::OnChainEvent(event) => {
send_on_chain_event(&mut admin_client, &event)
.await
.expect("Failed to send on-chain event");
if let Err(e) = send_on_chain_event(&mut admin_client, &event).await {
panic!("Failed to send on-chain event: {:?}", e);
} else {
message_queue.pop_front(); // Remove event if successfully sent
}
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/storage/store/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,11 @@ impl ShardEngine {
store_limits: StoreLimits,
statsd_client: StatsdClientWrapper,
max_messages_per_block: u32,
mempool_queue_size: u32,
) -> ShardEngine {
// TODO: adding the trie here introduces many calls that want to return errors. Rethink unwrap strategy.
let (messages_tx, messages_rx) = mpsc::channel::<MempoolMessage>(10_000);
let (messages_tx, messages_rx) =
mpsc::channel::<MempoolMessage>(mempool_queue_size as usize);
ShardEngine {
shard_id,
stores: Stores::new(db.clone(), trie, store_limits),
Expand Down
1 change: 1 addition & 0 deletions src/storage/store/test_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ pub fn new_engine_with_options(options: EngineOptions) -> (ShardEngine, tempfile
test_limits,
statsd_client,
256,
256 * 2,
),
dir,
)
Expand Down
2 changes: 2 additions & 0 deletions tests/consensus_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::sync::Arc;

use hex;
use libp2p::identity::ed25519::Keypair;
use snapchain::mempool::mempool;
use snapchain::network::server::MyHubService;
use snapchain::node::snapchain_node::SnapchainNode;
use snapchain::proto::hub_service_server::HubServiceServer;
Expand Down Expand Up @@ -71,6 +72,7 @@ impl NodeForTest {
let node = SnapchainNode::create(
keypair.clone(),
config,
mempool::Config::default(),
None,
gossip_tx,
Some(block_tx),
Expand Down

0 comments on commit 01a9ca3

Please sign in to comment.