Skip to content

Commit

Permalink
feat(network): discv5 discovery
Browse files Browse the repository at this point in the history
  • Loading branch information
dancoombs committed Oct 11, 2023
1 parent 0eb3059 commit 6c48301
Show file tree
Hide file tree
Showing 9 changed files with 332 additions and 35 deletions.
13 changes: 3 additions & 10 deletions bin/tools/src/bin/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@
// You should have received a copy of the GNU General Public License along with Rundler.
// If not, see https://www.gnu.org/licenses/.

use std::{net::SocketAddr, time::Duration};
use std::net::SocketAddr;

use clap::Parser;
use ethers::types::H256;
use rundler_network::{Config as NetworkConfig, ConnectionConfig, Network};
use rundler_network::{Config as NetworkConfig, Network};
use tokio::sync::mpsc;

const PRIVATE_KEY: &str = "b0ddfec7d365b4599ff8367e960f8c4890364f99e2151beac352338cc0cfe1bc";
Expand Down Expand Up @@ -45,13 +44,7 @@ async fn main() -> anyhow::Result<()> {
private_key,
listen_address,
bootnodes,
network_config: ConnectionConfig {
max_chunk_size: 1048576,
request_timeout: Duration::from_secs(10),
ttfb_timeout: Duration::from_secs(5),
},
supported_mempools: vec![H256::random()],
metadata_seq_number: 0,
..Default::default()
};

let (_, action_recv) = mpsc::unbounded_channel();
Expand Down
4 changes: 3 additions & 1 deletion crates/network/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@ use libp2p::{
swarm::{keep_alive, NetworkBehaviour},
};

use crate::rpc;
use crate::{discovery, rpc};

#[derive(NetworkBehaviour)]
pub(crate) struct Behaviour {
// TODO(danc): temp, remove when not needed
pub(crate) keep_alive: keep_alive::Behaviour,
// Request/response protocol
pub(crate) rpc: rpc::Behaviour,
// Discv5 based discovery protocol
pub(crate) discovery: discovery::Behaviour,
// Identity protocol
pub(crate) identify: identify::Behaviour,
}
190 changes: 190 additions & 0 deletions crates/network/src/discovery/behaviour.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
// This file is part of Rundler.
//
// Rundler is free software: you can redistribute it and/or modify it under the
// terms of the GNU Lesser General Public License as published by the Free Software
// Foundation, either version 3 of the License, or (at your option) any later version.
//
// Rundler is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
// without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
// See the GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License along with Rundler.
// If not, see https://www.gnu.org/licenses/.

use std::{
pin::Pin,
task::{Context, Poll, Waker},
};

use discv5::{
enr::{CombinedKey, NodeId},
Discv5, Discv5ConfigBuilder, Discv5Error, Discv5Event, Enr, ListenConfig, QueryError,
};
use futures::Future;
use libp2p::{
core::Endpoint,
swarm::{
dummy::ConnectionHandler, ConnectionDenied, ConnectionId, DialFailure, FromSwarm,
NetworkBehaviour, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
},
Multiaddr, PeerId,
};
use tokio::{sync::mpsc, time::Interval};
use tracing::{debug, error};

use crate::{Config, Error, Result as NetworkResult};

#[derive(Debug)]
pub(crate) struct DiscoveredPeers {
pub(crate) peers: Vec<Enr>,
}

type QueryFuture = Pin<Box<dyn Future<Output = Result<Vec<Enr>, QueryError>> + Send>>;

pub(crate) struct Behaviour {
discv5: Discv5,
query_fut: Option<QueryFuture>,
event_stream: mpsc::Receiver<Discv5Event>,
waker: Option<Waker>,

// TODO move this to peer manager
tick: Interval,
target_num_peers: usize,
}

impl Behaviour {
pub(crate) async fn new(
config: &Config,
enr: Enr,
enr_key: CombinedKey,
) -> NetworkResult<Self> {
let listen_config =
ListenConfig::from_ip(config.listen_address.ip(), config.listen_address.port() + 1);
debug!("Discv5 listening on {:?}", listen_config);

let discv5_config = Discv5ConfigBuilder::new(listen_config).build();
let mut discv5 = Discv5::new(enr, enr_key, discv5_config)
.map_err(|e| Error::Discovery(Discv5Error::Custom(e)))?;

for bootnode in config.bootnodes.iter() {
discv5
.add_enr(bootnode.clone())
.map_err(|e| Error::Discovery(Discv5Error::Custom(e)))?;
}

discv5.start().await.map_err(Error::Discovery)?;
let event_stream = discv5.event_stream().await.map_err(Error::Discovery)?;

Ok(Self {
discv5,
query_fut: None,
event_stream,
waker: None,
tick: tokio::time::interval(config.tick_interval),
target_num_peers: config.target_num_peers,
})
}

pub(crate) fn start_query(&mut self, num_peers: usize) {
if self.query_fut.is_some() {
return;
}

let predicate = |enr: &Enr| enr.tcp4().is_some() || enr.tcp6().is_some();

debug!("Starting discovery query");
self.query_fut = Some(Box::pin(self.discv5.find_node_predicate(
NodeId::random(),
Box::new(predicate),
num_peers,
)));
self.wake();
}

fn wake(&mut self) {
if let Some(waker) = self.waker.take() {
waker.wake();
}
}
}

impl NetworkBehaviour for Behaviour {
type ConnectionHandler = ConnectionHandler;
type ToSwarm = DiscoveredPeers;

fn handle_established_inbound_connection(
&mut self,
_connection_id: ConnectionId,
_peer: PeerId,
_local_addr: &Multiaddr,
_remote_addr: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(ConnectionHandler)
}

fn handle_established_outbound_connection(
&mut self,
_connection_id: ConnectionId,
_peer: PeerId,
_addr: &Multiaddr,
_role_override: Endpoint,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(ConnectionHandler)
}

// TODO manage banned peers

// TODO manage ENR updates

fn on_swarm_event(&mut self, event: FromSwarm<'_, Self::ConnectionHandler>) {
if let FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) = event {
debug!("Dial failure to peer {:?}: {:?}", peer_id, error);
}
}

