Skip to content

Commit

Permalink
shard routing (#140)
Browse files Browse the repository at this point in the history
  • Loading branch information
suurkivi authored Dec 7, 2024
1 parent 71a04c0 commit af9e7ae
Show file tree
Hide file tree
Showing 12 changed files with 195 additions and 57 deletions.
5 changes: 4 additions & 1 deletion config/sample.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
log_format = "json"

[statsd]
prefix = "snaptest"

[fnames]
disable = false
stop_at = 9999999999
stop_at = 200
start_from = 0
url = "https://fnames.farcaster.xyz/transfers"

Expand Down
10 changes: 7 additions & 3 deletions src/bin/follow_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,20 @@ async fn main() {
println!("Block {}", block_number);

for chunk in &block.shard_chunks {
let height = chunk.header.as_ref().unwrap().height.unwrap();
let shard_id = height.shard_index;
let shard_height = height.block_number;

for tx in &chunk.transactions {
for msg in &tx.user_messages {
show_msg(msg);
show_msg(shard_id, shard_height, msg);
}
}
}
}
}

fn show_msg(msg: &Message) {
fn show_msg(shard_id: u32, _shard_height: u64, msg: &Message) {
let hash = hex::encode(&msg.hash);

let data = match msg.data.as_ref() {
Expand All @@ -58,7 +62,7 @@ fn show_msg(msg: &Message) {
match mt {
MessageType::CastAdd => {
if let Some(message_data::Body::CastAddBody(body)) = data.body.as_ref() {
println!(" - {} {} {}", hash, msg.fid(), body.text);
println!(" - {} {} {} {}", hash, shard_id, msg.fid(), body.text);
} else {
panic!("Body is not CastAddBody");
}
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod cfg;
pub mod connectors;
pub mod consensus;
pub mod core;
pub mod mempool;
pub mod network;
pub mod node;
pub mod perf;
Expand Down
7 changes: 6 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,11 @@ async fn main() -> Result<(), Box<dyn Error>> {
)
.await;

let admin_service = MyAdminService::new(db_manager, node.shard_senders.clone());
let admin_service = MyAdminService::new(
db_manager,
node.shard_senders.clone(),
app_config.consensus.num_shards,
);

let rpc_shard_stores = node.shard_stores.clone();
let rpc_shard_senders = node.shard_senders.clone();
Expand All @@ -166,6 +170,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
rpc_shard_stores,
rpc_shard_senders,
statsd_client.clone(),
app_config.consensus.num_shards,
);

let resp = Server::builder()
Expand Down
1 change: 1 addition & 0 deletions src/mempool/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod routing;
7 changes: 7 additions & 0 deletions src/mempool/routing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
use sha2::{Digest, Sha256};

pub fn route_message(fid: u32, num_shards: u32) -> u32 {
let hash = Sha256::digest(fid.to_be_bytes());
let hash_u32 = u32::from_be_bytes(hash[..4].try_into().unwrap());
(hash_u32 % num_shards) + 1
}
39 changes: 31 additions & 8 deletions src/network/admin_server.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::mempool::routing;
use crate::proto::admin_service_server::AdminService;
use crate::proto::ValidatorMessage;
use crate::proto::{self, OnChainEvent};
Expand All @@ -6,7 +7,6 @@ use rocksdb;
use std::collections::HashMap;
use std::{io, path, process};
use thiserror::Error;
use tokio::sync::mpsc;
use tonic::{Request, Response, Status};
use tracing::{info, warn};

Expand Down Expand Up @@ -62,7 +62,8 @@ impl DbManager {

pub struct MyAdminService {
db_manager: DbManager,
message_tx: mpsc::Sender<MempoolMessage>,
num_shards: u32,
pub shard_senders: HashMap<u32, Senders>,
}

#[derive(Debug, Error)]
Expand All @@ -77,12 +78,15 @@ pub enum AdminServiceError {
const DB_DESTROY_KEY: &[u8] = b"__destroy_all_databases_on_start__";

impl MyAdminService {
pub fn new(db_manager: DbManager, shard_senders: HashMap<u32, Senders>) -> Self {
// TODO(aditi): This logic will change once a mempool exists
let message_tx = shard_senders.get(&1u32).unwrap().messages_tx.clone();
pub fn new(
db_manager: DbManager,
shard_senders: HashMap<u32, Senders>,
num_shards: u32,
) -> Self {
Self {
db_manager,
message_tx,
shard_senders,
num_shards,
}
}
}
Expand Down Expand Up @@ -122,13 +126,32 @@ impl AdminService for MyAdminService {

let onchain_event = request.into_inner();

let result = self
.message_tx
let fid = onchain_event.fid as u32;
if fid == 0 {
return Err(Status::invalid_argument(
"no fid or invalid fid".to_string(),
));
}

let dst_shard = routing::route_message(fid, self.num_shards);

let sender = match self.shard_senders.get(&dst_shard) {
Some(sender) => sender,
None => {
return Err(Status::invalid_argument(
"no shard sender for fid".to_string(),
))
}
};

let result = sender
.messages_tx
.send(MempoolMessage::ValidatorMessage(ValidatorMessage {
on_chain_event: Some(onchain_event.clone()),
fname_transfer: None,
}))
.await;

match result {
Ok(()) => {
let response = Response::new(onchain_event);
Expand Down
155 changes: 114 additions & 41 deletions src/network/server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::collections::HashMap;

use crate::core::error::HubError;
use crate::mempool::routing;
use crate::proto;
use crate::proto::hub_service_server::HubService;
use crate::proto::Block;
Expand All @@ -12,6 +11,7 @@ use crate::storage::store::stores::{StoreLimits, Stores};
use crate::storage::store::BlockStore;
use crate::utils::statsd_wrapper::StatsdClientWrapper;
use hex::ToHex;
use std::collections::HashMap;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status};
Expand All @@ -21,7 +21,7 @@ pub struct MyHubService {
block_store: BlockStore,
shard_stores: HashMap<u32, Stores>,
shard_senders: HashMap<u32, Senders>,
message_tx: mpsc::Sender<MempoolMessage>,
num_shards: u32,
statsd_client: StatsdClientWrapper,
}

Expand All @@ -31,54 +31,71 @@ impl MyHubService {
shard_stores: HashMap<u32, Stores>,
shard_senders: HashMap<u32, Senders>,
statsd_client: StatsdClientWrapper,
num_shards: u32,
) -> Self {
// TODO(aditi): This logic will change once a mempool exists
let message_tx = shard_senders.get(&1u32).unwrap().messages_tx.clone();

Self {
block_store,
shard_senders,
shard_stores,
message_tx,
statsd_client,
num_shards,
}
}
}

#[tonic::async_trait]
impl HubService for MyHubService {
async fn submit_message(
async fn submit_message_internal(
&self,
request: Request<proto::Message>,
) -> Result<Response<proto::Message>, Status> {
let start_time = std::time::Instant::now();
message: proto::Message,
bypass_validation: bool,
) -> Result<proto::Message, Status> {
let fid = message.fid();
if fid == 0 {
return Err(Status::invalid_argument(
"no fid or invalid fid".to_string(),
));
}

let hash = request.get_ref().hash.encode_hex::<String>();
info!(hash, "Received call to [submit_message] RPC");
let dst_shard = routing::route_message(fid, self.num_shards);

let message = request.into_inner();
let sender = match self.shard_senders.get(&dst_shard) {
Some(sender) => sender,
None => {
return Err(Status::invalid_argument(
"no shard sender for fid".to_string(),
))
}
};

let stores = self.shard_stores.get(&1u32).unwrap();
// TODO: This is a hack to get around the fact that self cannot be made mutable
let mut readonly_engine = ShardEngine::new(
stores.db.clone(),
stores.trie.clone(),
1,
StoreLimits::default(),
self.statsd_client.clone(),
100,
);
let result = readonly_engine.simulate_message(&message);
let stores = match self.shard_stores.get(&dst_shard) {
Some(sender) => sender,
None => {
return Err(Status::invalid_argument(
"no shard store for fid".to_string(),
))
}
};

if let Err(err) = result {
return Err(Status::invalid_argument(format!(
"Invalid message: {}",
err.to_string()
)));
if !bypass_validation {
// TODO: This is a hack to get around the fact that self cannot be made mutable
let mut readonly_engine = ShardEngine::new(
stores.db.clone(),
stores.trie.clone(),
1,
StoreLimits::default(),
self.statsd_client.clone(),
100,
);
let result = readonly_engine.simulate_message(&message);

if let Err(err) = result {
return Err(Status::invalid_argument(format!(
"Invalid message: {}",
err.to_string()
)));
}
}

let result = self
.message_tx
let result = sender
.messages_tx
.send(MempoolMessage::UserMessage(message.clone()))
.await;

Expand All @@ -94,14 +111,70 @@ impl HubService for MyHubService {
}
}

let elapsed = start_time.elapsed().as_millis();
Ok(message)
}
}

#[tonic::async_trait]
impl HubService for MyHubService {
async fn submit_message_with_options(
&self,
request: Request<proto::SubmitMessageRequest>,
) -> Result<Response<proto::SubmitMessageResponse>, Status> {
let start_time = std::time::Instant::now();

let hash = request
.get_ref()
.message
.as_ref()
.map(|msg| msg.hash.encode_hex::<String>())
.unwrap_or_default();
info!(%hash, "Received call to [submit_message_with_options] RPC");

let proto::SubmitMessageRequest {
message,
bypass_validation,
} = request.into_inner();

let message = match message {
Some(msg) => msg,
None => return Err(Status::invalid_argument("Message is required")),
};

let response_message = self
.submit_message_internal(message, bypass_validation.unwrap_or(false))
.await?;

let response = proto::SubmitMessageResponse {
message: Some(response_message),
};

self.statsd_client.time(
"rpc.submit_message_with_options.duration",
start_time.elapsed().as_millis() as u64,
);

Ok(Response::new(response))
}

async fn submit_message(
&self,
request: Request<proto::Message>,
) -> Result<Response<proto::Message>, Status> {
let start_time = std::time::Instant::now();

let hash = request.get_ref().hash.encode_hex::<String>();
info!(hash, "Received call to [submit_message] RPC");

let response = Response::new(message);
let message = request.into_inner();
let response_message = self.submit_message_internal(message, false).await?;

self.statsd_client
.time("rpc.submit_message.duration", elapsed as u64);
self.statsd_client.time(
"rpc.submit_message.duration",
start_time.elapsed().as_millis() as u64,
);

Ok(response)
Ok(Response::new(response_message))
}

type GetBlocksStream = ReceiverStream<Result<Block, Status>>;
Expand Down Expand Up @@ -198,7 +271,7 @@ impl HubService for MyHubService {
// TODO(aditi): Rethink the channel size
let (server_tx, client_rx) = mpsc::channel::<Result<HubEvent, Status>>(100);
let events_txs = match request.get_ref().shard_index {
Some(shard_id) => match self.shard_senders.get(&(shard_id as u32)) {
Some(shard_id) => match self.shard_senders.get(&(shard_id)) {
None => {
return Err(Status::from_error(Box::new(
HubError::invalid_internal_state("Missing shard event tx"),
Expand Down
2 changes: 2 additions & 0 deletions src/network/server_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ mod tests {
let shard2_senders = Senders::new(msgs_tx.clone());
let stores = HashMap::from([(1, shard1_stores), (2, shard2_stores)]);
let senders = HashMap::from([(1, shard1_senders), (2, shard2_senders)]);
let num_shards = senders.len() as u32;

(
stores.clone(),
Expand All @@ -118,6 +119,7 @@ mod tests {
stores,
senders,
statsd_client,
num_shards,
),
)
}
Expand Down
Loading

0 comments on commit af9e7ae

Please sign in to comment.