Skip to content

Commit

Permalink
Merge pull request #4 from getAlby/sync-impl
Browse files Browse the repository at this point in the history
Refactor: make the app synchronous
  • Loading branch information
rdmitr authored Nov 16, 2024
2 parents 0073980 + 2b800f5 commit 2c39017
Show file tree
Hide file tree
Showing 10 changed files with 192 additions and 272 deletions.
42 changes: 30 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "hub-recovery"
version = "0.1.0"
version = "0.2.0"
edition = "2021"

[dependencies]
Expand All @@ -9,6 +9,7 @@ anyhow = "1"
bip39 = "2.1.0"
bitcoin = "0.32.4"
clap = { version = "4.5", features = ["derive"] }
ctrlc = "3.4"
hex = { version = "0.4", features = ["serde"] }
hmac = "0.12"
ldk-node = { git = "https://github.com/getAlby/ldk-node" }
Expand All @@ -17,5 +18,4 @@ log4rs = { version = "1", default-features = false, features = ["file_appender"]
serde = { version = "1", features = ["derive"] }
serde_json = "1"
sha2 = "0.10"
tokio = { version = "1", features = ["rt-multi-thread", "macros", "signal"] }
url = "2"
105 changes: 105 additions & 0 deletions src/balance.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
use std::collections::HashSet;
use std::ops::Not;

use ldk_node::lightning::ln::ChannelId;
use ldk_node::{LightningBalance, Node, PendingSweepBalance};
use log::info;

fn get_ln_balance_channel_amount(balance: &LightningBalance) -> (ChannelId, u64) {
match balance {
LightningBalance::ClaimableOnChannelClose {
channel_id,
amount_satoshis,
..
} => (*channel_id, *amount_satoshis),
LightningBalance::ClaimableAwaitingConfirmations {
channel_id,
amount_satoshis,
..
} => (*channel_id, *amount_satoshis),
LightningBalance::ContentiousClaimable {
channel_id,
amount_satoshis,
..
} => (*channel_id, *amount_satoshis),
LightningBalance::MaybeTimeoutClaimableHTLC {
channel_id,
amount_satoshis,
..
} => (*channel_id, *amount_satoshis),
LightningBalance::MaybePreimageClaimableHTLC {
channel_id,
amount_satoshis,
..
} => (*channel_id, *amount_satoshis),
LightningBalance::CounterpartyRevokedOutputClaimable {
channel_id,
amount_satoshis,
..
} => (*channel_id, *amount_satoshis),
}
}

fn get_pending_sweep_balance_amount(amount: &PendingSweepBalance) -> u64 {
match amount {
PendingSweepBalance::PendingBroadcast {
amount_satoshis, ..
} => *amount_satoshis,
PendingSweepBalance::BroadcastAwaitingConfirmation {
amount_satoshis, ..
} => *amount_satoshis,
PendingSweepBalance::AwaitingThresholdConfirmations {
amount_satoshis, ..
} => *amount_satoshis,
}
}

pub fn check_and_print_balances(node: &Node) -> u64 {
let channels = node.list_channels();
let balances = node.list_balances();

let channel_ids = channels
.iter()
.map(|c| c.channel_id)
.collect::<HashSet<_>>();

let claimable = balances
.lightning_balances
.iter()
.filter_map(|b| {
let (channel_id, amount) = get_ln_balance_channel_amount(b);
channel_ids.contains(&channel_id).not().then(|| amount)
})
.reduce(|total, amount| total + amount)
.unwrap_or(0);

let pending_sweep = balances
.pending_balances_from_channel_closures
.iter()
.map(get_pending_sweep_balance_amount)
.reduce(|total, amount| total + amount)
.unwrap_or(0);

info!(
"balances: spendable: {}, reserved: {}, claimable: {}, pending sweep: {}",
balances.spendable_onchain_balance_sats,
balances.total_anchor_channels_reserve_sats,
claimable,
pending_sweep
);

println!("Balances (sats):");
println!(
" Spendable: {}; total: {}; reserved: {}",
balances.spendable_onchain_balance_sats,
balances.total_onchain_balance_sats - balances.total_anchor_channels_reserve_sats,
balances.total_anchor_channels_reserve_sats
);
println!(
" Pending from channel closures: {}",
claimable + pending_sweep
);
println!();

claimable + pending_sweep
}
85 changes: 54 additions & 31 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ use std::collections::HashSet;
use std::io;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};

