From 4e54c8d16c07550e9198b4d600be7f5e958161f1 Mon Sep 17 00:00:00 2001 From: Richard Holzeis Date: Mon, 12 Dec 2022 16:03:41 +0100 Subject: [PATCH] chore: Introduce dedicated runtime Annotating the function with `#[tokio::main]` killed the runtime after the function completes. Thus we had to block the run function indefinitely for the long living threads to keep going. This change introduces a runtime managed outside of the function scope and thus allows the `run` function to return bringing the following advantages. - We don't block a whole frb worker thread just to run the lightning node, sync tasks, background processor, etc. - We are using a multi threaded runtime instead of the current thread - allowing to actually join the background processor without blocking all other tasks. - making better use of multiple cpu cores. - We are not creating a new runtime on every async bridge call. --- rust/src/api.rs | 244 ++++++++++++++++++++++------------------- rust/src/connection.rs | 56 ++++------ rust/src/lightning.rs | 6 +- rust/src/offer.rs | 13 --- rust/src/wallet.rs | 12 +- 5 files changed, 162 insertions(+), 169 deletions(-) diff --git a/rust/src/api.rs b/rust/src/api.rs index 4e656de5..2463ecad 100644 --- a/rust/src/api.rs +++ b/rust/src/api.rs @@ -13,7 +13,6 @@ use crate::offer::Offer; use crate::wallet; use crate::wallet::Balance; use crate::wallet::LightningTransaction; -use anyhow::anyhow; use anyhow::Context; use anyhow::Result; use flutter_rust_bridge::StreamSink; @@ -22,13 +21,14 @@ use lightning_invoice::Invoice; use lightning_invoice::InvoiceDescription; use rust_decimal::prelude::ToPrimitive; use rust_decimal::Decimal; +use state::Storage; use std::ops::Add; use std::path::Path; use std::str::FromStr; use std::time::SystemTime; use time::Duration; pub use time::OffsetDateTime; -use tokio::try_join; +use tokio::runtime::Runtime; pub struct Address { pub address: String, @@ -168,91 +168,114 @@ impl WalletInfo { Ok(tx_history) } } +/// Lazily creates a multi threaded runtime with the the number of worker threads corresponding to +/// the number of available cores. +fn runtime() -> Result<&'static Runtime> { + static RUNTIME: Storage = Storage::new(); -#[tokio::main(flavor = "current_thread")] -pub async fn refresh_wallet_info() -> Result { - wallet::sync()?; - WalletInfo::build_wallet_info().await + if RUNTIME.try_get().is_none() { + let runtime = Runtime::new()?; + RUNTIME.set(runtime); + } + + Ok(RUNTIME.get()) +} + +pub fn refresh_wallet_info() -> Result { + runtime()?.block_on(async { + wallet::sync()?; + WalletInfo::build_wallet_info().await + }) } -#[tokio::main(flavor = "current_thread")] -pub async fn run(stream: StreamSink, app_dir: String) -> Result<()> { +pub fn run(stream: StreamSink, app_dir: String) -> Result<()> { let network = config::network(); anyhow::ensure!(!app_dir.is_empty(), "app_dir must not be empty"); - stream.add(Event::Init(format!("Initialising {network} wallet"))); - wallet::init_wallet(Path::new(app_dir.as_str()))?; - - stream.add(Event::Init("Initialising database".to_string())); - db::init_db( - &Path::new(app_dir.as_str()) - .join(network.to_string()) - .join("taker.sqlite"), - ) - .await?; - - stream.add(Event::Init("Starting full ldk node".to_string())); - let background_processor = wallet::run_ldk().await?; - - stream.add(Event::Init("Fetching an offer".to_string())); - stream.add(Event::Offer(offer::get_offer().await.ok())); - - stream.add(Event::Init("Fetching your balance".to_string())); - stream.add(Event::WalletInfo( - WalletInfo::build_wallet_info().await.ok(), - )); - stream.add(Event::Init("Checking channel state".to_string())); - stream.add(Event::ChannelState(get_channel_state())); - - stream.add(Event::Init("Ready".to_string())); - stream.add(Event::Ready); - - // spawn a connection task keeping the connection with the maker alive. - let peer_manager = wallet::get_peer_manager()?; - let connection_handle = connection::spawn(peer_manager); - - // sync offers every 5 seconds - let offer_handle = offer::spawn(stream.clone()); - - // sync wallet every 60 seconds - let wallet_sync_handle = tokio::spawn(async { - loop { - wallet::sync().unwrap_or_else(|e| tracing::error!(?e, "Failed to sync wallet")); - tokio::time::sleep(std::time::Duration::from_secs(60)).await; - } - }); - - // sync wallet info every 10 seconds - let wallet_info_stream = stream.clone(); - let wallet_info_sync_handle = tokio::spawn(async move { - loop { - match WalletInfo::build_wallet_info().await { - Ok(wallet_info) => { - let _ = wallet_info_stream.add(Event::WalletInfo(Some(wallet_info))); + let runtime = runtime()?; + runtime.block_on(async move { + stream.add(Event::Init(format!("Initialising {network} wallet"))); + wallet::init_wallet(Path::new(app_dir.as_str()))?; + + stream.add(Event::Init("Initialising database".to_string())); + db::init_db( + &Path::new(app_dir.as_str()) + .join(network.to_string()) + .join("taker.sqlite"), + ) + .await?; + + stream.add(Event::Init("Starting full ldk node".to_string())); + let background_processor = wallet::run_ldk()?; + + stream.add(Event::Init("Fetching an offer".to_string())); + stream.add(Event::Offer(offer::get_offer().await.ok())); + + stream.add(Event::Init("Fetching your balance".to_string())); + stream.add(Event::WalletInfo( + WalletInfo::build_wallet_info().await.ok(), + )); + stream.add(Event::Init("Checking channel state".to_string())); + stream.add(Event::ChannelState(get_channel_state())); + + stream.add(Event::Init("Ready".to_string())); + stream.add(Event::Ready); + + // spawn a connection task keeping the connection with the maker alive. + runtime.spawn(async move { + let peer_info = config::maker_peer_info(); + loop { + let peer_manager = wallet::get_peer_manager(); + connection::connect(peer_manager, peer_info).await; + // add a delay before retrying to connect + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + } + }); + + let offer_stream = stream.clone(); + runtime.spawn(async move { + loop { + offer_stream.add(Event::Offer(offer::get_offer().await.ok())); + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + } + }); + + runtime.spawn(async { + loop { + wallet::sync().unwrap_or_else(|e| tracing::error!(?e, "Failed to sync wallet")); + tokio::time::sleep(std::time::Duration::from_secs(60)).await; + } + }); + + let wallet_info_stream = stream.clone(); + runtime.spawn(async move { + loop { + match WalletInfo::build_wallet_info().await { + Ok(wallet_info) => { + let _ = wallet_info_stream.add(Event::WalletInfo(Some(wallet_info))); + } + Err(e) => tracing::error!(?e, "Failed to build wallet info"), } - Err(e) => tracing::error!(?e, "Failed to build wallet info"), + tokio::time::sleep(std::time::Duration::from_secs(10)).await; } - tokio::time::sleep(std::time::Duration::from_secs(10)).await; - } - }); - - // sync channel state every 5 seconds - let channel_state_stream = stream.clone(); - let channel_state_handle = tokio::spawn(async move { - loop { - channel_state_stream.add(Event::ChannelState(get_channel_state())); - tokio::time::sleep(std::time::Duration::from_secs(5)).await; - } - }); + }); - try_join!( - connection_handle, - offer_handle, - wallet_sync_handle, - wallet_info_sync_handle, - channel_state_handle, - )?; + let channel_state_stream = stream.clone(); + runtime.spawn(async move { + loop { + channel_state_stream.add(Event::ChannelState(get_channel_state())); + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + } + }); - background_processor.join().map_err(|e| anyhow!(e)) + runtime.spawn_blocking(move || { + // background processor joins on a sync thread, meaning that join here will block a + // full thread, which is dis-encouraged to do in async code. + if let Err(err) = background_processor.join() { + tracing::error!(?err, "Background processor stopped unexpected"); + } + }); + Ok(()) + }) } pub fn get_balance() -> Result { @@ -279,16 +302,18 @@ pub fn network() -> SyncReturn { SyncReturn(config::network().to_string()) } -#[tokio::main(flavor = "current_thread")] -pub async fn open_channel(taker_amount: u64) -> Result<()> { - let peer_info = config::maker_peer_info(); - wallet::open_channel(peer_info, taker_amount).await +pub fn open_channel(taker_amount: u64) -> Result<()> { + runtime()?.block_on(async { + let peer_info = config::maker_peer_info(); + wallet::open_channel(peer_info, taker_amount).await + }) } -#[tokio::main(flavor = "current_thread")] -pub async fn close_channel() -> Result<()> { - let peer_info = config::maker_peer_info(); - wallet::close_channel(peer_info.pubkey, false).await +pub fn close_channel() -> Result<()> { + runtime()?.block_on(async { + let peer_info = config::maker_peer_info(); + wallet::close_channel(peer_info.pubkey, false).await + }) } pub fn send_to_address(address: String, amount: u64) -> Result { @@ -301,42 +326,32 @@ pub fn send_to_address(address: String, amount: u64) -> Result { Ok(txid) } -#[tokio::main(flavor = "current_thread")] -pub async fn list_cfds() -> Result> { - let mut conn = db::acquire().await?; - cfd::load_cfds(&mut conn).await +pub fn list_cfds() -> Result> { + runtime()?.block_on(async { + let mut conn = db::acquire().await?; + cfd::load_cfds(&mut conn).await + }) } -#[tokio::main(flavor = "current_thread")] -pub async fn open_cfd(order: Order) -> Result<()> { - cfd::open(&order).await +pub fn open_cfd(order: Order) -> Result<()> { + runtime()?.block_on(async { cfd::open(&order).await }) } -#[tokio::main(flavor = "current_thread")] -pub async fn call_faucet(address: String) -> Result { +pub fn call_faucet(address: String) -> Result { anyhow::ensure!( !address.is_empty(), "Cannot call faucet because of empty address" ); - faucet::call_faucet(address).await + runtime()?.block_on(async { faucet::call_faucet(address).await }) } -#[tokio::main(flavor = "current_thread")] -pub async fn get_fee_recommendation() -> Result { - let fee_recommendation = wallet::get_fee_recommendation()?; - - Ok(fee_recommendation) +pub fn get_fee_recommendation() -> Result { + wallet::get_fee_recommendation() } /// Settles a CFD with the given taker and maker amounts in sats -#[tokio::main(flavor = "current_thread")] -pub async fn settle_cfd(cfd: Cfd, offer: Offer) -> Result<()> { - cfd::settle(&cfd, &offer).await -} - -#[tokio::main(flavor = "current_thread")] -pub async fn get_lightning_tx_history() -> Result> { - wallet::get_lightning_history().await +pub fn settle_cfd(cfd: Cfd, offer: Offer) -> Result<()> { + runtime()?.block_on(async { cfd::settle(&cfd, &offer).await }) } /// Initialise logging infrastructure for Rust @@ -350,19 +365,18 @@ pub fn get_seed_phrase() -> Vec { wallet::get_seed_phrase() } -#[tokio::main(flavor = "current_thread")] -pub async fn send_lightning_payment(invoice: String) -> Result<()> { +pub fn send_lightning_payment(invoice: String) -> Result<()> { anyhow::ensure!(!invoice.is_empty(), "Cannot pay empty invoice"); - wallet::send_lightning_payment(&invoice).await + runtime()?.block_on(async { wallet::send_lightning_payment(&invoice).await }) } -#[tokio::main(flavor = "current_thread")] -pub async fn create_lightning_invoice( +pub fn create_lightning_invoice( amount_sats: u64, expiry_secs: u32, description: String, ) -> Result { - wallet::create_invoice(amount_sats, expiry_secs, description).await + runtime()? + .block_on(async { wallet::create_invoice(amount_sats, expiry_secs, description).await }) } // Note, this implementation has to be on the api level as otherwise it wouldn't be generated diff --git a/rust/src/connection.rs b/rust/src/connection.rs index 38341d55..d523cf24 100644 --- a/rust/src/connection.rs +++ b/rust/src/connection.rs @@ -1,44 +1,36 @@ -use crate::config; +use crate::lightning::PeerInfo; use crate::lightning::PeerManager; use bdk::bitcoin::secp256k1::PublicKey; use std::sync::Arc; use std::time::Duration; -use tokio::task::JoinHandle; -pub fn spawn(peer_manager: Arc) -> JoinHandle<()> { - // keep connection with maker alive! - tokio::spawn(async move { - let peer_info = config::maker_peer_info(); - loop { - tracing::info!("Connecting to {peer_info}"); - match lightning_net_tokio::connect_outbound( - Arc::clone(&peer_manager), - peer_info.pubkey, - peer_info.peer_addr, - ) - .await - { - Some(connection_closed_future) => { - let mut connection_closed_future = Box::pin(connection_closed_future); - while !is_connected(&peer_manager, peer_info.pubkey) { - if futures::poll!(&mut connection_closed_future).is_ready() { - tracing::warn!("Peer disconnected before we finished the handshake! Retrying in 5 seconds."); - tokio::time::sleep(Duration::from_secs(5)).await; - return; - } - tokio::time::sleep(Duration::from_secs(5)).await; - } - tracing::info!("Successfully connected to {peer_info}"); - connection_closed_future.await; - tracing::warn!("Lost connection to maker, retrying immediately.") - } - None => { - tracing::warn!("Failed to connect to maker! Retrying in 5 seconds."); +pub async fn connect(peer_manager: Arc, peer_info: PeerInfo) { + tracing::info!("Connecting to {peer_info}"); + match lightning_net_tokio::connect_outbound( + Arc::clone(&peer_manager), + peer_info.pubkey, + peer_info.peer_addr, + ) + .await + { + Some(connection_closed_future) => { + let mut connection_closed_future = Box::pin(connection_closed_future); + while !is_connected(&peer_manager, peer_info.pubkey) { + if futures::poll!(&mut connection_closed_future).is_ready() { + tracing::warn!("Peer disconnected before we finished the handshake! Retrying in 5 seconds."); tokio::time::sleep(Duration::from_secs(5)).await; + return; } + tokio::time::sleep(Duration::from_secs(5)).await; } + tracing::info!("Successfully connected to {peer_info}"); + connection_closed_future.await; + tracing::warn!("Lost connection to maker, retrying immediately.") + } + None => { + tracing::warn!("Failed to connect to maker! Retrying."); } - }) + } } fn is_connected(peer_manager: &Arc, pubkey: PublicKey) -> bool { diff --git a/rust/src/lightning.rs b/rust/src/lightning.rs index f482ad75..cc15c487 100644 --- a/rust/src/lightning.rs +++ b/rust/src/lightning.rs @@ -115,7 +115,7 @@ pub struct LightningSystem { pub network: Network, } -#[derive(Serialize)] +#[derive(Serialize, Clone, Copy)] pub struct PeerInfo { pub pubkey: PublicKey, pub peer_addr: SocketAddr, @@ -545,7 +545,7 @@ fn default_user_config() -> UserConfig { } } -pub async fn run_ldk(system: &LightningSystem) -> Result { +pub fn run_ldk(system: &LightningSystem) -> Result { let ldk_data_dir = system.data_dir.to_string_lossy().to_string(); let runtime_handle = tokio::runtime::Handle::current(); @@ -630,7 +630,7 @@ pub async fn run_ldk_server( tracing::info!("Listening on {address}"); - let background_processor = run_ldk(system).await?; + let background_processor = run_ldk(system)?; Ok((tcp_handle, background_processor)) } diff --git a/rust/src/offer.rs b/rust/src/offer.rs index dfd3606f..ce8109b1 100644 --- a/rust/src/offer.rs +++ b/rust/src/offer.rs @@ -1,13 +1,10 @@ -use crate::api::Event; use crate::config::maker_endpoint; use anyhow::anyhow; use anyhow::bail; use anyhow::Result; -use flutter_rust_bridge::StreamSink; use reqwest::StatusCode; use serde::Deserialize; use serde::Serialize; -use tokio::task::JoinHandle; #[derive(Serialize, Deserialize, Debug, Clone)] pub struct Offer { @@ -16,16 +13,6 @@ pub struct Offer { pub index: f64, } -pub fn spawn(stream: StreamSink) -> JoinHandle<()> { - tokio::spawn(async move { - loop { - let offer = get_offer().await.ok(); - stream.add(Event::Offer(offer)); - tokio::time::sleep(std::time::Duration::from_secs(5)).await; - } - }) -} - pub async fn get_offer() -> Result { let client = reqwest::Client::builder() .timeout(crate::config::TCP_TIMEOUT) diff --git a/rust/src/wallet.rs b/rust/src/wallet.rs index 8cff8ba6..f4da06e7 100644 --- a/rust/src/wallet.rs +++ b/rust/src/wallet.rs @@ -238,8 +238,8 @@ impl Wallet { } /// Run the lightning node - pub async fn run_ldk(&self) -> Result { - lightning::run_ldk(&self.lightning).await + pub fn run_ldk(&self) -> Result { + lightning::run_ldk(&self.lightning) } /// Run the lightning node @@ -418,9 +418,9 @@ pub fn init_wallet(data_dir: &Path) -> Result<()> { Ok(()) } -pub async fn run_ldk() -> Result { +pub fn run_ldk() -> Result { let wallet = get_wallet(); - wallet.run_ldk().await + wallet.run_ldk() } pub async fn run_ldk_server(address: SocketAddr) -> Result<(JoinHandle<()>, BackgroundProcessor)> { @@ -485,8 +485,8 @@ pub fn get_seed_phrase() -> Vec { get_wallet().seed.get_seed_phrase() } -pub fn get_peer_manager() -> Result> { - Ok(get_wallet().lightning.peer_manager.clone()) +pub fn get_peer_manager() -> Arc { + get_wallet().lightning.peer_manager.clone() } pub async fn send_lightning_payment(invoice: &str) -> Result<()> {