fn on_connection_handler_event(
&mut self,
_peer_id: PeerId,
_connection_id: ConnectionId,
_event: THandlerOutEvent<Self>,
) {
// Do nothing
}

fn poll(
&mut self,
cx: &mut Context<'_>,
_params: &mut impl PollParameters,
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
if let Some(fut) = &mut self.query_fut {
match fut.as_mut().poll(cx) {
Poll::Ready(Ok(peers)) => {
self.query_fut = None;
debug!("Query finished, found {} peers", peers.len());
return Poll::Ready(ToSwarm::GenerateEvent(DiscoveredPeers { peers }));
}
Poll::Ready(Err(e)) => {
self.query_fut = None;
error!("Query error: {:?}", e);
}
Poll::Pending => {}
}
}

while let Poll::Ready(Some(event)) = self.event_stream.poll_recv(cx) {
// These aren't currently useful, peers are discovered in queries
match event {
Discv5Event::Discovered(e) => debug!("Discovered peers in event: {:?}", e),
e => debug!("Discovery event: {:?}", e),
}
}

if self.tick.poll_tick(cx).is_ready() {
self.tick.reset();
self.start_query(self.target_num_peers)
}

self.waker = Some(cx.waker().clone());
Poll::Pending
}
}
15 changes: 11 additions & 4 deletions crates/network/src/enr.rs → crates/network/src/discovery/enr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
// You should have received a copy of the GNU General Public License along with Rundler.
// If not, see https://www.gnu.org/licenses/.

// Adapted from https://github.com/sigp/lighthouse/blob/stable/beacon_node/lighthouse_network/src/discovery/enr_ext.rs
//! Utilities for working with Ethereum Network Records (ENRs).
//!
//! Adapted from https://github.com/sigp/lighthouse/blob/stable/beacon_node/lighthouse_network/src/discovery/enr_ext.rs

use discv5::{
enr::{k256, CombinedKey, CombinedPublicKey, EnrBuilder},
Expand All @@ -25,7 +27,8 @@ use libp2p::{

use crate::Config;

pub(crate) fn build_enr(config: &Config) -> (Enr, CombinedKey) {
/// Build an ENR from the given configuration.
pub fn build_enr(config: &Config) -> (Enr, CombinedKey) {
let enr_key: CombinedKey =
k256::ecdsa::SigningKey::from_slice(&hex::decode(&config.private_key).unwrap())
.unwrap()
Expand All @@ -41,9 +44,12 @@ pub(crate) fn build_enr(config: &Config) -> (Enr, CombinedKey) {
(enr, enr_key)
}

pub(crate) trait EnrExt {
/// Extension trait for ENR.
pub trait EnrExt {
/// Returns the multiaddr of the ENR.
fn multiaddr(&self) -> Vec<Multiaddr>;

/// Returns the peer id of the ENR.
fn peer_id(&self) -> PeerId;
}

Expand All @@ -70,7 +76,8 @@ impl EnrExt for Enr {
}
}

pub(crate) trait CombinedKeyPublicExt {
/// Extension trait for CombinedPublicKey
pub trait CombinedKeyPublicExt {
/// Converts the publickey into a peer id, without consuming the key.
fn as_peer_id(&self) -> PeerId;
}
Expand Down
17 changes: 17 additions & 0 deletions crates/network/src/discovery/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// This file is part of Rundler.
//
// Rundler is free software: you can redistribute it and/or modify it under the
// terms of the GNU Lesser General Public License as published by the Free Software
// Foundation, either version 3 of the License, or (at your option) any later version.
//
// Rundler is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
// without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
// See the GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License along with Rundler.
// If not, see https://www.gnu.org/licenses/.

mod behaviour;
pub(crate) use behaviour::Behaviour;

pub mod enr;
6 changes: 5 additions & 1 deletion crates/network/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@

/// Network errors
#[derive(thiserror::Error, Debug)]
pub enum Error {}
pub enum Error {
/// Discovery error
#[error("Discovery error: {0}")]
Discovery(discv5::Discv5Error),
}

/// Network result
pub type Result<T> = std::result::Result<T, Error>;
22 changes: 11 additions & 11 deletions crates/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,23 @@
//! Lots of inspiration for the components of this implementation were taken from the Lighthouse implementation
//! of the Ethereum consensus layer p2p protocol. See [here](https://github.com/sigp/lighthouse/) for more details.

mod rpc;
pub use rpc::{
message::{
ErrorKind as ResponseErrorKind, PooledUserOpHashesRequest, PooledUserOpHashesResponse,
PooledUserOpsByHashRequest, PooledUserOpsByHashResponse, MAX_OPS_PER_REQUEST,
},
ConnectionConfig,
};

mod behaviour;
mod discovery;
pub use discovery::enr;

mod network;
pub use network::{
Action, AppRequest, AppRequestId, AppResponse, AppResponseResult, Config, Event, Network,
};

mod enr;

mod error;
pub use error::{Error, Result};

mod rpc;
pub use rpc::{
message::{
ErrorKind as ResponseErrorKind, PooledUserOpHashesRequest, PooledUserOpHashesResponse,
PooledUserOpsByHashRequest, PooledUserOpsByHashResponse, MAX_OPS_PER_REQUEST,
},
ConnectionConfig,
};
Loading

0 comments on commit 6c48301

Please sign in to comment.