use anyhow::{anyhow, Context, Result};
use clap::Parser;
Expand All @@ -16,16 +19,12 @@ use log::{error, info, LevelFilter};
use log4rs::append::file::FileAppender;
use log4rs::config::{Appender, Config, Root};
use log4rs::encode::pattern::PatternEncoder;
use tokio::signal;
use tokio::task;
use url::Url;

mod node_tasks;
mod periodic_blocking_task;
mod balance;
mod scb;
mod state;

use periodic_blocking_task::StopHandle;
use scb::EncodedChannelMonitorBackup;
use state::{ChannelState, State};

Expand Down Expand Up @@ -168,7 +167,7 @@ fn get_scb_path<P: AsRef<Path>>(dir: P, arg: Option<&str>) -> PathBuf {
}
}

async fn run<P: AsRef<Path>>(args: &Args, dir: P) -> Result<()> {
fn run<P: AsRef<Path>>(args: &Args, dir: P) -> Result<()> {
let dir = dir.as_ref();
let mut state = State::try_load(dir.join(STATE_FILE))
.context("failed to load recovery state")?
Expand Down Expand Up @@ -248,7 +247,12 @@ async fn run<P: AsRef<Path>>(args: &Args, dir: P) -> Result<()> {
.ok_or(anyhow!("invalid LDK path"))?
.to_string(),
)
.set_esplora_server(args.esplora_server.to_string())
.set_esplora_server(
args.esplora_server
.to_string()
.trim_end_matches('/')
.to_string(),
)
.set_liquidity_source_lsps2(
SocketAddress::from_str("52.88.33.119:9735").unwrap(),
PublicKey::from_str(
Expand Down Expand Up @@ -336,33 +340,53 @@ async fn run<P: AsRef<Path>>(args: &Args, dir: P) -> Result<()> {
.save(dir.join(STATE_FILE))
.context("failed to save recovery state")?;

let stop = Arc::new(StopHandle::new());

let node_task = node_tasks::spawn_node_event_loop_task(node.clone(), stop.clone());
let balance_task = node_tasks::spawn_balance_task(node.clone(), stop.clone());
let sync_task = node_tasks::spawn_wallet_sync_task(node.clone(), stop.clone());

println!("Waiting for channel recovery to complete. This may take a while...");
println!("It is safe to interrupt this program by pressing Ctrl-C. You can resume it later to check recovery status.");
let (tx, rx) = mpsc::channel();
ctrlc::set_handler(move || tx.send(()).expect("Could not send signal on channel."))
.expect("Error setting Ctrl-C handler");

tokio::select! {
_ = signal::ctrl_c() => stop.stop(),
_ = stop.wait() => {}
}
let mut last_balance = Instant::now();
let mut last_sync = Instant::now();
loop {
if rx.try_recv().is_ok() {
println!("Stopping...");
break;
}

println!("Stopping...");
let now = Instant::now();

if now.duration_since(last_balance).as_secs() >= 3 {
if balance::check_and_print_balances(&node) == 0 {
info!("no more pending funds, stopping the node");
println!("Recovery completed successfully");
break;
}
last_balance = now;
}

if now.duration_since(last_sync).as_secs() >= 4 {
info!("syncing wallets");
node.sync_wallets().context("failed to sync wallets")?;
info!("wallets synced");
last_sync = now;
}

loop {
match node.next_event() {
Some(event) => {
info!("event: {:?}", event);
node.event_handled();
}
None => break,
}
}

thread::sleep(Duration::from_millis(100));
}

info!("stopping node");
info!("waiting for node task to finish");
node_task.wait().await.context("node task failed")?;
info!("waiting for balance task to finish");
balance_task.wait().await.context("balance task failed")?;
info!("waiting for sync task to finish");
sync_task.wait().await.context("sync task failed")?;
info!("stopping node");
task::spawn_blocking(move || node.stop().context("failed to stop LDK node"))
.await
.context("node stop task failed")??;
node.stop().context("failed to stop LDK node")?;
info!("done");

Ok(())
Expand Down Expand Up @@ -401,8 +425,7 @@ fn get_local_dir(use_cwd: bool) -> Result<PathBuf> {
}
}

#[tokio::main]
async fn main() {
fn main() {
let args = Args::parse();

setup_logging(args.verbosity).unwrap();
Expand All @@ -426,7 +449,7 @@ async fn main() {
}
}

if let Err(e) = run(&args, &local_dir).await {
if let Err(e) = run(&args, &local_dir) {
error!("recovery failed: {:?}", e);

eprintln!(
Expand Down
7 changes: 0 additions & 7 deletions src/node_tasks.rs

This file was deleted.

Loading

0 comments on commit 2c39017

Please sign in to comment.