Skip to content

Commit

Permalink
integrate new automatic bootstrap mechanism on bootnode
Browse files Browse the repository at this point in the history
  • Loading branch information
sh3ll3x3c committed Oct 29, 2024
1 parent db5eb49 commit 845c9e2
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 167 deletions.
1 change: 1 addition & 0 deletions bootstrap/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## [0.4.0]

- Integrate upstream `rust-libp2p` `0.54` changes to the bootstrap process
- Add `/p2p/local/info` endpoint
- Add webrtc support to bootstrap

Expand Down
5 changes: 3 additions & 2 deletions bootstrap/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,10 @@ async fn run() -> Result<()> {
network_client
.add_bootstrap_nodes(cfg.bootstraps.iter().map(Into::into).collect())
.await?;
} else {
info!("No bootstrap list provided, starting client as the first bootstrap on the network.")
}
network_client.bootstrap().await?;
info!("Bootstrap done.");

loop_handle.await?;

Ok(())
Expand Down
6 changes: 4 additions & 2 deletions bootstrap/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ pub async fn init(
let kad_store = MemoryStore::new(id_keys.public().to_peer_id());
// create Kademlia Config
let mut kad_cfg = kad::Config::new(cfg.kademlia.protocol_name);
kad_cfg.set_query_timeout(cfg.kademlia.query_timeout);
kad_cfg
.set_query_timeout(cfg.kademlia.query_timeout)
.set_periodic_bootstrap_interval(Some(cfg.kademlia.bootstrap_interval));
// build the Swarm, connecting the lower transport logic with the
// higher layer network behaviour logic
let tokio_swarm = SwarmBuilder::with_existing_identity(id_keys.clone()).with_tokio();
Expand Down Expand Up @@ -112,7 +114,7 @@ pub async fn init(

Ok((
Client::new(command_sender),
EventLoop::new(swarm, command_receiver, cfg.bootstrap_interval),
EventLoop::new(swarm, command_receiver),
))
}

Expand Down
45 changes: 1 addition & 44 deletions bootstrap/src/p2p/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ impl Client {
.context("Sender not to be dropped.")?
}

// Checks if bootstraps are available and adds them to the routing table automatically triggering bootstrap process
pub async fn add_bootstrap_nodes(&self, nodes: Vec<(PeerId, Multiaddr)>) -> Result<()> {
for (peer, addr) in nodes {
self.dial_peer(peer, addr.clone())
Expand All @@ -74,43 +75,6 @@ impl Client {
Ok(())
}

pub async fn bootstrap(&self) -> Result<()> {
// bootstrapping is impossible on an empty DHT table
// at least one node is required to be known, so check
let counted_peers = self.count_dht_entries().await?;
// for a bootstrap to succeed, we need at least 1 peer in our DHT
if counted_peers < 1 {
// we'll have to wait, until some one successfully connects us
let (peer_id, multiaddr) = self.wait_connection(None).await?;
// add that peer to have someone to bootstrap with
self.add_address(peer_id, multiaddr).await?;
}

// proceed to bootstrap only if connected with someone
let (boot_res_sender, boot_res_receiver) = oneshot::channel();
self.command_sender
.send(Command::Bootstrap {
response_sender: boot_res_sender,
})
.await
.context("Command receiver should not be dropped while bootstrapping.")?;
boot_res_receiver
.await
.context("Sender not to be dropped while bootstrapping.")?
}

async fn wait_connection(&self, peer_id: Option<PeerId>) -> Result<(PeerId, Multiaddr)> {
let (connection_res_sender, connection_res_receiver) = oneshot::channel();
self.command_sender
.send(Command::WaitConnection {
peer_id,
response_sender: connection_res_sender,
})
.await
.context("Command receiver should not be dropped while waiting on connection.")?;
Ok(connection_res_receiver.await?)
}

pub async fn count_dht_entries(&self) -> Result<usize> {
let (response_sender, response_receiver) = oneshot::channel();
self.command_sender
Expand Down Expand Up @@ -155,13 +119,6 @@ pub enum Command {
multiaddr: Multiaddr,
response_sender: oneshot::Sender<Result<()>>,
},
Bootstrap {
response_sender: oneshot::Sender<Result<()>>,
},
WaitConnection {
peer_id: Option<PeerId>,
response_sender: oneshot::Sender<(PeerId, Multiaddr)>,
},
CountDHTPeers {
response_sender: oneshot::Sender<usize>,
},
Expand Down
137 changes: 20 additions & 117 deletions bootstrap/src/p2p/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,16 @@ use libp2p::{
autonat::{self, InboundProbeEvent, OutboundProbeEvent},
futures::StreamExt,
identify::{Event as IdentifyEvent, Info},
kad::{self, BootstrapOk, QueryId, QueryResult},
kad::{self, BootstrapOk, QueryResult},
multiaddr::Protocol,
swarm::SwarmEvent,
Multiaddr, PeerId, Swarm,
PeerId, Swarm,
};
use std::{
collections::{hash_map, HashMap},
str::FromStr,
time::Duration,
};
use tokio::{
sync::{mpsc, oneshot},
time::{interval_at, Instant, Interval},
};
use tokio::sync::{mpsc, oneshot};
use tracing::{debug, info, trace};

use crate::types::AgentVersion;
Expand All @@ -26,49 +22,24 @@ use super::{
Behaviour, BehaviourEvent,
};

enum QueryChannel {
Bootstrap(oneshot::Sender<Result<()>>),
}

enum SwarmChannel {
Dial(oneshot::Sender<Result<()>>),
ConnectionEstablished(oneshot::Sender<(PeerId, Multiaddr)>),
}

// BootstrapState keeps track of all things bootstrap related
struct BootstrapState {
// referring to this initial bootstrap process,
// one that runs when this node starts up
is_startup_done: bool,
// timer that is responsible for firing periodic bootstraps
timer: Interval,
}

pub struct EventLoop {
swarm: Swarm<Behaviour>,
command_receiver: mpsc::Receiver<Command>,
pending_kad_queries: HashMap<QueryId, QueryChannel>,
pending_kad_routing: HashMap<PeerId, oneshot::Sender<Result<()>>>,
pending_swarm_events: HashMap<PeerId, SwarmChannel>,
bootstrap: BootstrapState,
}

impl EventLoop {
pub fn new(
swarm: Swarm<Behaviour>,
command_receiver: mpsc::Receiver<Command>,
bootstrap_interval: Duration,
) -> Self {
pub fn new(swarm: Swarm<Behaviour>, command_receiver: mpsc::Receiver<Command>) -> Self {
Self {
swarm,
command_receiver,
pending_kad_queries: Default::default(),
pending_kad_routing: Default::default(),
pending_swarm_events: Default::default(),
bootstrap: BootstrapState {
is_startup_done: false,
timer: interval_at(Instant::now() + bootstrap_interval, bootstrap_interval),
},
}
}

Expand All @@ -82,7 +53,6 @@ impl EventLoop {
// shutting down whole network event loop
None => return,
},
_ = self.bootstrap.timer.tick() => self.handle_periodic_bootstraps(),
}
}
}
Expand All @@ -103,35 +73,21 @@ impl EventLoop {
}
},
kad::Event::OutboundQueryProgressed {
id,
result: QueryResult::Bootstrap(bootstrap_result),
..
} => {
match bootstrap_result {
Ok(BootstrapOk {
peer,
num_remaining,
}) => {
trace!("BootstrapOK event. PeerID: {peer:?}. Num remaining: {num_remaining:?}.");
if num_remaining == 0 {
if let Some(QueryChannel::Bootstrap(ch)) =
self.pending_kad_queries.remove(&id)
{
_ = ch.send(Ok(()));
// we can say that the initial bootstrap at initialization is done
self.bootstrap.is_startup_done = true;
}
}
},
Err(err) => {
trace!("Bootstrap error event. Error: {err:?}.");
if let Some(QueryChannel::Bootstrap(ch)) =
self.pending_kad_queries.remove(&id)
{
_ = ch.send(Err(err.into()));
}
},
}
} => match bootstrap_result {
Ok(BootstrapOk {
peer,
num_remaining,
}) => {
debug!("BootstrapOK event. PeerID: {peer:?}. Num remaining: {num_remaining:?}.");
if num_remaining == 0 {
debug!("Bootstrap complete!");
}
},
Err(err) => {
debug!("Bootstrap error event. Error: {err:?}.");
},
},
_ => {},
},
Expand Down Expand Up @@ -238,23 +194,10 @@ impl EventLoop {
SwarmEvent::ConnectionEstablished {
endpoint, peer_id, ..
} => {
// while waiting for a first successful connection,
// we're interested in a case where we are dialing back
if endpoint.is_dialer() {
if let Some(event) = self.pending_swarm_events.remove(&peer_id) {
match event {
// check if there is a command waiting for a response for established 1st connection
SwarmChannel::ConnectionEstablished(ch) => {
// signal back that we have successfully established a connection,
// give us back PeerId and Multiaddress
let addr = endpoint.get_remote_address().to_owned();
_ = ch.send((peer_id, addr));
},
SwarmChannel::Dial(ch) => {
// signal back that dial was a success
_ = ch.send(Ok(()));
},
}
if let Some(SwarmChannel::Dial(ch)) = self.pending_swarm_events.remove(&peer_id)
{
_ = ch.send(Ok(()));
}
}
},
Expand Down Expand Up @@ -309,38 +252,6 @@ impl EventLoop {
.add_address(&peer_id, multiaddr);
self.pending_kad_routing.insert(peer_id, response_sender);
},
Command::Bootstrap { response_sender } => {
match self.swarm.behaviour_mut().kademlia.bootstrap() {
Ok(query_id) => {
self.pending_kad_queries
.insert(query_id, QueryChannel::Bootstrap(response_sender));
},
// no available peers for bootstrap
// send error immediately through response channel
Err(err) => {
_ = response_sender.send(Err(err.into()));
},
}
},
Command::WaitConnection {
peer_id,
response_sender,
} => match peer_id {
// this means that we're waiting on a connection from
// a peer with provided PeerId
Some(id) => {
self.pending_swarm_events
.insert(id, SwarmChannel::ConnectionEstablished(response_sender));
},
// sending no particular PeerId means that we're
// waiting someone to establish connection with us
None => {
self.pending_swarm_events.insert(
self.swarm.local_peer_id().to_owned(),
SwarmChannel::ConnectionEstablished(response_sender),
);
},
},
Command::CountDHTPeers { response_sender } => {
let mut total_peers: usize = 0;
for bucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
Expand Down Expand Up @@ -370,12 +281,4 @@ impl EventLoop {
},
}
}

fn handle_periodic_bootstraps(&mut self) {
// periodic bootstraps should only start after the initial one is done
if self.bootstrap.is_startup_done {
debug!("Starting periodic Bootstrap.");
_ = self.swarm.behaviour_mut().kademlia.bootstrap();
}
}
}
4 changes: 2 additions & 2 deletions bootstrap/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ pub struct LibP2PConfig {
pub identify: IdentifyConfig,
pub kademlia: KademliaConfig,
pub secret_key: Option<SecretKey>,
pub bootstrap_interval: Duration,
}

impl From<&RuntimeConfig> for LibP2PConfig {
Expand All @@ -124,7 +123,6 @@ impl From<&RuntimeConfig> for LibP2PConfig {
identify: IdentifyConfig::new(),
kademlia: rtcfg.into(),
secret_key: rtcfg.secret_key.clone(),
bootstrap_interval: Duration::from_secs(rtcfg.bootstrap_period),
}
}
}
Expand All @@ -133,6 +131,7 @@ impl From<&RuntimeConfig> for LibP2PConfig {
pub struct KademliaConfig {
pub query_timeout: Duration,
pub protocol_name: StreamProtocol,
pub bootstrap_interval: Duration,
}

impl From<&RuntimeConfig> for KademliaConfig {
Expand All @@ -149,6 +148,7 @@ impl From<&RuntimeConfig> for KademliaConfig {
KademliaConfig {
query_timeout: Duration::from_secs(val.kad_query_timeout.into()),
protocol_name,
bootstrap_interval: Duration::from_secs(val.bootstrap_period),
}
}
}
Expand Down

0 comments on commit 845c9e2

Please sign in to comment.