Skip to content

Commit

Permalink
add p2p
Browse files Browse the repository at this point in the history
  • Loading branch information
sergerad committed Nov 12, 2024
1 parent e755625 commit 3369ef0
Show file tree
Hide file tree
Showing 8 changed files with 202 additions and 15 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[workspace]

resolver = "2"
members = ["rollup", "script", "sequencer", "p2p"]
members = ["rollup", "script", "sequencer", "p2p", "rpc"]

[workspace.dependencies]
alloy-primitives = { version = "0.8.0", features = ["rand", "serde"] }
Expand Down
55 changes: 48 additions & 7 deletions p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use std::collections::hash_map::DefaultHasher;
use std::error::Error;
use std::hash::{Hash, Hasher};
use std::time::Duration;
use tokio::{io, select};
use tokio::sync::mpsc::Receiver;
use tokio::{io, select, task};

pub use gossipsub::Message as GossipMessage;

Expand Down Expand Up @@ -72,6 +73,50 @@ impl Network {
Ok(Self { swarm })
}

pub fn start(mut outbound: Receiver<(Vec<u8>, String)>) -> Receiver<GossipMessage> {
let (tx, rx) = tokio::sync::mpsc::channel(32);
let mut network = Network::new().unwrap();
task::spawn(async move {
loop {
select! {
Some((data, topic)) = outbound.recv() => {
let topic = gossipsub::IdentTopic::new(topic);
if let Err(e) = network.swarm.behaviour_mut().gossipsub.publish(topic, data) {
println!("Failed to publish message: {e}");
}
},
event = network.swarm.select_next_some() => match event {
SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(mdns::Event::Discovered(list))) => {
for (peer_id, _multiaddr) in list {
println!("mDNS discovered a new peer: {peer_id}");
network.swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id);
}
},
SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(mdns::Event::Expired(list))) => {
for (peer_id, _multiaddr) in list {
println!("mDNS discover peer has expired: {peer_id}");
network.swarm.behaviour_mut().gossipsub.remove_explicit_peer(&peer_id);
}
},
SwarmEvent::Behaviour(MyBehaviourEvent::Gossipsub(gossipsub::Event::Message {
propagation_source: _peer_id,
message_id: _id,
message,
})) => {
tx.send(message).await.unwrap();
},
SwarmEvent::NewListenAddr { address, .. } => {
println!("Local node is listening on {address}");
}
_ => {
}
}
}
}
});
rx
}

