Skip to content

Commit

Permalink
Merge pull request lightningdevkit#266 from tnull/2024-03-introduce-connection-manager
Browse files Browse the repository at this point in the history

Improve connection management
  • Loading branch information
tnull authored Apr 25, 2024
2 parents 0bf24c8 + 77c538b commit 640a1fd
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 69 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ bip39 = "2.0.0"

rand = "0.8.5"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
futures = "0.3"
tokio = { version = "1", default-features = false, features = [ "rt-multi-thread", "time", "sync" ] }
esplora-client = { version = "0.6", default-features = false }
libc = "0.2"
Expand Down
5 changes: 5 additions & 0 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::config::{
Config, BDK_CLIENT_CONCURRENCY, BDK_CLIENT_STOP_GAP, DEFAULT_ESPLORA_SERVER_URL,
WALLET_KEYS_SEED_LEN,
};
use crate::connection::ConnectionManager;
use crate::event::EventQueue;
use crate::fee_estimator::OnchainFeeEstimator;
use crate::gossip::GossipSource;
Expand Down Expand Up @@ -895,6 +896,9 @@ fn build_with_store_internal(

liquidity_source.as_ref().map(|l| l.set_peer_manager(Arc::clone(&peer_manager)));

let connection_manager =
Arc::new(ConnectionManager::new(Arc::clone(&peer_manager), Arc::clone(&logger)));

let output_sweeper = match io::utils::read_output_sweeper(
Arc::clone(&tx_broadcaster),
Arc::clone(&fee_estimator),
Expand Down Expand Up @@ -991,6 +995,7 @@ fn build_with_store_internal(
chain_monitor,
output_sweeper,
peer_manager,
connection_manager,
keys_manager,
network_graph,
gossip_source,
Expand Down
147 changes: 147 additions & 0 deletions src/connection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
use crate::logger::{log_error, log_info, Logger};
use crate::types::PeerManager;
use crate::Error;

use lightning::ln::msgs::SocketAddress;

use bitcoin::secp256k1::PublicKey;

use std::collections::hash_map::{self, HashMap};
use std::net::ToSocketAddrs;
use std::ops::Deref;
use std::sync::{Arc, Mutex};
use std::time::Duration;

/// Manages outbound peer connections, deduplicating concurrent connection attempts so that at
/// most one outbound connection per peer is in flight at any given time.
pub(crate) struct ConnectionManager<L: Deref + Clone + Sync + Send>
where
	L::Target: Logger,
{
	// Peers for which an outbound connection attempt is currently in flight, mapped to the
	// oneshot senders over which any additional waiting tasks will receive the attempt's result.
	pending_connections:
		Mutex<HashMap<PublicKey, Vec<tokio::sync::oneshot::Sender<Result<(), Error>>>>>,
	peer_manager: Arc<PeerManager>,
	logger: L,
}

impl<L: Deref + Clone + Sync + Send> ConnectionManager<L>
where
	L::Target: Logger,
{
	/// Constructs a new `ConnectionManager` with no pending connections.
	pub(crate) fn new(peer_manager: Arc<PeerManager>, logger: L) -> Self {
		let pending_connections = Mutex::new(HashMap::new());
		Self { pending_connections, peer_manager, logger }
	}

	/// Connects to the given peer unless the `PeerManager` already considers it connected.
	pub(crate) async fn connect_peer_if_necessary(
		&self, node_id: PublicKey, addr: SocketAddress,
	) -> Result<(), Error> {
		// Fast path: already connected, nothing to do.
		if self.peer_manager.peer_by_node_id(&node_id).is_some() {
			return Ok(());
		}

		self.do_connect_peer(node_id, addr).await
	}

	/// Establishes an outbound connection to the given peer, awaiting until it either succeeds
	/// or fails.
	///
	/// If another task already has a connection attempt in flight for this `node_id`, we don't
	/// race a second TCP connection — we subscribe to that attempt's result instead. Otherwise,
	/// we drive the connection ourselves and fan the outcome out to any subscribers that showed
	/// up in the meantime.
	pub(crate) async fn do_connect_peer(
		&self, node_id: PublicKey, addr: SocketAddress,
	) -> Result<(), Error> {
		// First, we check if there is already an outbound connection in flight, if so, we just
		// await on the corresponding oneshot channel. The task driving the connection future will
		// send us the result.
		let pending_ready_receiver_opt = self.register_or_subscribe_pending_connection(&node_id);
		if let Some(pending_connection_ready_receiver) = pending_ready_receiver_opt {
			// The receive only fails if the sending task was dropped before sending, which would
			// be a bug on our end — every path below propagates a result.
			return pending_connection_ready_receiver.await.map_err(|e| {
				debug_assert!(false, "Failed to receive connection result: {:?}", e);
				log_error!(self.logger, "Failed to receive connection result: {:?}", e);
				Error::ConnectionFailed
			})?;
		}

		log_info!(self.logger, "Connecting to peer: {}@{}", node_id, addr);

		// Resolve the address, taking the first result. On failure we must also fail any
		// subscribers registered since we took the pending slot, lest they wait forever.
		let socket_addr = addr
			.to_socket_addrs()
			.map_err(|e| {
				log_error!(self.logger, "Failed to resolve network address {}: {}", addr, e);
				self.propagate_result_to_subscribers(&node_id, Err(Error::InvalidSocketAddress));
				Error::InvalidSocketAddress
			})?
			.next()
			.ok_or_else(|| {
				log_error!(self.logger, "Failed to resolve network address {}", addr);
				self.propagate_result_to_subscribers(&node_id, Err(Error::InvalidSocketAddress));
				Error::InvalidSocketAddress
			})?;

		let connection_future = lightning_net_tokio::connect_outbound(
			Arc::clone(&self.peer_manager),
			node_id,
			socket_addr,
		);

		let res = match connection_future.await {
			Some(connection_closed_future) => {
				let mut connection_closed_future = Box::pin(connection_closed_future);
				// Poll every 10ms until either the connection closes (failure) or the peer
				// shows up in the `PeerManager` (success, i.e., the handshake completed).
				loop {
					tokio::select! {
						_ = &mut connection_closed_future => {
							log_info!(self.logger, "Peer connection closed: {}@{}", node_id, addr);
							break Err(Error::ConnectionFailed);
						},
						_ = tokio::time::sleep(Duration::from_millis(10)) => {},
					};

					match self.peer_manager.peer_by_node_id(&node_id) {
						Some(_) => break Ok(()),
						None => continue,
					}
				}
			},
			None => {
				// `connect_outbound` returned `None`, i.e., the TCP connection itself failed.
				log_error!(self.logger, "Failed to connect to peer: {}@{}", node_id, addr);
				Err(Error::ConnectionFailed)
			},
		};

		self.propagate_result_to_subscribers(&node_id, res);

		res
	}

	/// Either registers a new pending connection for `node_id` (returning `None`, meaning the
	/// caller is responsible for driving the connection), or subscribes to an attempt already in
	/// flight (returning a receiver on which the driving task will deliver the result).
	fn register_or_subscribe_pending_connection(
		&self, node_id: &PublicKey,
	) -> Option<tokio::sync::oneshot::Receiver<Result<(), Error>>> {
		let mut pending_connections_lock = self.pending_connections.lock().unwrap();
		match pending_connections_lock.entry(*node_id) {
			hash_map::Entry::Occupied(mut entry) => {
				let (tx, rx) = tokio::sync::oneshot::channel();
				entry.get_mut().push(tx);
				Some(rx)
			},
			hash_map::Entry::Vacant(entry) => {
				entry.insert(Vec::new());
				None
			},
		}
	}

	/// Removes the pending entry for `node_id` and delivers `res` to every subscribed waiter.
	fn propagate_result_to_subscribers(&self, node_id: &PublicKey, res: Result<(), Error>) {
		// Send the result to any other tasks that might be waiting on it by now.
		let mut pending_connections_lock = self.pending_connections.lock().unwrap();
		if let Some(connection_ready_senders) = pending_connections_lock.remove(node_id) {
			for sender in connection_ready_senders {
				// A send failure means the subscriber dropped its receiver, which shouldn't
				// happen while it's awaiting the result.
				let _ = sender.send(res).map_err(|e| {
					debug_assert!(
						false,
						"Failed to send connection result to subscribers: {:?}",
						e
					);
					log_error!(
						self.logger,
						"Failed to send connection result to subscribers: {:?}",
						e
					);
				});
			}
		}
	}
}
2 changes: 1 addition & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::fmt;

#[derive(Debug, PartialEq, Eq)]
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
/// An error that possibly needs to be handled by the user.
pub enum Error {
/// Returned when trying to start [`crate::Node`] while it is already running.
Expand Down
4 changes: 2 additions & 2 deletions src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ impl Future for EventFuture {
}
}

pub(crate) struct EventHandler<L: Deref>
pub(crate) struct EventHandler<L: Deref + Clone + Sync + Send + 'static>
where
L::Target: Logger,
{
Expand All @@ -307,7 +307,7 @@ where
config: Arc<Config>,
}

impl<L: Deref> EventHandler<L>
impl<L: Deref + Clone + Sync + Send + 'static> EventHandler<L>
where
L::Target: Logger,
{
Expand Down
76 changes: 11 additions & 65 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
mod balance;
mod builder;
mod config;
mod connection;
mod error;
mod event;
mod fee_estimator;
Expand Down Expand Up @@ -124,6 +125,7 @@ use config::{
LDK_PAYMENT_RETRY_TIMEOUT, NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL,
RGS_SYNC_INTERVAL, WALLET_SYNC_INTERVAL_MINIMUM_SECS,
};
use connection::ConnectionManager;
use event::{EventHandler, EventQueue};
use gossip::GossipSource;
use liquidity::LiquiditySource;
Expand Down Expand Up @@ -187,6 +189,7 @@ pub struct Node {
chain_monitor: Arc<ChainMonitor>,
output_sweeper: Arc<Sweeper>,
peer_manager: Arc<PeerManager>,
connection_manager: Arc<ConnectionManager<Arc<FilesystemLogger>>>,
keys_manager: Arc<KeysManager>,
network_graph: Arc<NetworkGraph>,
gossip_source: Arc<GossipSource>,
Expand Down Expand Up @@ -498,6 +501,7 @@ impl Node {
}

// Regularly reconnect to persisted peers.
let connect_cm = Arc::clone(&self.connection_manager);
let connect_pm = Arc::clone(&self.peer_manager);
let connect_logger = Arc::clone(&self.logger);
let connect_peer_store = Arc::clone(&self.peer_store);
Expand All @@ -518,11 +522,9 @@ impl Node {
.collect::<Vec<_>>();

for peer_info in connect_peer_store.list_peers().iter().filter(|info| !pm_peers.contains(&info.node_id)) {
let res = do_connect_peer(
let res = connect_cm.do_connect_peer(
peer_info.node_id,
peer_info.address.clone(),
Arc::clone(&connect_pm),
Arc::clone(&connect_logger),
).await;
match res {
Ok(_) => {
Expand Down Expand Up @@ -871,14 +873,13 @@ impl Node {

let con_node_id = peer_info.node_id;
let con_addr = peer_info.address.clone();
let con_logger = Arc::clone(&self.logger);
let con_pm = Arc::clone(&self.peer_manager);
let con_cm = Arc::clone(&self.connection_manager);

// We need to use our main runtime here as a local runtime might not be around to poll
// connection futures going forward.
tokio::task::block_in_place(move || {
runtime.block_on(async move {
connect_peer_if_necessary(con_node_id, con_addr, con_pm, con_logger).await
con_cm.connect_peer_if_necessary(con_node_id, con_addr).await
})
})?;

Expand Down Expand Up @@ -944,14 +945,13 @@ impl Node {

let con_node_id = peer_info.node_id;
let con_addr = peer_info.address.clone();
let con_logger = Arc::clone(&self.logger);
let con_pm = Arc::clone(&self.peer_manager);
let con_cm = Arc::clone(&self.connection_manager);

// We need to use our main runtime here as a local runtime might not be around to poll
// connection futures going forward.
tokio::task::block_in_place(move || {
runtime.block_on(async move {
connect_peer_if_necessary(con_node_id, con_addr, con_pm, con_logger).await
con_cm.connect_peer_if_necessary(con_node_id, con_addr).await
})
})?;

Expand Down Expand Up @@ -1601,14 +1601,13 @@ impl Node {

let con_node_id = peer_info.node_id;
let con_addr = peer_info.address.clone();
let con_logger = Arc::clone(&self.logger);
let con_pm = Arc::clone(&self.peer_manager);
let con_cm = Arc::clone(&self.connection_manager);

// We need to use our main runtime here as a local runtime might not be around to poll
// connection futures going forward.
tokio::task::block_in_place(move || {
runtime.block_on(async move {
connect_peer_if_necessary(con_node_id, con_addr, con_pm, con_logger).await
con_cm.connect_peer_if_necessary(con_node_id, con_addr).await
})
})?;

Expand Down Expand Up @@ -1849,56 +1848,3 @@ pub struct NodeStatus {
/// Will be `None` if we have no public channels or we haven't broadcasted since the [`Node`] was initialized.
pub latest_node_announcement_broadcast_timestamp: Option<u64>,
}

/// Connects to the given peer unless the `PeerManager` already considers it connected.
async fn connect_peer_if_necessary(
	node_id: PublicKey, addr: SocketAddress, peer_manager: Arc<PeerManager>,
	logger: Arc<FilesystemLogger>,
) -> Result<(), Error> {
	// Fast path: already connected, nothing to do.
	if peer_manager.peer_by_node_id(&node_id).is_some() {
		return Ok(());
	}

	do_connect_peer(node_id, addr, peer_manager, logger).await
}

/// Establishes an outbound connection to the given peer, awaiting until it either succeeds or
/// fails.
///
/// NOTE(review): an unresolvable address maps to `Error::InvalidSocketAddress` in the `map_err`
/// branch but an *empty* resolution result maps to `Error::ConnectionFailed` via `ok_or` — this
/// looks inconsistent; confirm which error callers expect for resolution failures.
async fn do_connect_peer(
	node_id: PublicKey, addr: SocketAddress, peer_manager: Arc<PeerManager>,
	logger: Arc<FilesystemLogger>,
) -> Result<(), Error> {
	log_info!(logger, "Connecting to peer: {}@{}", node_id, addr);

	// Resolve the `SocketAddress` to a concrete `SocketAddr`, taking the first result.
	let socket_addr = addr
		.to_socket_addrs()
		.map_err(|e| {
			log_error!(logger, "Failed to resolve network address: {}", e);
			Error::InvalidSocketAddress
		})?
		.next()
		.ok_or(Error::ConnectionFailed)?;

	match lightning_net_tokio::connect_outbound(Arc::clone(&peer_manager), node_id, socket_addr)
		.await
	{
		Some(connection_closed_future) => {
			let mut connection_closed_future = Box::pin(connection_closed_future);
			loop {
				// If the connection future resolved, the connection was closed before the peer
				// ever appeared in the `PeerManager` — treat that as a failed attempt.
				match futures::poll!(&mut connection_closed_future) {
					std::task::Poll::Ready(_) => {
						log_info!(logger, "Peer connection closed: {}@{}", node_id, addr);
						return Err(Error::ConnectionFailed);
					},
					std::task::Poll::Pending => {},
				}
				// Avoid blocking the tokio context by sleeping a bit
				match peer_manager.peer_by_node_id(&node_id) {
					Some(_) => return Ok(()),
					None => tokio::time::sleep(Duration::from_millis(10)).await,
				}
			}
		},
		None => {
			// The TCP connection itself failed.
			log_error!(logger, "Failed to connect to peer: {}@{}", node_id, addr);
			Err(Error::ConnectionFailed)
		},
	}
}
30 changes: 30 additions & 0 deletions tests/integration_tests_rust.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,3 +333,33 @@ fn do_connection_restart_behavior(persist: bool) {
assert!(node_b.list_peers().is_empty());
}
}

#[test]
fn concurrent_connections_succeed() {
	// Spin up two nodes and fire ten simultaneous connection attempts from A to B,
	// verifying that concurrent attempts to the same peer all succeed.
	let (_bitcoind, electrsd) = setup_bitcoind_and_electrsd();
	let (node_a, node_b) = setup_two_nodes(&electrsd, false);

	let node_a = Arc::new(node_a);
	let node_b = Arc::new(node_b);

	let node_id_b = node_b.node_id();
	let node_addr_b = node_b.listening_addresses().unwrap().first().unwrap().clone();

	// Give node B a moment to bind its listening socket before we start connecting.
	while !node_b.status().is_listening {
		std::thread::sleep(std::time::Duration::from_millis(10));
	}

	// `collect` eagerly so all ten threads are spawned (and thus racing) before we join any.
	let handles: Vec<_> = (0..10)
		.map(|_| {
			let connecting_node = Arc::clone(&node_a);
			let target_addr = node_addr_b.clone();
			std::thread::spawn(move || {
				connecting_node.connect(node_id_b, target_addr, false).unwrap();
			})
		})
		.collect();

	for handle in handles {
		handle.join().unwrap();
	}
}

0 comments on commit 640a1fd

Please sign in to comment.