From 9d38cc8126ce64a1732965c7982420f04a49467e Mon Sep 17 00:00:00 2001 From: dancoombs Date: Thu, 12 Oct 2023 12:06:32 -0400 Subject: [PATCH] feat(network): initial gossip implementation --- Cargo.lock | 91 ++++-- crates/network/Cargo.toml | 10 +- crates/network/src/behaviour.rs | 5 +- crates/network/src/gossip/message.rs | 130 ++++++++ crates/network/src/gossip/mod.rs | 31 ++ crates/network/src/gossip/snappy.rs | 62 ++++ crates/network/src/gossip/topic.rs | 131 ++++++++ crates/network/src/lib.rs | 5 + crates/network/src/network.rs | 363 ++++++++++++++++----- crates/network/src/rpc/handler/codec.rs | 14 +- crates/network/src/rpc/handler/inbound.rs | 3 +- crates/network/src/rpc/handler/outbound.rs | 9 +- crates/network/src/rpc/handler/serde.rs | 79 +---- crates/network/src/rpc/protocol.rs | 10 +- crates/network/src/types.rs | 81 +++++ crates/network/tests/test_integration.rs | 232 ++++++++++--- 16 files changed, 1012 insertions(+), 244 deletions(-) create mode 100644 crates/network/src/gossip/message.rs create mode 100644 crates/network/src/gossip/mod.rs create mode 100644 crates/network/src/gossip/snappy.rs create mode 100644 crates/network/src/gossip/topic.rs create mode 100644 crates/network/src/types.rs diff --git a/Cargo.lock b/Cargo.lock index ae4741b8f..00bb96186 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -80,7 +80,7 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47" dependencies = [ - "getrandom 0.2.8", + "getrandom 0.2.10", "once_cell", "version_check", ] @@ -507,9 +507,9 @@ checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" [[package]] name = "base64" -version = "0.21.0" +version = "0.21.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4a4ddaa51a5bc52a6948f74c06d20aaaddb71924eab79b8c97a8c556e942d6a" +checksum = "9ba43ea6f343b788c8764558649e08df62f86c6ef251fdaeb1ffd010a9ae50a2" [[package]] name = "base64ct" @@ -938,7 +938,7 @@ dependencies = [ "bs58 0.4.0", "coins-core", "digest 0.10.7", - "getrandom 0.2.8", + "getrandom 0.2.10", "hmac 0.12.1", "k256", "lazy_static", @@ -955,7 +955,7 @@ checksum = "84f4d04ee18e58356accd644896aeb2094ddeafb6a713e056cef0c0a8e468c15" dependencies = [ "bitvec 0.17.4", "coins-bip32", - "getrandom 0.2.8", + "getrandom 0.2.10", "hmac 0.12.1", "once_cell", "pbkdf2 0.12.1", @@ -970,7 +970,7 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b949a1c63fb7eb591eb7ba438746326aedf0ae843e51ec92ba6bec5bb382c4f" dependencies = [ - "base64 0.21.0", + "base64 0.21.4", "bech32", "bs58 0.4.0", "digest 0.10.7", @@ -1718,7 +1718,7 @@ version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fe81b5c06ecfdbc71dd845216f225f53b62a10cb8a16c946836a3467f701d05b" dependencies = [ - "base64 0.21.0", + "base64 0.21.4", "bytes", "ed25519-dalek", "hex", @@ -2092,7 +2092,7 @@ checksum = "b411b119f1cf0efb69e2190883dee731251882bb21270f893ee9513b3a697c48" dependencies = [ "async-trait", "auto_impl", - "base64 0.21.0", + "base64 0.21.4", "bytes", "enr 0.8.1", "ethers-core", @@ -2395,6 +2395,17 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" +[[package]] +name = "futures-ticker" +version = "0.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9763058047f713632a52e916cc7f6a4b3fc6e9fc1ff8c5b1dc49e5a89041682e" +dependencies = [ + "futures", + "futures-timer", + "instant", +] + [[package]] name = "futures-timer" version = "3.0.2" @@ -2456,9 +2467,9 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.2.8" +version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c05aeb6a22b8f62540c194aac980f2115af067bfe15a0734d7277a768d396b31" +checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427" dependencies = [ "cfg-if", "js-sys", @@ -2652,6 +2663,12 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "hex_fmt" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b07f60793ff0a4d9cef0f18e63b5357e06209987153a64648c972c1e5aff336f" + [[package]] name = "hkdf" version = "0.12.3" @@ -3343,12 +3360,13 @@ dependencies = [ "bytes", "futures", "futures-timer", - "getrandom 0.2.8", + "getrandom 0.2.10", "instant", "libp2p-allow-block-list", "libp2p-connection-limits", "libp2p-core", "libp2p-dns", + "libp2p-gossipsub", "libp2p-identify", "libp2p-identity", "libp2p-mdns", @@ -3430,6 +3448,38 @@ dependencies = [ "trust-dns-resolver", ] +[[package]] +name = "libp2p-gossipsub" +version = "0.45.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d157562dba6017193e5285acf6b1054759e83540bfd79f75b69d6ce774c88da" +dependencies = [ + "asynchronous-codec", + "base64 0.21.4", + "byteorder", + "bytes", + "either", + "fnv", + "futures", + "futures-ticker", + "getrandom 0.2.10", + "hex_fmt", + "instant", + "libp2p-core", + "libp2p-identity", + "libp2p-swarm", + "log", + "prometheus-client", + "quick-protobuf", + "quick-protobuf-codec", + "rand", + "regex", + "sha2 0.10.8", + "smallvec", + "unsigned-varint", + "void", +] + [[package]] name = "libp2p-identify" version = "0.43.0" @@ -3500,6 +3550,7 @@ checksum = "239ba7d28f8d0b5d77760dc6619c05c7e88e74ec8fbbe97f856f20a56745e620" dependencies = [ "instant", "libp2p-core", + "libp2p-gossipsub", "libp2p-identify", "libp2p-identity", "libp2p-ping", @@ -3898,7 +3949,7 @@ version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a4964177ddfdab1e3a2b37aec7cf320e14169abb0ed73999f558136409178d5" dependencies = [ - "base64 0.21.0", + "base64 0.21.4", "hyper", "indexmap 1.9.3", "ipnet", @@ -5082,7 +5133,7 @@ version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" dependencies = [ - "getrandom 0.2.8", + "getrandom 0.2.10", ] [[package]] @@ -5173,7 +5224,7 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b033d837a7cf162d7993aded9304e30a83213c648b6e389db233191f891e5c2b" dependencies = [ - "getrandom 0.2.8", + "getrandom 0.2.10", "redox_syscall 0.2.16", "thiserror", ] @@ -5228,7 +5279,7 @@ version = "0.11.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cde824a14b7c14f85caff81225f411faacc04a2013f41670f41443742b1c1c55" dependencies = [ - "base64 0.21.0", + "base64 0.21.4", "bytes", "encoding_rs", "futures-core", @@ -5462,11 +5513,13 @@ dependencies = [ "ethereum_ssz_derive", "ethers", "futures", + "getrandom 0.2.10", "hex", "libp2p", "libp2p-mplex", "rand", "rundler-types", + "sha2 0.10.8", "snap", "ssz_types", "thiserror", @@ -5854,7 +5907,7 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d194b56d58803a43635bdc398cd17e383d6f71f9182b9a192c127ca42494a59b" dependencies = [ - "base64 0.21.0", + "base64 0.21.4", ] [[package]] @@ -6101,7 +6154,7 @@ version = "3.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ca3b16a3d82c4088f343b7480a93550b3eabe1a358569c2dfe38bbcead07237" dependencies = [ - "base64 0.21.0", + "base64 0.21.4", "chrono", "hex", "indexmap 1.9.3", @@ -6876,7 +6929,7 @@ dependencies = [ "async-stream", "async-trait", "axum", - "base64 0.21.0", + "base64 0.21.4", "bytes", "h2", "http", @@ -7282,7 +7335,7 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7" dependencies = [ - "getrandom 0.2.8", + "getrandom 0.2.10", "serde", ] diff --git a/crates/network/Cargo.toml b/crates/network/Cargo.toml index 45a9885e3..68146abc8 100644 --- a/crates/network/Cargo.toml +++ b/crates/network/Cargo.toml @@ -12,12 +12,13 @@ rundler-types = { path = "../types" } ethereum_ssz = "0.5.3" ethereum_ssz_derive = "0.5.3" ethers.workspace = true -snap = "1.1.0" -ssz_types = "0.5.4" discv5 = { version = "0.3.1", features = ["libp2p"] } futures.workspace = true hex = "0.4.3" libp2p-mplex = "0.40.0" +sha2 = "0.10.8" +snap = "1.1.0" +ssz_types = "0.5.4" tokio.workspace = true tokio-io-timeout = "1.2.0" tokio-util = { workspace = true, features = ["codec", "compat"] } @@ -25,10 +26,13 @@ thiserror.workspace = true tracing.workspace = true unsigned-varint = { version = "0.7.2", features = ["codec"] } +# required version by gossipsub +getrandom = "0.2.10" + [dependencies.libp2p] version = "0.52.3" default-features = false -features = ["tokio", "noise", "macros", "tcp", "identify", "yamux", "secp256k1"] +features = ["tokio", "noise", "macros", "tcp", "identify", "yamux", "secp256k1", "gossipsub"] [dev-dependencies] rand.workspace = true diff --git a/crates/network/src/behaviour.rs b/crates/network/src/behaviour.rs index 628fbc5c2..9ad0c447e 100644 --- a/crates/network/src/behaviour.rs +++ b/crates/network/src/behaviour.rs @@ -16,12 +16,15 @@ use libp2p::{ swarm::{keep_alive, NetworkBehaviour}, }; -use crate::{discovery, rpc}; +use crate::{discovery, gossip, rpc}; + +pub(crate) type GossipSub = libp2p::gossipsub::Behaviour; #[derive(NetworkBehaviour)] pub(crate) struct Behaviour { // TODO(danc): temp, remove when not needed pub(crate) keep_alive: keep_alive::Behaviour, + pub(crate) gossipsub: GossipSub, // Request/response protocol pub(crate) rpc: rpc::Behaviour, // Discv5 based discovery protocol diff --git a/crates/network/src/gossip/message.rs b/crates/network/src/gossip/message.rs new file mode 100644 index 000000000..cb0c5e0f4 --- /dev/null +++ b/crates/network/src/gossip/message.rs @@ -0,0 +1,130 @@ +// This file is part of Rundler. +// +// Rundler is free software: you can redistribute it and/or modify it under the +// terms of the GNU Lesser General Public License as published by the Free Software +// Foundation, either version 3 of the License, or (at your option) any later version. +// +// Rundler is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +// See the GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License along with Rundler. +// If not, see https://www.gnu.org/licenses/. + +use ethers::types::{Address, H256, U256}; +use libp2p::gossipsub; +use rundler_types::UserOperation; +use sha2::{Digest, Sha256}; +use ssz::{Decode, Encode}; +use ssz_derive::{Decode, Encode}; + +use super::topic::{Topic, TopicKind}; +use crate::types::{Encoding, UserOperationSsz}; + +const MESSAGE_DOMAIN_VALID_SNAPPY: [u8; 4] = [1, 0, 0, 0]; +//const MESSAGE_DOMAIN_INVALID_SNAPPY: [u8; 4] = [0, 0, 0, 0]; + +/// Gossipsub messages +#[derive(Clone, Debug)] +pub enum Message { + /// User operations with entrypoint gossip message + UserOperationsWithEntryPoint(UserOperationsWithEntryPoint), +} + +impl Message { + pub(crate) fn encode(self) -> Vec { + match self { + Message::UserOperationsWithEntryPoint(m) => { + UserOperationsWithEntryPointSsz::from(m).as_ssz_bytes() + } + } + } + + // TODO error handling + pub(crate) fn decode(topic: &Topic, data: Vec) -> Result { + if topic.encoding() != Encoding::SSZSnappy { + return Err("invalid encoding"); + } + + match topic.kind() { + TopicKind::UserOperationsWithEntryPoint => { + let uo_ssz = UserOperationsWithEntryPointSsz::from_ssz_bytes(&data).unwrap(); + let uo = UserOperationsWithEntryPoint::try_from(uo_ssz).unwrap(); + Ok(Self::UserOperationsWithEntryPoint(uo)) + } + } + } + + pub(crate) fn topic(&self, mempool_id: H256) -> Topic { + match self { + Message::UserOperationsWithEntryPoint(_) => Topic::new( + mempool_id, + TopicKind::UserOperationsWithEntryPoint, + Encoding::SSZSnappy, + ), + } + } +} + +/// User operations with entrypoint gossip message +#[derive(Clone, Debug)] +pub struct UserOperationsWithEntryPoint { + /// The entry point contract this message targets + pub entry_point_contract: Address, + /// The block hash at which these user operations were verified + pub verified_at_block_hash: H256, + /// The chain ID of the chain these user operations are for + pub chain_id: U256, + /// The user operations + pub user_operations: Vec, +} + +pub(crate) fn message_id(message: &gossipsub::Message) -> gossipsub::MessageId { + // NOTE: this message has already gone through the DataTransform layer, + // so its data is already decompressed, and is valid. + // this means that MESSAGE_DOMAIN_INVALID_SNAPPY is never used. + // TODO: check on this. + + // TODO: these message IDs are not scoped to topics. This may be a problem. + + let mut vec = Vec::with_capacity(MESSAGE_DOMAIN_VALID_SNAPPY.len() + message.data.len()); + vec.extend_from_slice(&MESSAGE_DOMAIN_VALID_SNAPPY); + vec.extend_from_slice(&message.data); + Sha256::digest(&vec)[..20].into() +} + +#[derive(Clone, Debug, Encode, Decode)] +struct UserOperationsWithEntryPointSsz { + entry_point_contract: Address, + verified_at_block_hash: H256, + chain_id: U256, + user_operations: Vec, +} + +impl From for UserOperationsWithEntryPointSsz { + fn from(uo: UserOperationsWithEntryPoint) -> Self { + Self { + entry_point_contract: uo.entry_point_contract, + verified_at_block_hash: uo.verified_at_block_hash, + chain_id: uo.chain_id, + user_operations: uo.user_operations.into_iter().map(|uo| uo.into()).collect(), + } + } +} + +impl TryFrom for UserOperationsWithEntryPoint { + type Error = &'static str; + + fn try_from(uo_ssz: UserOperationsWithEntryPointSsz) -> Result { + Ok(Self { + entry_point_contract: uo_ssz.entry_point_contract, + verified_at_block_hash: uo_ssz.verified_at_block_hash, + chain_id: uo_ssz.chain_id, + user_operations: uo_ssz + .user_operations + .into_iter() + .map(|uo| uo.try_into()) + .collect::, _>>()?, + }) + } +} diff --git a/crates/network/src/gossip/mod.rs b/crates/network/src/gossip/mod.rs new file mode 100644 index 000000000..9f8aef639 --- /dev/null +++ b/crates/network/src/gossip/mod.rs @@ -0,0 +1,31 @@ +// This file is part of Rundler. +// +// Rundler is free software: you can redistribute it and/or modify it under the +// terms of the GNU Lesser General Public License as published by the Free Software +// Foundation, either version 3 of the License, or (at your option) any later version. +// +// Rundler is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +// See the GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License along with Rundler. +// If not, see https://www.gnu.org/licenses/. + +pub(crate) mod message; +mod snappy; +use libp2p::gossipsub; +pub(crate) use snappy::SnappyTransform; +pub(crate) mod topic; + +use crate::Config; + +pub(crate) fn gossipsub_config(config: &Config) -> gossipsub::Config { + gossipsub::ConfigBuilder::default() + .max_transmit_size(config.gossip_max_size) + .validate_messages() + .validation_mode(gossipsub::ValidationMode::Anonymous) + // TODO fast message id + .message_id_fn(message::message_id) + .build() + .expect("valid gossipsub config") +} diff --git a/crates/network/src/gossip/snappy.rs b/crates/network/src/gossip/snappy.rs new file mode 100644 index 000000000..cf893c928 --- /dev/null +++ b/crates/network/src/gossip/snappy.rs @@ -0,0 +1,62 @@ +// This file is part of Rundler. +// +// Rundler is free software: you can redistribute it and/or modify it under the +// terms of the GNU Lesser General Public License as published by the Free Software +// Foundation, either version 3 of the License, or (at your option) any later version. +// +// Rundler is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +// See the GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License along with Rundler. +// If not, see https://www.gnu.org/licenses/. + +// adapted from https://github.com/sigp/lighthouse/blob/stable/beacon_node/lighthouse_network/src/types/pubsub.rs + +use std::io; + +use libp2p::gossipsub::{DataTransform, Message, RawMessage, TopicHash}; +use snap::raw::{Decoder, Encoder}; + +pub(crate) struct SnappyTransform { + gossip_max_size: usize, +} + +impl SnappyTransform { + pub(crate) fn new(gossip_max_size: usize) -> Self { + Self { gossip_max_size } + } +} + +impl DataTransform for SnappyTransform { + fn inbound_transform(&self, raw_message: RawMessage) -> Result { + let len = snap::raw::decompress_len(&raw_message.data)?; + if len > self.gossip_max_size { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "ssz_snappy decoded data > GOSSIP_MAX_SIZE", + )); + } + + let mut decoder = Decoder::new(); + let decompressed_data = decoder.decompress_vec(&raw_message.data)?; + + Ok(Message { + source: raw_message.source, + data: decompressed_data, + sequence_number: raw_message.sequence_number, + topic: raw_message.topic, + }) + } + + fn outbound_transform(&self, _topic: &TopicHash, data: Vec) -> Result, io::Error> { + if data.len() > self.gossip_max_size { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "ssz_snappy Encoded data > GOSSIP_MAX_SIZE", + )); + } + let mut encoder = Encoder::new(); + encoder.compress_vec(&data).map_err(Into::into) + } +} diff --git a/crates/network/src/gossip/topic.rs b/crates/network/src/gossip/topic.rs new file mode 100644 index 000000000..935c1f549 --- /dev/null +++ b/crates/network/src/gossip/topic.rs @@ -0,0 +1,131 @@ +// This file is part of Rundler. +// +// Rundler is free software: you can redistribute it and/or modify it under the +// terms of the GNU Lesser General Public License as published by the Free Software +// Foundation, either version 3 of the License, or (at your option) any later version. +// +// Rundler is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +// See the GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License along with Rundler. +// If not, see https://www.gnu.org/licenses/. + +use ethers::types::H256; +use libp2p::gossipsub::{self, TopicHash}; + +use crate::types::Encoding; + +const TOPIC_PREFIX: &str = "account_abstraction"; + +pub(crate) fn mempool_to_topics(mempool_id: H256) -> Vec { + vec![Topic::new( + mempool_id, + TopicKind::UserOperationsWithEntryPoint, + Encoding::SSZSnappy, + )] +} + +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub(crate) enum TopicKind { + UserOperationsWithEntryPoint, +} + +impl AsRef for TopicKind { + fn as_ref(&self) -> &str { + match self { + TopicKind::UserOperationsWithEntryPoint => "user_operations_with_entry_point", + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub(crate) struct Topic { + mempool_id: H256, + kind: TopicKind, + encoding: Encoding, + id: String, +} + +impl Topic { + pub(crate) fn new(mempool_id: H256, kind: TopicKind, encoding: Encoding) -> Self { + let id = format!( + "/{}/{:?}/{}/{}", + TOPIC_PREFIX, + mempool_id, + kind.as_ref(), + encoding.as_ref() + ); + Self { + mempool_id, + kind, + encoding, + id, + } + } + + pub(crate) fn mempool_id(&self) -> H256 { + self.mempool_id + } + + pub(crate) fn kind(&self) -> &TopicKind { + &self.kind + } + + pub(crate) fn encoding(&self) -> Encoding { + self.encoding + } +} + +impl AsRef for Topic { + fn as_ref(&self) -> &str { + self.id.as_ref() + } +} + +impl From for TopicHash { + fn from(topic: Topic) -> Self { + TopicHash::from_raw(topic.as_ref()) + } +} + +impl From for gossipsub::IdentTopic { + fn from(topic: Topic) -> Self { + gossipsub::IdentTopic::new(topic.as_ref()) + } +} + +impl TryFrom for Topic { + type Error = &'static str; + + fn try_from(topic: TopicHash) -> Result { + let mut parts = topic.as_str().split('/'); + let _ = parts.next(); + + if parts.next() != Some(TOPIC_PREFIX) { + return Err("invalid topic: invalid prefix"); + } + + let mempool_id = parts + .next() + .ok_or("invalid topic: missing mempool id")? + .parse::() + .map_err(|_| "invalid topic: invalid mempool id")?; + + let kind = match parts.next() { + Some("user_operations_with_entry_point") => TopicKind::UserOperationsWithEntryPoint, + _ => return Err("invalid topic: invalid kind"), + }; + + let encoding = match parts.next() { + Some("ssz_snappy") => Encoding::SSZSnappy, + _ => return Err("invalid topic: invalid encoding"), + }; + + if parts.next().is_some() { + return Err("invalid topic: too many parts"); + } + + Ok(Self::new(mempool_id, kind, encoding)) + } +} diff --git a/crates/network/src/lib.rs b/crates/network/src/lib.rs index 940435f52..180da2d2b 100644 --- a/crates/network/src/lib.rs +++ b/crates/network/src/lib.rs @@ -36,6 +36,9 @@ pub use network::{ mod error; pub use error::{Error, Result}; +mod gossip; +pub use gossip::message::{Message as GossipMessage, UserOperationsWithEntryPoint}; + mod rpc; pub use rpc::{ message::{ @@ -44,3 +47,5 @@ pub use rpc::{ }, ConnectionConfig, }; + +mod types; diff --git a/crates/network/src/network.rs b/crates/network/src/network.rs index beaa9985f..aeb87be76 100644 --- a/crates/network/src/network.rs +++ b/crates/network/src/network.rs @@ -11,7 +11,10 @@ // You should have received a copy of the GNU General Public License along with Rundler. // If not, see https://www.gnu.org/licenses/. -use std::{collections::HashSet, net::SocketAddr}; +use std::{ + collections::{HashMap, HashSet}, + net::SocketAddr, +}; use discv5::Enr; use ethers::types::H256; @@ -19,6 +22,7 @@ use futures::StreamExt; use libp2p::{ core, core::{transport::upgrade, ConnectedPoint}, + gossipsub::{self, MessageAcceptance, MessageId, PublishError}, identify, identity::{secp256k1, Keypair}, noise, @@ -29,12 +33,13 @@ use tokio::sync::mpsc; use tracing::{debug, error, info}; use crate::{ - behaviour::{Behaviour, BehaviourEvent}, + behaviour::{Behaviour, BehaviourEvent, GossipSub}, discovery::{ self, enr::{self, EnrExt}, }, error::Result, + gossip::{self, message::Message as GossipMessage, topic, topic::Topic, SnappyTransform}, rpc::{ self, message::{ @@ -65,6 +70,8 @@ pub struct Config { pub tick_interval: std::time::Duration, /// target number of peers to connect to pub target_num_peers: usize, + /// max size of gossip messages + pub gossip_max_size: usize, } impl Default for Config { @@ -81,6 +88,7 @@ impl Default for Config { metadata_seq_number: 0, tick_interval: std::time::Duration::from_secs(1), target_num_peers: 16, + gossip_max_size: 1024 * 1024, } } } @@ -124,16 +132,46 @@ pub struct PeerRequestId(pub u64); #[derive(Debug)] pub enum Event { /// Peer connected, can send requests - PeerConnected(PeerId), + PeerConnected { + /// Peer that connected + peer_id: PeerId, + }, /// Peer disconnected, can no longer send requests - PeerDisconnected(PeerId), + PeerDisconnected { + /// Peer that disconnected + peer_id: PeerId, + }, /// Network shutdown complete ShutdownComplete, /// Request from peer - RequestReceived(PeerId, PeerRequestId, AppRequest), + RequestReceived { + /// Peer that sent the request + peer_id: PeerId, + /// Request ID to associate with response + request_id: PeerRequestId, + /// The request + request: AppRequest, + }, /// Response from peer - ResponseReceived(PeerId, AppRequestId, AppResponseResult), - // TODO: gossip messages will go here + ResponseReceived { + /// Peer that sent the response + peer_id: PeerId, + /// Request ID to associate with response + request_id: AppRequestId, + /// The response + response: AppResponseResult, + }, + /// Gossip messages + GossipMessage { + /// Message ID + id: MessageId, + /// Peer that sent the message, not necessarily the original author + peer_id: PeerId, + /// The mempool id + mempool_id: H256, + /// The message + message: gossip::message::Message, + }, } /// Actions sent to the network @@ -146,10 +184,37 @@ pub enum Action { /// `ShutdownComplete` event when shutdown is complete. Shutdown, /// Request to send to peer - Request(PeerId, AppRequestId, AppRequest), + Request { + /// Peer to send request to + peer_id: PeerId, + /// Request ID to associate with response + request_id: AppRequestId, + /// The request + request: AppRequest, + }, /// Response to send to peer - Response(PeerRequestId, AppResponseResult), - // TODO: gossip messages will go here + Response { + /// Request ID to associate with response + request_id: PeerRequestId, + /// The response + response: AppResponseResult, + }, + /// Gossip message to send + GossipMessage { + /// The mempool ids to gossip on + mempool_ids: Vec, + /// The message to gossip + message: GossipMessage, + }, + /// Validate gossip message + ValidateMessage { + /// Peer that sent the message, not necessarily the original author + peer_id: PeerId, + /// The message ID + id: MessageId, + /// Validation result + result: MessageAcceptance, + }, } // Requests sent by the network @@ -172,6 +237,9 @@ pub struct Network { // TODO move this to peer manager known_peers: HashSet, + // TODO determine what to do with messages that can't be sent + // due to insufficient peers. This is to help testing. + gossip_cache: HashMap>>, } impl Network { @@ -202,11 +270,23 @@ impl Network { )) .boxed(); + let transform = SnappyTransform::new(config.gossip_max_size); + let gossipsub = gossipsub::Behaviour::new_with_transform( + gossipsub::MessageAuthenticity::Anonymous, + gossip::gossipsub_config(&config), + // TODO enable metrics + None, + transform, + ) + .expect("should create gossipsub behaviour"); + let rpc = rpc::Behaviour::new(config.network_config.clone()); + let discovery = discovery::Behaviour::new(&config, enr.clone(), enr_key).await?; let behaviour = Behaviour { keep_alive: keep_alive::Behaviour, + gossipsub, rpc, discovery, identify: identify::Behaviour::new(identify::Config::new( @@ -243,6 +323,16 @@ impl Network { .map(|b| b.peer_id()) .collect::>(); + for mempool in &config.supported_mempools { + for topic in topic::mempool_to_topics(*mempool) { + swarm + .behaviour_mut() + .gossipsub + .subscribe(&topic.into()) + .unwrap(); + } + } + Ok(Self { swarm, enr, @@ -250,6 +340,7 @@ impl Network { event_sender, action_recv, known_peers, + gossip_cache: HashMap::new(), }) } @@ -259,19 +350,6 @@ impl Network { tokio::select! { Some(action) = self.action_recv.recv() => { match action { - Action::Request(peer_id, request_id, req) => { - self.swarm.behaviour_mut().rpc.send_request( - &peer_id, - NetworkRequestId::App(request_id), - req.into(), - ); - }, - Action::Response(request_id, req) => { - self.swarm.behaviour_mut().rpc.send_response( - request_id, - req.map(Into::into), - ); - }, Action::Shutdown => { info!("Shutting down network"); @@ -280,6 +358,50 @@ impl Network { self.send_event(Event::ShutdownComplete); return Ok(()); } + Action::Request{peer_id, request_id, request} => { + self.rpc_mut().send_request( + &peer_id, + NetworkRequestId::App(request_id), + request.into(), + ); + }, + Action::Response{request_id, response} => { + self.rpc_mut().send_response( + request_id, + response.map(Into::into), + ); + }, + Action::GossipMessage{mempool_ids, message} => { + for mempool in mempool_ids { + let topic = message.topic(mempool); + debug!("Publishing gossip message on topic {:?}", topic); + if let Err(e) = self.gossip_mut().publish(topic, message.clone().encode()) { + error!("Failed to publish gossip message: {:?}", e); + + let encoded = message.clone().encode(); + let topic = message.topic(mempool); + if let PublishError::InsufficientPeers = e { + if let Some(c) = self.gossip_cache.get_mut(&topic) { + c.insert(encoded); + } else { + let mut c = HashSet::new(); + c.insert(encoded); + self.gossip_cache.insert(topic, c); + } + } + + } + } + }, + Action::ValidateMessage{peer_id, id, result} => { + if let Err(e) = self.gossip_mut().report_message_validation_result( + &id, + &peer_id, + result, + ) { + error!("Failed to report message validation result: {:?}", e); + } + }, } }, Some(swarm_event) = self.swarm.next() => { @@ -296,6 +418,7 @@ impl Network { } } } + SwarmEvent::Behaviour(BehaviourEvent::Gossipsub(e)) => self.on_gossipsub_event(e), SwarmEvent::Behaviour(BehaviourEvent::Rpc(e)) => self.on_rpc_event(e), SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. @@ -306,7 +429,7 @@ impl Network { peer_id, endpoint, .. } => { debug!("Connection closed with peer {:?} at endpoint {:?}", peer_id, endpoint); - self.send_event(Event::PeerDisconnected(peer_id)); + self.send_event(Event::PeerDisconnected{peer_id}); self.known_peers.remove(&peer_id); } e => debug!("Unhandled event: {:?}", e), @@ -321,23 +444,77 @@ impl Network { &self.enr } - fn on_connection_established(&mut self, peer: PeerId, endpoint: ConnectedPoint) { + fn on_connection_established(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) { info!( "Connection established with peer {:?} at endpoint {:?}", - peer, endpoint + peer_id, endpoint ); match endpoint { ConnectedPoint::Dialer { .. } => { - self.request_status(peer); + self.request_status(peer_id); } ConnectedPoint::Listener { .. } => {} } - self.send_event(Event::PeerConnected(peer)); + self.send_event(Event::PeerConnected { peer_id }); // TODO peer manager // peers can discover us first and connect to us - if self.known_peers.insert(peer) { - debug!("Discovered peer {:?}", peer); + if self.known_peers.insert(peer_id) { + debug!("Discovered peer {:?}", peer_id); + } + } + + fn on_gossipsub_event(&mut self, event: gossipsub::Event) { + match event { + gossipsub::Event::Message { + propagation_source: peer_id, + message, + message_id, + } => { + let Ok(topic) = message.topic.try_into() else { + error!("Received message on unknown topic"); + return; + }; + match GossipMessage::decode(&topic, message.data) { + Ok(message) => { + debug!("Received gossip message: {:?}", message); + self.send_event(Event::GossipMessage { + id: message_id, + peer_id, + mempool_id: topic.mempool_id(), + message, + }); + } + Err(e) => { + error!("Failed to decode gossip message: {:?}", e); + if let Err(e) = self.gossip_mut().report_message_validation_result( + &message_id, + &peer_id, + MessageAcceptance::Reject, + ) { + error!("Failed to report message validation result: {:?}", e); + } + } + } + } + gossipsub::Event::Subscribed { topic, .. } => { + debug!("Subscribed to topic {:?}", topic); + let topic = match Topic::try_from(topic) { + Ok(topic) => topic, + Err(e) => { + error!("Failed to decode topic: {:?}", e); + return; + } + }; + if let Some(c) = self.gossip_cache.remove(&topic) { + for message in c { + if let Err(e) = self.gossip_mut().publish(topic.clone(), message) { + error!("Failed to publish gossip message: {:?}", e); + } + } + } + } + e => debug!("Unhandled gossipsub event: {:?}", e), } } @@ -346,12 +523,10 @@ impl Network { rpc::Event::Request(peer_id, request_id, request) => match request { Request::Status(status) => { debug!("Received status: {:?} from peer {:?}", status, peer_id); - self.swarm.behaviour_mut().rpc.send_response( - request_id, - Ok(Response::Status(Status { - supported_mempools: self.config.supported_mempools.clone(), - })), - ); + let response = Ok(Response::Status(Status { + supported_mempools: self.config.supported_mempools.clone(), + })); + self.rpc_mut().send_response(request_id, response); // Notify peer manager to update status } Request::Goodbye(_) => { @@ -360,32 +535,28 @@ impl Network { } Request::Ping(ping) => { debug!("Received ping: {:?} from peer {:?}", ping, peer_id); - self.swarm.behaviour_mut().rpc.send_response( - request_id, - Ok(Response::Ping(Pong { - metadata_seq_number: self.config.metadata_seq_number, - })), - ); + let response = Ok(Response::Ping(Pong { + metadata_seq_number: self.config.metadata_seq_number, + })); + self.rpc_mut().send_response(request_id, response); // Notify peer manager to update ping time } Request::Metadata => { - self.swarm.behaviour_mut().rpc.send_response( - request_id, - Ok(Response::Metadata(Metadata { - seq_number: self.config.metadata_seq_number, - })), - ); + let response = Ok(Response::Metadata(Metadata { + seq_number: self.config.metadata_seq_number, + })); + self.rpc_mut().send_response(request_id, response); } Request::PooledUserOpHashes(r) => { - self.send_event(Event::RequestReceived( + self.send_event(Event::RequestReceived { peer_id, request_id, - AppRequest::PooledUserOpHashes(r), - )); + request: AppRequest::PooledUserOpHashes(r), + }); } Request::PooledUserOpsByHash(r) => { if r.hashes.len() > MAX_OPS_PER_REQUEST { - self.swarm.behaviour_mut().rpc.send_response( + self.rpc_mut().send_response( request_id, Err(ResponseError { kind: ErrorKind::InvalidRequest, @@ -399,11 +570,11 @@ impl Network { // Notify peer manager to update metadata } else { // Send request to network manager - self.send_event(Event::RequestReceived( + self.send_event(Event::RequestReceived { peer_id, request_id, - AppRequest::PooledUserOpsByHash(r), - )); + request: AppRequest::PooledUserOpsByHash(r), + }); } } }, @@ -421,7 +592,7 @@ impl Network { // Notify peer manager to update metadata } Ok(Response::PooledUserOpHashes(r)) => { - let NetworkRequestId::App(id) = request_id else { + let NetworkRequestId::App(request_id) = request_id else { error!( "Received unexpected request id for PooledUserOpHashes: {:?}", request_id @@ -430,14 +601,14 @@ impl Network { }; // Send response to network manager - self.send_event(Event::ResponseReceived( + self.send_event(Event::ResponseReceived { peer_id, - id, - Ok(AppResponse::PooledUserOpHashes(r)), - )); + request_id, + response: Ok(AppResponse::PooledUserOpHashes(r)), + }); } Ok(Response::PooledUserOpsByHash(r)) => { - let NetworkRequestId::App(id) = request_id else { + let NetworkRequestId::App(request_id) = request_id else { error!( "Received unexpected request id for PooledUserOpsByHash: {:?}", request_id @@ -446,17 +617,21 @@ impl Network { }; // Send response to network manager - self.send_event(Event::ResponseReceived( + self.send_event(Event::ResponseReceived { peer_id, - id, - Ok(AppResponse::PooledUserOpsByHash(r)), - )); + request_id, + response: Ok(AppResponse::PooledUserOpsByHash(r)), + }); } Err(e) => { error!("Received response error: {:?}", e); // If external request, return error to network manager - if let NetworkRequestId::App(id) = request_id { - self.send_event(Event::ResponseReceived(peer_id, id, Err(e))); + if let NetworkRequestId::App(request_id) = request_id { + self.send_event(Event::ResponseReceived { + peer_id, + request_id, + response: Err(e), + }); } } }, @@ -471,38 +646,46 @@ impl Network { // A status request is sent in response to a new dialed connection // and at a regular interval to all peers. - fn request_status(&mut self, peer: PeerId) { - debug!("Requesting status from peer {:?}", peer); - self.swarm.behaviour_mut().rpc.send_request( - &peer, - NetworkRequestId::Internal, - Request::Status(Status { - supported_mempools: self.config.supported_mempools.clone(), - }), - ); + fn request_status(&mut self, peer_id: PeerId) { + debug!("Requesting status from peer {:?}", peer_id); + let request = Request::Status(Status { + supported_mempools: self.config.supported_mempools.clone(), + }); + self.send_internal_request(peer_id, request) } // A metadata request is sent when a ping/pong is received with a different // metadata sequence number than the local cached version. - fn _request_metadata(&mut self, peer: PeerId) { - debug!("Sending metadata request to peer {:?}", peer); - self.swarm.behaviour_mut().rpc.send_request( - &peer, - NetworkRequestId::Internal, - Request::Metadata, - ); + fn _request_metadata(&mut self, peer_id: PeerId) { + debug!("Sending metadata request to peer {:?}", peer_id); + self.send_internal_request(peer_id, Request::Metadata) } // A ping request is sent at a regular interval to all peers. fn _ping(&mut self, peer: PeerId) { debug!("Sending ping to peer {:?}", peer); - self.swarm.behaviour_mut().rpc.send_request( - &peer, - NetworkRequestId::Internal, - Request::Ping(Ping { - metadata_seq_number: self.config.metadata_seq_number, - }), - ); + let request = Request::Ping(Ping { + metadata_seq_number: self.config.metadata_seq_number, + }); + self.rpc_mut() + .send_request(&peer, NetworkRequestId::Internal, request); + } + + fn gossip_mut(&mut self) -> &mut GossipSub { + &mut self.swarm.behaviour_mut().gossipsub + } + + fn rpc_mut(&mut self) -> &mut rpc::Behaviour { + &mut self.swarm.behaviour_mut().rpc + } + + fn _discovery_mut(&mut self) -> &mut discovery::Behaviour { + &mut self.swarm.behaviour_mut().discovery + } + + fn send_internal_request(&mut self, peer_id: PeerId, request: Request) { + self.rpc_mut() + .send_request(&peer_id, NetworkRequestId::Internal, request); } } diff --git a/crates/network/src/rpc/handler/codec.rs b/crates/network/src/rpc/handler/codec.rs index 20e9e5eb3..053e4ee40 100644 --- a/crates/network/src/rpc/handler/codec.rs +++ b/crates/network/src/rpc/handler/codec.rs @@ -218,13 +218,15 @@ mod test { use rundler_types::UserOperation; use super::*; - use crate::rpc::{ - handler::serde::{CodedError, PooledUserOpsByHashChunkSsz, SszChunk, UserOperationSsz}, - message::{ - ErrorKind, Goodbye, GoodbyeReason, Metadata, Ping, Pong, PooledUserOpHashesRequest, - PooledUserOpHashesResponse, PooledUserOpsByHashRequest, ResponseError, Status, + use crate::{ + rpc::{ + handler::serde::{CodedError, PooledUserOpsByHashChunkSsz, SszChunk}, + message::{ + ErrorKind, Goodbye, GoodbyeReason, Metadata, Ping, Pong, PooledUserOpHashesRequest, + PooledUserOpHashesResponse, PooledUserOpsByHashRequest, ResponseError, Status, + }, }, - protocol::Encoding, + types::{Encoding, UserOperationSsz}, }; const MAX_CHUNK_SIZE: usize = 1048576; diff --git a/crates/network/src/rpc/handler/inbound.rs b/crates/network/src/rpc/handler/inbound.rs index c2792bafd..2a8955b6e 100644 --- a/crates/network/src/rpc/handler/inbound.rs +++ b/crates/network/src/rpc/handler/inbound.rs @@ -23,8 +23,9 @@ use crate::{ network::PeerRequestId, rpc::{ message::{Request, ResponseResult}, - protocol::{self, Encoding, Protocol, ProtocolError}, + protocol::{self, Protocol, ProtocolError}, }, + types::Encoding, }; #[derive(Debug)] diff --git a/crates/network/src/rpc/handler/outbound.rs b/crates/network/src/rpc/handler/outbound.rs index 1d10f2508..50499a2e0 100644 --- a/crates/network/src/rpc/handler/outbound.rs +++ b/crates/network/src/rpc/handler/outbound.rs @@ -17,9 +17,12 @@ use tokio_io_timeout::TimeoutStream; use tokio_util::{codec::Framed, compat::FuturesAsyncReadCompatExt}; use super::{codec, serde, ConnectionConfig}; -use crate::rpc::{ - message::{Request, ResponseResult}, - protocol::{self, Encoding, Protocol, ProtocolError, ProtocolSchema}, +use crate::{ + rpc::{ + message::{Request, ResponseResult}, + protocol::{self, Protocol, ProtocolError, ProtocolSchema}, + }, + types::Encoding, }; #[derive(Debug)] diff --git a/crates/network/src/rpc/handler/serde.rs b/crates/network/src/rpc/handler/serde.rs index 7fc76d3cb..1c71f26fe 100644 --- a/crates/network/src/rpc/handler/serde.rs +++ b/crates/network/src/rpc/handler/serde.rs @@ -11,21 +11,22 @@ // You should have received a copy of the GNU General Public License along with Rundler. // If not, see https://www.gnu.org/licenses/. -use ethers::types::{Address, U256}; -use rundler_types::UserOperation; use ssz::{Decode, DecodeError, Encode}; use ssz_derive::{Decode, Encode}; use tracing::warn; -use crate::rpc::{ - handler::CodecError, - message::{ - ErrorKind, Goodbye, Metadata, Ping, Pong, PooledUserOpHashesRequest, - PooledUserOpHashesResponse, PooledUserOpsByHashRequest, PooledUserOpsByHashResponse, - Request, Response, ResponseError, ResponseResult, Status, MAX_ERROR_MESSAGE_LEN, - MAX_OPS_PER_REQUEST, +use crate::{ + rpc::{ + handler::CodecError, + message::{ + ErrorKind, Goodbye, Metadata, Ping, Pong, PooledUserOpHashesRequest, + PooledUserOpHashesResponse, PooledUserOpsByHashRequest, PooledUserOpsByHashResponse, + Request, Response, ResponseError, ResponseResult, Status, MAX_ERROR_MESSAGE_LEN, + MAX_OPS_PER_REQUEST, + }, + protocol::ProtocolSchema, }, - protocol::ProtocolSchema, + types::UserOperationSsz, }; // TODO: determine what this actually should be @@ -301,63 +302,6 @@ pub(crate) struct PooledUserOpsByHashChunkSsz { pub(crate) user_op: UserOperationSsz, } -#[derive(Debug, Clone, PartialEq, Encode, Decode)] -pub(crate) struct UserOperationSsz { - sender: Vec, - nonce: U256, - init_code: Vec, - call_data: Vec, - call_gas_limit: U256, - verification_gas_limit: U256, - pre_verification_gas: U256, - max_fee_per_gas: U256, - max_priority_fee_per_gas: U256, - paymaster_and_data: Vec, - signature: Vec, -} - -impl From for UserOperationSsz { - fn from(uo: UserOperation) -> Self { - Self { - sender: uo.sender.as_bytes().to_vec(), - nonce: uo.nonce, - init_code: uo.init_code.to_vec(), - call_data: uo.call_data.to_vec(), - call_gas_limit: uo.call_gas_limit, - verification_gas_limit: uo.verification_gas_limit, - pre_verification_gas: uo.pre_verification_gas, - max_fee_per_gas: uo.max_fee_per_gas, - max_priority_fee_per_gas: uo.max_priority_fee_per_gas, - paymaster_and_data: uo.paymaster_and_data.to_vec(), - signature: uo.signature.to_vec(), - } - } -} - -impl TryFrom for UserOperation { - type Error = CodecError; - - fn try_from(uo_ssz: UserOperationSsz) -> Result { - if uo_ssz.sender.len() != 20 { - return Err(CodecError::InvalidData("invalid sender bytes".into())); - } - - Ok(Self { - sender: Address::from_slice(&uo_ssz.sender), - nonce: uo_ssz.nonce, - init_code: uo_ssz.init_code.into(), - call_data: uo_ssz.call_data.into(), - call_gas_limit: uo_ssz.call_gas_limit, - verification_gas_limit: uo_ssz.verification_gas_limit, - pre_verification_gas: uo_ssz.pre_verification_gas, - max_fee_per_gas: uo_ssz.max_fee_per_gas, - max_priority_fee_per_gas: uo_ssz.max_priority_fee_per_gas, - paymaster_and_data: uo_ssz.paymaster_and_data.into(), - signature: uo_ssz.signature.into(), - }) - } -} - impl ProtocolSchema { pub(crate) fn ssz_request_limits(&self) -> (usize, usize) { match self { @@ -414,6 +358,7 @@ impl From for CodecError { #[cfg(test)] mod test { use ethers::types::H256; + use rundler_types::UserOperation; use super::*; diff --git a/crates/network/src/rpc/protocol.rs b/crates/network/src/rpc/protocol.rs index c0e7f1add..a9f1c248a 100644 --- a/crates/network/src/rpc/protocol.rs +++ b/crates/network/src/rpc/protocol.rs @@ -12,7 +12,7 @@ // If not, see https://www.gnu.org/licenses/. use super::handler::CodecError; -use crate::rpc::message::Request; +use crate::{rpc::message::Request, types::Encoding}; const PROTOCOL_PREFIX: &str = "/account_abstraction/req"; @@ -53,14 +53,6 @@ impl Protocol { } } -/// The encoding of a protocol. -/// -/// Currently only SSZSnappy is supported. -#[derive(Clone, Debug)] -pub(crate) enum Encoding { - SSZSnappy, -} - impl AsRef for Encoding { fn as_ref(&self) -> &str { match self { diff --git a/crates/network/src/types.rs b/crates/network/src/types.rs new file mode 100644 index 000000000..326644b6d --- /dev/null +++ b/crates/network/src/types.rs @@ -0,0 +1,81 @@ +// This file is part of Rundler. +// +// Rundler is free software: you can redistribute it and/or modify it under the +// terms of the GNU Lesser General Public License as published by the Free Software +// Foundation, either version 3 of the License, or (at your option) any later version. +// +// Rundler is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +// See the GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License along with Rundler. +// If not, see https://www.gnu.org/licenses/. + +use ethers::types::{Address, U256}; +use rundler_types::UserOperation; +use ssz_derive::{Decode, Encode}; + +#[derive(Debug, Clone, PartialEq, Encode, Decode)] +pub(crate) struct UserOperationSsz { + sender: Vec, + nonce: U256, + init_code: Vec, + call_data: Vec, + call_gas_limit: U256, + verification_gas_limit: U256, + pre_verification_gas: U256, + max_fee_per_gas: U256, + max_priority_fee_per_gas: U256, + paymaster_and_data: Vec, + signature: Vec, +} + +impl From for UserOperationSsz { + fn from(uo: UserOperation) -> Self { + Self { + sender: uo.sender.as_bytes().to_vec(), + nonce: uo.nonce, + init_code: uo.init_code.to_vec(), + call_data: uo.call_data.to_vec(), + call_gas_limit: uo.call_gas_limit, + verification_gas_limit: uo.verification_gas_limit, + pre_verification_gas: uo.pre_verification_gas, + max_fee_per_gas: uo.max_fee_per_gas, + max_priority_fee_per_gas: uo.max_priority_fee_per_gas, + paymaster_and_data: uo.paymaster_and_data.to_vec(), + signature: uo.signature.to_vec(), + } + } +} + +impl TryFrom for UserOperation { + type Error = &'static str; + + fn try_from(uo_ssz: UserOperationSsz) -> Result { + if uo_ssz.sender.len() != 20 { + return Err("invalid sender bytes"); + } + + Ok(Self { + sender: Address::from_slice(&uo_ssz.sender), + nonce: uo_ssz.nonce, + init_code: uo_ssz.init_code.into(), + call_data: uo_ssz.call_data.into(), + call_gas_limit: uo_ssz.call_gas_limit, + verification_gas_limit: uo_ssz.verification_gas_limit, + pre_verification_gas: uo_ssz.pre_verification_gas, + max_fee_per_gas: uo_ssz.max_fee_per_gas, + max_priority_fee_per_gas: uo_ssz.max_priority_fee_per_gas, + paymaster_and_data: uo_ssz.paymaster_and_data.into(), + signature: uo_ssz.signature.into(), + }) + } +} + +/// The encoding of a protocol. +/// +/// Currently only SSZSnappy is supported. +#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] +pub(crate) enum Encoding { + SSZSnappy, +} diff --git a/crates/network/tests/test_integration.rs b/crates/network/tests/test_integration.rs index 5d6fa9b26..d580dca54 100644 --- a/crates/network/tests/test_integration.rs +++ b/crates/network/tests/test_integration.rs @@ -11,15 +11,19 @@ // You should have received a copy of the GNU General Public License along with Rundler. // If not, see https://www.gnu.org/licenses/. -use std::net::{Ipv4Addr, SocketAddr, TcpListener}; +use std::{ + net::{Ipv4Addr, SocketAddr, TcpListener}, + time::Duration, +}; use discv5::Enr; use ethers::types::H256; use libp2p::PeerId; use rundler_network::{ - enr::EnrExt, Action, AppRequest, AppRequestId, AppResponse, Config, Event, Network, - PooledUserOpHashesRequest, PooledUserOpHashesResponse, PooledUserOpsByHashRequest, - PooledUserOpsByHashResponse, ResponseErrorKind, Result, MAX_OPS_PER_REQUEST, + enr::EnrExt, Action, AppRequest, AppRequestId, AppResponse, Config, Event, GossipMessage, + Network, PooledUserOpHashesRequest, PooledUserOpHashesResponse, PooledUserOpsByHashRequest, + PooledUserOpsByHashResponse, ResponseErrorKind, Result, UserOperationsWithEntryPoint, + MAX_OPS_PER_REQUEST, }; use rundler_types::UserOperation; use tokio::{sync::mpsc, task::JoinHandle}; @@ -68,9 +72,9 @@ async fn setup_network(bootnodes: Vec, supported_mempools: Vec) -> Te } } -async fn setup_node_pair() -> (TestNetworkContext, TestNetworkContext) { - let bootnode = setup_network(vec![], vec![]).await; - let node = setup_network(vec![bootnode.enr.clone()], vec![]).await; +async fn setup_node_pair(mempools: Vec) -> (TestNetworkContext, TestNetworkContext) { + let bootnode = setup_network(vec![], mempools.clone()).await; + let node = setup_network(vec![bootnode.enr.clone()], mempools.clone()).await; (bootnode, node) } @@ -78,7 +82,7 @@ async fn shutdown_node_pair(mut node0: TestNetworkContext, node1: TestNetworkCon shutdown(node1).await; match node0.event_receiver.recv().await { - Some(Event::PeerDisconnected(_)) => {} + Some(Event::PeerDisconnected { .. }) => {} _ => panic!("Expected peer disconnected event"), } @@ -96,12 +100,12 @@ async fn wait_for_pair_connect( node1: &mut TestNetworkContext, ) -> (PeerId, PeerId) { let peer0 = match node0.event_receiver.recv().await { - Some(Event::PeerConnected(peer_id)) => peer_id, + Some(Event::PeerConnected { peer_id }) => peer_id, _ => panic!("Expected peer connected event"), }; let peer1 = match node1.event_receiver.recv().await { - Some(Event::PeerConnected(peer_id)) => peer_id, + Some(Event::PeerConnected { peer_id }) => peer_id, _ => panic!("Expected peer connected event"), }; @@ -127,15 +131,15 @@ async fn test_shutdown() { #[tokio::test] #[traced_test] async fn test_peer_connect() { - let (mut bootnode, mut node) = setup_node_pair().await; + let (mut bootnode, mut node) = setup_node_pair(vec![]).await; match node.event_receiver.recv().await { - Some(Event::PeerConnected(_)) => {} + Some(Event::PeerConnected { .. }) => {} _ => panic!("Expected peer connected event"), } match bootnode.event_receiver.recv().await { - Some(Event::PeerConnected(_)) => {} + Some(Event::PeerConnected { .. }) => {} _ => panic!("Expected peer connected event"), } @@ -145,22 +149,29 @@ async fn test_peer_connect() { #[tokio::test] #[traced_test] async fn test_req_resp_op_hashes() { - let (mut bootnode, mut node) = setup_node_pair().await; + let (mut bootnode, mut node) = setup_node_pair(vec![]).await; let (node_peer_id, bootnode_peer_id) = wait_for_pair_connect(&mut bootnode, &mut node).await; let mempool = H256::random(); bootnode .action_sender - .send(Action::Request( - node_peer_id, - AppRequestId(0), - AppRequest::PooledUserOpHashes(PooledUserOpHashesRequest { mempool, offset: 0 }), - )) + .send(Action::Request { + peer_id: node_peer_id, + request_id: AppRequestId(0), + request: AppRequest::PooledUserOpHashes(PooledUserOpHashesRequest { + mempool, + offset: 0, + }), + }) .unwrap(); let request_id = match node.event_receiver.recv().await { - Some(Event::RequestReceived(peer_id, request_id, request)) => match request { + Some(Event::RequestReceived { + peer_id, + request_id, + request, + }) => match request { AppRequest::PooledUserOpHashes(r) => { assert_eq!(peer_id, bootnode_peer_id); assert_eq!(r.mempool, mempool); @@ -175,19 +186,23 @@ async fn test_req_resp_op_hashes() { let hashes = vec![H256::random(), H256::random()]; node.action_sender - .send(Action::Response( + .send(Action::Response { request_id, - Ok(AppResponse::PooledUserOpHashes( + response: Ok(AppResponse::PooledUserOpHashes( PooledUserOpHashesResponse { more_flag: true, hashes: hashes.clone(), }, )), - )) + }) .unwrap(); match bootnode.event_receiver.recv().await { - Some(Event::ResponseReceived(peer_id, request_id, request)) => match request { + Some(Event::ResponseReceived { + peer_id, + request_id, + response, + }) => match response { Ok(AppResponse::PooledUserOpHashes(r)) => { assert_eq!(peer_id, node_peer_id); assert_eq!(request_id, AppRequestId(0)); @@ -205,24 +220,28 @@ async fn test_req_resp_op_hashes() { #[tokio::test] #[traced_test] async fn test_req_resp_ops_by_hashes() { - let (mut bootnode, mut node) = setup_node_pair().await; + let (mut bootnode, mut node) = setup_node_pair(vec![]).await; let (node_peer_id, bootnode_peer_id) = wait_for_pair_connect(&mut bootnode, &mut node).await; let hashes = vec![H256::random(), H256::random()]; bootnode .action_sender - .send(Action::Request( - node_peer_id, - AppRequestId(0), - AppRequest::PooledUserOpsByHash(PooledUserOpsByHashRequest { + .send(Action::Request { + peer_id: node_peer_id, + request_id: AppRequestId(0), + request: AppRequest::PooledUserOpsByHash(PooledUserOpsByHashRequest { hashes: hashes.clone(), }), - )) + }) .unwrap(); let request_id = match node.event_receiver.recv().await { - Some(Event::RequestReceived(peer_id, request_id, request)) => match request { + Some(Event::RequestReceived { + peer_id, + request_id, + request, + }) => match request { AppRequest::PooledUserOpsByHash(r) => { assert_eq!(peer_id, bootnode_peer_id); assert_eq!(r.hashes, hashes); @@ -234,18 +253,22 @@ async fn test_req_resp_ops_by_hashes() { }; node.action_sender - .send(Action::Response( + .send(Action::Response { request_id, - Ok(AppResponse::PooledUserOpsByHash( + response: Ok(AppResponse::PooledUserOpsByHash( PooledUserOpsByHashResponse { user_ops: vec![UserOperation::default(), UserOperation::default()], }, )), - )) + }) .unwrap(); match bootnode.event_receiver.recv().await { - Some(Event::ResponseReceived(peer_id, request_id, request)) => match request { + Some(Event::ResponseReceived { + peer_id, + request_id, + response, + }) => match response { Ok(AppResponse::PooledUserOpsByHash(r)) => { assert_eq!(peer_id, node_peer_id); assert_eq!(request_id, AppRequestId(0)); @@ -262,24 +285,28 @@ async fn test_req_resp_ops_by_hashes() { #[tokio::test] #[traced_test] async fn test_req_resp_ops_by_hashes_too_many() { - let (mut bootnode, mut node) = setup_node_pair().await; + let (mut bootnode, mut node) = setup_node_pair(vec![]).await; let (node_peer_id, _) = wait_for_pair_connect(&mut bootnode, &mut node).await; let hashes = vec![H256::random(); MAX_OPS_PER_REQUEST + 1]; bootnode .action_sender - .send(Action::Request( - node_peer_id, - AppRequestId(0), - AppRequest::PooledUserOpsByHash(PooledUserOpsByHashRequest { + .send(Action::Request { + peer_id: node_peer_id, + request_id: AppRequestId(0), + request: AppRequest::PooledUserOpsByHash(PooledUserOpsByHashRequest { hashes: hashes.clone(), }), - )) + }) .unwrap(); match bootnode.event_receiver.recv().await { - Some(Event::ResponseReceived(peer_id, request_id, response)) => match response { + Some(Event::ResponseReceived { + peer_id, + request_id, + response, + }) => match response { Err(e) => { assert_eq!(peer_id, node_peer_id); assert_eq!(request_id, AppRequestId(0)); @@ -296,7 +323,7 @@ async fn test_req_resp_ops_by_hashes_too_many() { #[tokio::test] #[traced_test] async fn test_discovery() { - let (mut bootnode, mut node0) = setup_node_pair().await; + let (mut bootnode, mut node0) = setup_node_pair(vec![]).await; wait_for_pair_connect(&mut bootnode, &mut node0).await; let mut node1 = setup_network(vec![bootnode.enr.clone()], vec![]).await; @@ -304,7 +331,7 @@ async fn test_discovery() { // node 1 should discover both bootnode and node0 for _ in 0..2 { match node1.event_receiver.recv().await { - Some(Event::PeerConnected(peer_id)) => { + Some(Event::PeerConnected { peer_id }) => { assert!(peer_id == bootnode.enr.peer_id() || peer_id == node0.enr.peer_id()) } _ => panic!("Expected discovered peer event"), @@ -313,9 +340,124 @@ async fn test_discovery() { // node 0 should discover node 1 match node0.event_receiver.recv().await { - Some(Event::PeerConnected(peer_id)) => assert_eq!(peer_id, node1.enr.peer_id()), + Some(Event::PeerConnected { peer_id }) => assert_eq!(peer_id, node1.enr.peer_id()), _ => panic!("Expected discovered peer event"), } shutdown_nodes(vec![bootnode, node0, node1]).await; } + +#[tokio::test] +#[traced_test] +async fn test_gossip() { + let mempool = H256::random(); + let (mut bootnode, mut node0) = setup_node_pair(vec![mempool]).await; + wait_for_pair_connect(&mut bootnode, &mut node0).await; + + bootnode + .action_sender + .send(Action::GossipMessage { + mempool_ids: vec![mempool], + message: GossipMessage::UserOperationsWithEntryPoint(UserOperationsWithEntryPoint { + entry_point_contract: Default::default(), + verified_at_block_hash: Default::default(), + chain_id: Default::default(), + user_operations: vec![UserOperation::default()], + }), + }) + .unwrap(); + + match node0.event_receiver.recv().await { + Some(Event::GossipMessage { + id: _, + peer_id, + mempool_id, + message, + }) => { + assert_eq!(peer_id, bootnode.enr.peer_id()); + assert_eq!(mempool_id, mempool); + match message { + GossipMessage::UserOperationsWithEntryPoint(uo) => { + assert_eq!(uo.user_operations.len(), 1); + } + } + } + _ => panic!("Expected gossip message received event"), + } + + shutdown_node_pair(bootnode, node0).await; +} + +#[tokio::test] +#[traced_test] +async fn test_gossip_topics() { + let mempool0 = H256::random(); + let mempool1 = H256::random(); + + let mut bootnode = setup_network(vec![], vec![mempool0, mempool1]).await; + let mut node0 = setup_network(vec![bootnode.enr.clone()], vec![mempool0]).await; + let mut node1 = setup_network(vec![bootnode.enr.clone()], vec![mempool1]).await; + + // TODO: unsure if there is as better way to do this without + // surfacing more information. + // This waits for the peers to connect, and then to subscribe to the gossip topics. + tokio::time::sleep(Duration::from_secs(5)).await; + + // Drain all the events thus far + while bootnode.event_receiver.try_recv().is_ok() {} + while node0.event_receiver.try_recv().is_ok() {} + while node1.event_receiver.try_recv().is_ok() {} + + // Send a message across mempools + bootnode + .action_sender + .send(Action::GossipMessage { + mempool_ids: vec![mempool0, mempool1], + message: GossipMessage::UserOperationsWithEntryPoint(UserOperationsWithEntryPoint { + entry_point_contract: Default::default(), + verified_at_block_hash: Default::default(), + chain_id: Default::default(), + user_operations: vec![UserOperation::default()], + }), + }) + .unwrap(); + + // Mempool 0 should receive + match node0.event_receiver.recv().await { + Some(Event::GossipMessage { + id: _, + peer_id, + mempool_id, + message, + }) => { + assert_eq!(peer_id, bootnode.enr.peer_id()); + assert_eq!(mempool_id, mempool0); + match message { + GossipMessage::UserOperationsWithEntryPoint(uo) => { + assert_eq!(uo.user_operations.len(), 1); + } + } + } + _ => panic!("Expected gossip message received event"), + } + + // TODO: libp2p will deduplicate this message before sending to the 2nd mempool topic. + // This is an issue with how the spec is defined. + // match node1.event_receiver.recv().await { + // Some(Event::GossipMessage { + // id: _, + // peer_id, + // mempool_id, + // message, + // }) => { + // assert_eq!(peer_id, bootnode.enr.peer_id()); + // assert_eq!(mempool_id, mempool1); + // match message { + // GossipMessage::UserOperationsWithEntryPoint(uo) => { + // assert_eq!(uo.user_operations.len(), 1); + // } + // } + // } + // _ => panic!("Expected gossip message received event"), + // } +}