pub async fn poll(&mut self) -> Result<Option<GossipMessage>, Box<dyn Error>> {
select! {
event = self.swarm.select_next_some() => match event {
Expand All @@ -90,14 +135,10 @@ impl Network {
Ok(None)
},
SwarmEvent::Behaviour(MyBehaviourEvent::Gossipsub(gossipsub::Event::Message {
propagation_source: peer_id,
message_id: id,
propagation_source: _peer_id,
message_id: _id,
message,
})) => {
println!(
"poll Got message: '{}' with id: {id} from peer: {peer_id}",
String::from_utf8_lossy(&message.data),
);
Ok(Some(message))
},
SwarmEvent::NewListenAddr { address, .. } => {
Expand Down
49 changes: 45 additions & 4 deletions rollup/src/sequencer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
use std::sync::Arc;

use log::info;
use tokio::sync::Mutex;
use p2p::GossipMessage;
use serde_json::json;
use tokio::{
sync::{
mpsc::{Receiver, Sender},
Mutex,
},
task,
};

use crate::{
Block, BlockHeader, Blockchain, SignedBlockHeader, SignedTransaction, Signer, Transaction,
Expand All @@ -10,16 +18,30 @@ use crate::{

pub struct TransactionSubmitter {
transactions_pool: Arc<Mutex<Vec<SignedTransaction>>>,
outbound: Sender<(Vec<u8>, String)>,
}

impl TransactionSubmitter {
pub fn new(transactions_pool: Arc<Mutex<Vec<SignedTransaction>>>) -> Self {
TransactionSubmitter { transactions_pool }
pub fn new(
transactions_pool: Arc<Mutex<Vec<SignedTransaction>>>,
outbound: Sender<(Vec<u8>, String)>,
) -> Self {
TransactionSubmitter {
transactions_pool,
outbound,
}
}

pub async fn submit(&self, transaction: SignedTransaction) {
let transactions_pool = self.transactions_pool.clone();
transactions_pool.lock().await.push(transaction);
transactions_pool.lock().await.push(transaction.clone());
self.outbound
.send((
json!(transaction).to_string().as_bytes().to_vec(),
"transactions".to_string(),
))
.await
.unwrap();
}
}

Expand All @@ -44,7 +66,26 @@ impl Sequencer {
signer: impl Into<Signer>,
transactions_pool: Arc<Mutex<Vec<SignedTransaction>>>,
blockchain: Arc<Mutex<Blockchain>>,
mut inbound: Receiver<GossipMessage>,
) -> Self {
let tx_pool = transactions_pool.clone();
task::spawn(async move {
loop {
if let Some(msg) = inbound.recv().await {
match msg.topic.as_str() {
"transactions" => {
let transaction: SignedTransaction =
serde_json::from_slice(&msg.data).unwrap();
tx_pool.lock().await.push(transaction);
}
"block" => {
unimplemented!("Handle incoming blocks");
}
_ => {}
}
}
}
});
Sequencer {
signer: signer.into(),
transactions_pool,
Expand Down
16 changes: 16 additions & 0 deletions rpc/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[package]
name = "rpc"
version = "0.1.0"
edition = "2021"

[dependencies]
bincode = "1.3"
clap = { version = "4.0", features = ["env"] }
rocket = { version = "0.5", features = ["json"] }
rollup = { package = "rollup", path = "../rollup", version = "0.1.0" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sha2 = "0.10"
env_logger = { workspace = true }
tokio = { version = "1", features = ["full"] }
p2p = { package = "p2p", path = "../p2p", version = "0.1.0" }
67 changes: 67 additions & 0 deletions rpc/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#[macro_use]
extern crate rocket;

use std::sync::Arc;

use rocket::State;
use rocket::{serde::json::Json, Config};
use rollup::{Blockchain, SignedTransaction, TransactionSubmitter};
use serde_json::{json, Value};
use tokio::sync::Mutex;

/// Accepts a transaction and adds it to the respective transaction pools.
#[post("/", data = "<payload>")]
async fn submit(
submitter: &State<TransactionSubmitter>,
payload: Json<SignedTransaction>,
) -> Value {
// Extract the transaction from the payload.
let transaction = payload.into_inner();
let tx_digest = transaction.transaction.hash();

// Add the transaction to the pool.
submitter.submit(transaction).await;

// Respond with the transaction digest.
json!({ "tx_digest": tx_digest.to_string() })
}

/// Returns the head block of the blockchain.
#[get("/")]
async fn head(chain: &State<Arc<Mutex<Blockchain>>>) -> Value {
// Retrieve the head block from the sequencer and return it.
let head = chain.lock().await.head();
json!(head)
}

#[launch]
#[tokio::main]
async fn rocket() -> _ {
env_logger::init();
// Set up sequencer.
let pool = Arc::new(tokio::sync::Mutex::new(vec![]));
let chain = Arc::new(tokio::sync::Mutex::new(Blockchain::default()));
let (tx_out, rx_out) = tokio::sync::mpsc::channel::<(Vec<u8>, String)>(32);
let mut rx_in = p2p::Network::start(rx_out);
let submitter = TransactionSubmitter::new(pool, tx_out);

// Spawn block producing sequencer task.
tokio::task::spawn(async move {
loop {
let msg = rx_in.recv().await.unwrap();
println!("RPC Received message: {:?}", msg);
}
});

// Launch the HTTP server.
let mut config = Config {
log_level: rocket::config::LogLevel::Critical,
..Config::debug_default()
};
config.port = 8001;
rocket::build()
.configure(config)
.mount("/", routes![submit, head])
.manage(submitter)
.manage(chain)
}
21 changes: 20 additions & 1 deletion script/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use tokio::process::Command;

/// Specifies the anticipated URL that the sequencer will listen on.
const SEQUENCER_URL: &str = "127.0.0.1:8000";
const RPC_URL: &str = "127.0.0.1:8001";

/// Runs the sequencer process and blocks on it's completion.
async fn run_sequencer(sk: SecretKey) {
Expand All @@ -22,10 +23,25 @@ async fn run_sequencer(sk: SecretKey) {
.expect("Failure while waiting for sequencer process");
}

async fn run_rpc() {
let mut sequencer = Command::new("cargo")
.arg("run")
.arg("--bin")
.arg("rpc")
.arg("--")
.kill_on_drop(true)
.spawn()
.expect("Failed to start sequencer process");
let _ = sequencer
.wait()
.await
.expect("Failure while waiting for sequencer process");
}

/// Sends the provided transaction to the sequencer and waits for the response.
async fn send_transaction(tx: SignedTransaction) -> Result<reqwest::Response, reqwest::Error> {
reqwest::Client::new()
.post(&format!("http://{}/", SEQUENCER_URL))
.post(&format!("http://{}/", RPC_URL))
.json(&tx)
.send()
.await
Expand Down Expand Up @@ -105,6 +121,9 @@ async fn main() {
tokio::spawn(async move {
run_sequencer(sk).await;
});
tokio::spawn(async move {
run_rpc().await;
});

// Continuously check the head block.
tokio::spawn(async move {
Expand Down
1 change: 1 addition & 0 deletions sequencer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ serde_json = "1.0"
sha2 = "0.10"
env_logger = { workspace = true }
tokio = { version = "1", features = ["full"] }
p2p = { package = "p2p", path = "../p2p", version = "0.1.0" }
6 changes: 4 additions & 2 deletions sequencer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ async fn rocket() -> _ {
let sk = std::env::var("KEY").unwrap();
let pool = Arc::new(tokio::sync::Mutex::new(vec![]));
let chain = Arc::new(tokio::sync::Mutex::new(Blockchain::default()));
let mut sequencer = Sequencer::new(sk.as_str(), pool.clone(), chain.clone());
let submitter = TransactionSubmitter::new(pool);
let (tx_out, rx_out) = tokio::sync::mpsc::channel::<(Vec<u8>, String)>(32);
let rx_in = p2p::Network::start(rx_out);
let mut sequencer = Sequencer::new(sk.as_str(), pool.clone(), chain.clone(), rx_in);
let submitter = TransactionSubmitter::new(pool, tx_out);

// Spawn block producing sequencer task.
tokio::task::spawn(async move {
Expand Down

0 comments on commit 3369ef0

Please sign in to comment.