Skip to content
This repository has been archived by the owner on Aug 31, 2023. It is now read-only.

Commit

Permalink
chore: Introduce dedicated runtime
Browse files Browse the repository at this point in the history
Annotating the function with `#[tokio::main]` killed the runtime after the function completes. Thus we had to block the run function indefinitely for the long living threads to keep going.

This change introduces a runtime managed outside of the function scope and thus allows the `run` function to return bringing the following advantages.

 - We don't block a whole frb worker thread just to run the lightning node, sync tasks, background processor, etc.
 - We are using a multi threaded runtime instead of the current thread
   - allowing to actually join the background processor without blocking all other tasks.
   - making better use of multiple cpu cores.
 - We are not creating a new runtime on every async bridge call.
  • Loading branch information
holzeis committed Dec 12, 2022
1 parent 1a19388 commit 4e54c8d
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 169 deletions.
244 changes: 129 additions & 115 deletions rust/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use crate::offer::Offer;
use crate::wallet;
use crate::wallet::Balance;
use crate::wallet::LightningTransaction;
use anyhow::anyhow;
use anyhow::Context;
use anyhow::Result;
use flutter_rust_bridge::StreamSink;
Expand All @@ -22,13 +21,14 @@ use lightning_invoice::Invoice;
use lightning_invoice::InvoiceDescription;
use rust_decimal::prelude::ToPrimitive;
use rust_decimal::Decimal;
use state::Storage;
use std::ops::Add;
use std::path::Path;
use std::str::FromStr;
use std::time::SystemTime;
use time::Duration;
pub use time::OffsetDateTime;
use tokio::try_join;
use tokio::runtime::Runtime;

pub struct Address {
pub address: String,
Expand Down Expand Up @@ -168,91 +168,114 @@ impl WalletInfo {
Ok(tx_history)
}
}
/// Lazily creates a multi threaded runtime with the the number of worker threads corresponding to
/// the number of available cores.
fn runtime() -> Result<&'static Runtime> {
static RUNTIME: Storage<Runtime> = Storage::new();

#[tokio::main(flavor = "current_thread")]
pub async fn refresh_wallet_info() -> Result<WalletInfo> {
wallet::sync()?;
WalletInfo::build_wallet_info().await
if RUNTIME.try_get().is_none() {
let runtime = Runtime::new()?;
RUNTIME.set(runtime);
}

Ok(RUNTIME.get())
}

pub fn refresh_wallet_info() -> Result<WalletInfo> {
runtime()?.block_on(async {
wallet::sync()?;
WalletInfo::build_wallet_info().await
})
}

#[tokio::main(flavor = "current_thread")]
pub async fn run(stream: StreamSink<Event>, app_dir: String) -> Result<()> {
pub fn run(stream: StreamSink<Event>, app_dir: String) -> Result<()> {
let network = config::network();
anyhow::ensure!(!app_dir.is_empty(), "app_dir must not be empty");
stream.add(Event::Init(format!("Initialising {network} wallet")));
wallet::init_wallet(Path::new(app_dir.as_str()))?;

stream.add(Event::Init("Initialising database".to_string()));
db::init_db(
&Path::new(app_dir.as_str())
.join(network.to_string())
.join("taker.sqlite"),
)
.await?;

stream.add(Event::Init("Starting full ldk node".to_string()));
let background_processor = wallet::run_ldk().await?;

stream.add(Event::Init("Fetching an offer".to_string()));
stream.add(Event::Offer(offer::get_offer().await.ok()));

stream.add(Event::Init("Fetching your balance".to_string()));
stream.add(Event::WalletInfo(
WalletInfo::build_wallet_info().await.ok(),
));
stream.add(Event::Init("Checking channel state".to_string()));
stream.add(Event::ChannelState(get_channel_state()));

stream.add(Event::Init("Ready".to_string()));
stream.add(Event::Ready);

// spawn a connection task keeping the connection with the maker alive.
let peer_manager = wallet::get_peer_manager()?;
let connection_handle = connection::spawn(peer_manager);

// sync offers every 5 seconds
let offer_handle = offer::spawn(stream.clone());

// sync wallet every 60 seconds
let wallet_sync_handle = tokio::spawn(async {
loop {
wallet::sync().unwrap_or_else(|e| tracing::error!(?e, "Failed to sync wallet"));
tokio::time::sleep(std::time::Duration::from_secs(60)).await;
}
});

// sync wallet info every 10 seconds
let wallet_info_stream = stream.clone();
let wallet_info_sync_handle = tokio::spawn(async move {
loop {
match WalletInfo::build_wallet_info().await {
Ok(wallet_info) => {
let _ = wallet_info_stream.add(Event::WalletInfo(Some(wallet_info)));
let runtime = runtime()?;
runtime.block_on(async move {
stream.add(Event::Init(format!("Initialising {network} wallet")));
wallet::init_wallet(Path::new(app_dir.as_str()))?;

stream.add(Event::Init("Initialising database".to_string()));
db::init_db(
&Path::new(app_dir.as_str())
.join(network.to_string())
.join("taker.sqlite"),
)
.await?;

stream.add(Event::Init("Starting full ldk node".to_string()));
let background_processor = wallet::run_ldk()?;

stream.add(Event::Init("Fetching an offer".to_string()));
stream.add(Event::Offer(offer::get_offer().await.ok()));

stream.add(Event::Init("Fetching your balance".to_string()));
stream.add(Event::WalletInfo(
WalletInfo::build_wallet_info().await.ok(),
));
stream.add(Event::Init("Checking channel state".to_string()));
stream.add(Event::ChannelState(get_channel_state()));

stream.add(Event::Init("Ready".to_string()));
stream.add(Event::Ready);

// spawn a connection task keeping the connection with the maker alive.
runtime.spawn(async move {
let peer_info = config::maker_peer_info();
loop {
let peer_manager = wallet::get_peer_manager();
connection::connect(peer_manager, peer_info).await;
// add a delay before retrying to connect
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
});

let offer_stream = stream.clone();
runtime.spawn(async move {
loop {
offer_stream.add(Event::Offer(offer::get_offer().await.ok()));
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
});

runtime.spawn(async {
loop {
wallet::sync().unwrap_or_else(|e| tracing::error!(?e, "Failed to sync wallet"));
tokio::time::sleep(std::time::Duration::from_secs(60)).await;
}
});

let wallet_info_stream = stream.clone();
runtime.spawn(async move {
loop {
match WalletInfo::build_wallet_info().await {
Ok(wallet_info) => {
let _ = wallet_info_stream.add(Event::WalletInfo(Some(wallet_info)));
}
Err(e) => tracing::error!(?e, "Failed to build wallet info"),
}
Err(e) => tracing::error!(?e, "Failed to build wallet info"),
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
}
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
}
});

// sync channel state every 5 seconds
let channel_state_stream = stream.clone();
let channel_state_handle = tokio::spawn(async move {
loop {
channel_state_stream.add(Event::ChannelState(get_channel_state()));
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
});
});

try_join!(
connection_handle,
offer_handle,
wallet_sync_handle,
wallet_info_sync_handle,
channel_state_handle,
)?;
let channel_state_stream = stream.clone();
runtime.spawn(async move {
loop {
channel_state_stream.add(Event::ChannelState(get_channel_state()));
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
});

background_processor.join().map_err(|e| anyhow!(e))
runtime.spawn_blocking(move || {
// background processor joins on a sync thread, meaning that join here will block a
// full thread, which is dis-encouraged to do in async code.
if let Err(err) = background_processor.join() {
tracing::error!(?err, "Background processor stopped unexpected");
}
});
Ok(())
})
}

pub fn get_balance() -> Result<Balance> {
Expand All @@ -279,16 +302,18 @@ pub fn network() -> SyncReturn<String> {
SyncReturn(config::network().to_string())
}

#[tokio::main(flavor = "current_thread")]
pub async fn open_channel(taker_amount: u64) -> Result<()> {
let peer_info = config::maker_peer_info();
wallet::open_channel(peer_info, taker_amount).await
pub fn open_channel(taker_amount: u64) -> Result<()> {
runtime()?.block_on(async {
let peer_info = config::maker_peer_info();
wallet::open_channel(peer_info, taker_amount).await
})
}

#[tokio::main(flavor = "current_thread")]
pub async fn close_channel() -> Result<()> {
let peer_info = config::maker_peer_info();
wallet::close_channel(peer_info.pubkey, false).await
pub fn close_channel() -> Result<()> {
runtime()?.block_on(async {
let peer_info = config::maker_peer_info();
wallet::close_channel(peer_info.pubkey, false).await
})
}

pub fn send_to_address(address: String, amount: u64) -> Result<String> {
Expand All @@ -301,42 +326,32 @@ pub fn send_to_address(address: String, amount: u64) -> Result<String> {
Ok(txid)
}

#[tokio::main(flavor = "current_thread")]
pub async fn list_cfds() -> Result<Vec<Cfd>> {
let mut conn = db::acquire().await?;
cfd::load_cfds(&mut conn).await
pub fn list_cfds() -> Result<Vec<Cfd>> {
runtime()?.block_on(async {
let mut conn = db::acquire().await?;
cfd::load_cfds(&mut conn).await
})
}

#[tokio::main(flavor = "current_thread")]
pub async fn open_cfd(order: Order) -> Result<()> {
cfd::open(&order).await
pub fn open_cfd(order: Order) -> Result<()> {
runtime()?.block_on(async { cfd::open(&order).await })
}

#[tokio::main(flavor = "current_thread")]
pub async fn call_faucet(address: String) -> Result<String> {
pub fn call_faucet(address: String) -> Result<String> {
anyhow::ensure!(
!address.is_empty(),
"Cannot call faucet because of empty address"
);
faucet::call_faucet(address).await
runtime()?.block_on(async { faucet::call_faucet(address).await })
}

#[tokio::main(flavor = "current_thread")]
pub async fn get_fee_recommendation() -> Result<u32> {
let fee_recommendation = wallet::get_fee_recommendation()?;

Ok(fee_recommendation)
pub fn get_fee_recommendation() -> Result<u32> {
wallet::get_fee_recommendation()
}

/// Settles a CFD with the given taker and maker amounts in sats
#[tokio::main(flavor = "current_thread")]
pub async fn settle_cfd(cfd: Cfd, offer: Offer) -> Result<()> {
cfd::settle(&cfd, &offer).await
}

#[tokio::main(flavor = "current_thread")]
pub async fn get_lightning_tx_history() -> Result<Vec<LightningTransaction>> {
wallet::get_lightning_history().await
pub fn settle_cfd(cfd: Cfd, offer: Offer) -> Result<()> {
runtime()?.block_on(async { cfd::settle(&cfd, &offer).await })
}

/// Initialise logging infrastructure for Rust
Expand All @@ -350,19 +365,18 @@ pub fn get_seed_phrase() -> Vec<String> {
wallet::get_seed_phrase()
}

#[tokio::main(flavor = "current_thread")]
pub async fn send_lightning_payment(invoice: String) -> Result<()> {
pub fn send_lightning_payment(invoice: String) -> Result<()> {
anyhow::ensure!(!invoice.is_empty(), "Cannot pay empty invoice");
wallet::send_lightning_payment(&invoice).await
runtime()?.block_on(async { wallet::send_lightning_payment(&invoice).await })
}

#[tokio::main(flavor = "current_thread")]
pub async fn create_lightning_invoice(
pub fn create_lightning_invoice(
amount_sats: u64,
expiry_secs: u32,
description: String,
) -> Result<String> {
wallet::create_invoice(amount_sats, expiry_secs, description).await
runtime()?
.block_on(async { wallet::create_invoice(amount_sats, expiry_secs, description).await })
}

// Note, this implementation has to be on the api level as otherwise it wouldn't be generated
Expand Down
56 changes: 24 additions & 32 deletions rust/src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,44 +1,36 @@
use crate::config;
use crate::lightning::PeerInfo;
use crate::lightning::PeerManager;
use bdk::bitcoin::secp256k1::PublicKey;
use std::sync::Arc;
use std::time::Duration;
use tokio::task::JoinHandle;

pub fn spawn(peer_manager: Arc<PeerManager>) -> JoinHandle<()> {
// keep connection with maker alive!
tokio::spawn(async move {
let peer_info = config::maker_peer_info();
loop {
tracing::info!("Connecting to {peer_info}");
match lightning_net_tokio::connect_outbound(
Arc::clone(&peer_manager),
peer_info.pubkey,
peer_info.peer_addr,
)
.await
{
Some(connection_closed_future) => {
let mut connection_closed_future = Box::pin(connection_closed_future);
while !is_connected(&peer_manager, peer_info.pubkey) {
if futures::poll!(&mut connection_closed_future).is_ready() {
tracing::warn!("Peer disconnected before we finished the handshake! Retrying in 5 seconds.");
tokio::time::sleep(Duration::from_secs(5)).await;
return;
}
tokio::time::sleep(Duration::from_secs(5)).await;
}
tracing::info!("Successfully connected to {peer_info}");
connection_closed_future.await;
tracing::warn!("Lost connection to maker, retrying immediately.")
}
None => {
tracing::warn!("Failed to connect to maker! Retrying in 5 seconds.");
pub async fn connect(peer_manager: Arc<PeerManager>, peer_info: PeerInfo) {
tracing::info!("Connecting to {peer_info}");
match lightning_net_tokio::connect_outbound(
Arc::clone(&peer_manager),
peer_info.pubkey,
peer_info.peer_addr,
)
.await
{
Some(connection_closed_future) => {
let mut connection_closed_future = Box::pin(connection_closed_future);
while !is_connected(&peer_manager, peer_info.pubkey) {
if futures::poll!(&mut connection_closed_future).is_ready() {
tracing::warn!("Peer disconnected before we finished the handshake! Retrying in 5 seconds.");
tokio::time::sleep(Duration::from_secs(5)).await;
return;
}
tokio::time::sleep(Duration::from_secs(5)).await;
}
tracing::info!("Successfully connected to {peer_info}");
connection_closed_future.await;
tracing::warn!("Lost connection to maker, retrying immediately.")
}
None => {
tracing::warn!("Failed to connect to maker! Retrying.");
}
})
}
}

fn is_connected(peer_manager: &Arc<PeerManager>, pubkey: PublicKey) -> bool {
Expand Down
Loading

0 comments on commit 4e54c8d

Please sign in to comment.