From e9597d58ea8ed4518ef91ab189612707cd12aea2 Mon Sep 17 00:00:00 2001 From: Richard Holzeis Date: Thu, 29 Jun 2023 17:22:42 +0200 Subject: [PATCH] wip: charge fee when opening a jit channel --- crates/ln-dlc-node/src/ln/event_handler.rs | 14 +- crates/ln-dlc-node/src/node/invoice.rs | 52 ++++++ crates/ln-dlc-node/src/node/mod.rs | 7 + crates/ln-dlc-node/src/tests/mod.rs | 1 + crates/tests-e2e/src/test_subscriber.rs | 6 + mobile/native/Cargo.toml | 4 +- mobile/native/src/api.rs | 2 + mobile/native/src/channel.rs | 149 ++++++++++++++++++ mobile/native/src/event/api.rs | 6 + mobile/native/src/event/mod.rs | 7 + mobile/native/src/lib.rs | 2 + .../native/src/ln_dlc/lightning_subscriber.rs | 35 ++++ mobile/native/src/ln_dlc/mod.rs | 37 ++++- 13 files changed, 315 insertions(+), 7 deletions(-) create mode 100644 mobile/native/src/channel.rs create mode 100644 mobile/native/src/ln_dlc/lightning_subscriber.rs diff --git a/crates/ln-dlc-node/src/ln/event_handler.rs b/crates/ln-dlc-node/src/ln/event_handler.rs index e14a66035..0b092864e 100644 --- a/crates/ln-dlc-node/src/ln/event_handler.rs +++ b/crates/ln-dlc-node/src/ln/event_handler.rs @@ -38,6 +38,7 @@ use std::sync::Arc; use std::sync::MutexGuard; use std::time::Duration; use time::OffsetDateTime; +use tokio::sync::watch; /// The speed at which we want a transaction to confirm used for feerate estimation. /// @@ -54,6 +55,7 @@ pub struct EventHandler { pending_intercepted_htlcs: PendingInterceptedHtlcs, peer_manager: Arc, fee_rate_estimator: Arc, + event_sender: Option>>, } #[allow(clippy::too_many_arguments)] @@ -71,6 +73,7 @@ where pending_intercepted_htlcs: PendingInterceptedHtlcs, peer_manager: Arc, fee_rate_estimator: Arc, + event_sender: Option>>, ) -> Self { Self { channel_manager, @@ -82,6 +85,7 @@ where pending_intercepted_htlcs, peer_manager, fee_rate_estimator, + event_sender, } } @@ -91,10 +95,17 @@ where let event_str = format!("{event:?}"); - match self.match_event(event).await { + match self.match_event(event.clone()).await { Ok(()) => tracing::debug!(event = ?event_str, "Successfully handled event"), Err(e) => tracing::error!("Failed to handle event. Error {e:#}"), } + + if let Some(event_sender) = &self.event_sender { + match event_sender.send(Some(event)) { + Ok(()) => tracing::trace!("Sent event to subscriber"), + Err(e) => tracing::error!("Failed to send event to subscriber: {e:#}"), + } + } } async fn match_event(&self, event: Event) -> Result<()> { @@ -678,7 +689,6 @@ where ); } }; - Ok(()) } } diff --git a/crates/ln-dlc-node/src/node/invoice.rs b/crates/ln-dlc-node/src/node/invoice.rs index 60d6efa8b..c6bf8df67 100644 --- a/crates/ln-dlc-node/src/node/invoice.rs +++ b/crates/ln-dlc-node/src/node/invoice.rs @@ -12,21 +12,28 @@ use bitcoin::hashes::Hash; use bitcoin::secp256k1::PublicKey; use bitcoin::secp256k1::Secp256k1; use bitcoin::Network; +use lightning::ln::channelmanager::PaymentId; use lightning::ln::channelmanager::Retry; use lightning::ln::channelmanager::MIN_CLTV_EXPIRY_DELTA; use lightning::ln::PaymentHash; use lightning::routing::gossip::RoutingFees; +use lightning::routing::router::PaymentParameters; use lightning::routing::router::RouteHint; use lightning::routing::router::RouteHintHop; +use lightning::routing::router::RouteParameters; use lightning_invoice::payment::pay_invoice; use lightning_invoice::payment::PaymentError; use lightning_invoice::Currency; use lightning_invoice::Invoice; use lightning_invoice::InvoiceBuilder; +use rand::Rng; use std::time::Duration; use std::time::SystemTime; use time::OffsetDateTime; +// The timeout after which we abandon retrying failed payments. +const LDK_PAYMENT_RETRY_TIMEOUT: Duration = Duration::from_secs(10); + impl

Node

where P: Storage, @@ -154,6 +161,51 @@ where } } + #[autometrics] + pub fn send_spontaneous_payment( + &self, + amount_msat: u64, + receiver: PublicKey, + ) -> Result { + tracing::info!("Sending spontaneous payment of {amount_msat} to {receiver}"); + let mut payment_id = [0u8; 32]; + rand::thread_rng().fill(&mut payment_id); + + let route_params = RouteParameters { + payment_params: PaymentParameters::for_keysend(receiver, MIN_CLTV_EXPIRY_DELTA.into()), + final_value_msat: amount_msat, + }; + + let payment_hash = self + .channel_manager + .send_spontaneous_payment_with_retry( + None, + PaymentId(payment_id), + route_params, + Retry::Timeout(LDK_PAYMENT_RETRY_TIMEOUT), + ) + .map_err(|e| { + anyhow!( + "Failed to send spontaneous payment to {}. Error: {e:?}", + receiver + ) + })?; + + self.storage.insert_payment( + payment_hash, + PaymentInfo { + preimage: None, + secret: None, + status: HTLCStatus::Pending, + amt_msat: MillisatAmount(Some(amount_msat)), + flow: PaymentFlow::Outbound, + timestamp: OffsetDateTime::now_utc(), + }, + )?; + + Ok(payment_hash) + } + #[autometrics] pub fn send_payment(&self, invoice: &Invoice) -> Result<()> { let status = match pay_invoice(invoice, Retry::Attempts(10), &self.channel_manager) { diff --git a/crates/ln-dlc-node/src/node/mod.rs b/crates/ln-dlc-node/src/node/mod.rs index aee35fcfa..497f124f9 100644 --- a/crates/ln-dlc-node/src/node/mod.rs +++ b/crates/ln-dlc-node/src/node/mod.rs @@ -52,6 +52,7 @@ use std::sync::Mutex; use std::time::Duration; use std::time::Instant; use std::time::SystemTime; +use tokio::sync::watch; use tokio::sync::RwLock; mod channel_manager; @@ -73,6 +74,7 @@ pub use channel_manager::ChannelManager; pub use dlc_channel::dlc_message_name; pub use dlc_channel::sub_channel_message_name; pub use invoice::HTLCStatus; +use lightning::util::events::Event; pub use storage::InMemoryStore; pub use storage::Storage; pub use sub_channel_manager::SubChannelManager; @@ -175,6 +177,7 @@ where seed: Bip39Seed, ephemeral_randomness: [u8; 32], oracle: OracleInfo, + event_sender: watch::Sender>, ) -> Result { let user_config = app_config(); Node::new( @@ -194,6 +197,7 @@ where user_config, LnDlcNodeSettings::default(), oracle.into(), + Some(event_sender), ) } @@ -238,6 +242,7 @@ where user_config, settings, oracle.into(), + None, ) } @@ -261,6 +266,7 @@ where ldk_user_config: UserConfig, settings: LnDlcNodeSettings, oracle_client: P2PDOracleClient, + event_sender: Option>>, ) -> Result { let time_since_unix_epoch = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?; @@ -483,6 +489,7 @@ where Arc::new(Mutex::new(HashMap::new())), peer_manager.clone(), fee_rate_estimator.clone(), + event_sender, ); // Connection manager diff --git a/crates/ln-dlc-node/src/tests/mod.rs b/crates/ln-dlc-node/src/tests/mod.rs index 8d35080ba..3de4fd542 100644 --- a/crates/ln-dlc-node/src/tests/mod.rs +++ b/crates/ln-dlc-node/src/tests/mod.rs @@ -137,6 +137,7 @@ impl Node { user_config, LnDlcNodeSettings::default(), oracle.into(), + None, )?; tracing::debug!(%name, info = %node.info, "Node started"); diff --git a/crates/tests-e2e/src/test_subscriber.rs b/crates/tests-e2e/src/test_subscriber.rs index 773dbebdc..536f1fd8e 100644 --- a/crates/tests-e2e/src/test_subscriber.rs +++ b/crates/tests-e2e/src/test_subscriber.rs @@ -119,6 +119,12 @@ impl Subscriber for Senders { tracing::trace!(?prices, "Received price update event"); // TODO: Add prices from orderbook_commons } + native::event::EventInternal::ChannelReady(channel_id) => { + tracing::trace!(?channel_id, "Received channel ready event"); + } + native::event::EventInternal::PaymentClaimed(amount_msats) => { + tracing::trace!(amount_msats, "Received payment claimed event"); + } } } diff --git a/mobile/native/Cargo.toml b/mobile/native/Cargo.toml index 7ed2f4fde..2467e8716 100644 --- a/mobile/native/Cargo.toml +++ b/mobile/native/Cargo.toml @@ -10,6 +10,7 @@ crate-type = ["rlib", "cdylib", "staticlib"] anyhow = "1" base64 = "0.21.0" bdk = { version = "0.27.0", default-features = false, features = ["key-value-db", "use-esplora-blocking"] } +bitcoin = "0.29" coordinator-commons = { path = "../../crates/coordinator-commons" } diesel = { version = "2.0.0", features = ["sqlite", "r2d2", "extras"] } diesel_migrations = "2.0.0" @@ -39,6 +40,3 @@ tracing-subscriber = { version = "0.3", default-features = false, features = ["f trade = { path = "../../crates/trade" } url = "2.3.1" uuid = { version = "1.3.0", features = ["v4", "fast-rng", "macro-diagnostics"] } - -[dev-dependencies] -bitcoin = "0.29" diff --git a/mobile/native/src/api.rs b/mobile/native/src/api.rs index 795a832a1..df0a655c7 100644 --- a/mobile/native/src/api.rs +++ b/mobile/native/src/api.rs @@ -1,4 +1,5 @@ use crate::calculations; +use crate::channel::ChannelSubscriber; use crate::commons::api::Price; use crate::config; use crate::config::api::Config; @@ -196,6 +197,7 @@ pub fn run(config: Config, app_dir: String, seed_dir: String) -> Result<()> { db::init_db(&app_dir, get_network())?; let runtime = ln_dlc::get_or_create_tokio_runtime()?; ln_dlc::run(app_dir, seed_dir, runtime)?; + event::subscribe(ChannelSubscriber::new()); orderbook::subscribe(ln_dlc::get_node_key(), runtime) } diff --git a/mobile/native/src/channel.rs b/mobile/native/src/channel.rs new file mode 100644 index 000000000..32adefa86 --- /dev/null +++ b/mobile/native/src/channel.rs @@ -0,0 +1,149 @@ +use crate::config; +use crate::event::subscriber::Subscriber; +use crate::event::EventInternal; +use crate::event::EventType; +use crate::ln_dlc; +use ln_dlc_node::node::rust_dlc_manager::ChannelId; +use serde::Deserialize; +use serde::Serialize; +use std::sync::Arc; +use std::sync::Mutex; +use tokio::runtime::Handle; + +#[derive(Clone)] +pub struct ChannelSubscriber { + pub open_channel_tx: Arc>>, +} + +#[derive(Deserialize, Serialize, Debug, Clone)] +pub struct EsploraTransaction { + pub txid: String, + pub fee: u32, +} + +impl Subscriber for ChannelSubscriber { + fn notify(&self, event: &EventInternal) { + match event { + EventInternal::ChannelReady(channel_id) => { + self.fetch_channel_opening_fee(channel_id); + } + EventInternal::PaymentClaimed(amount_msats) => { + self.pay_channel_fees(*amount_msats); + } + _ => {} + } + } + + fn events(&self) -> Vec { + vec![EventType::ChannelReady, EventType::PaymentClaimed] + } +} + +impl ChannelSubscriber { + pub fn new() -> Self { + Self { + open_channel_tx: Arc::new(Mutex::new(None)), + } + } + + // attempts to pay channel fee with received payment (amount in msats) + fn pay_channel_fees(&self, amount_msats: u64) { + tracing::debug!("Received payment of {amount_msats}."); + if let Some(transaction) = self.get_funding_transaction() { + tracing::debug!("Trying to pay channel opening fees of {}", transaction.fee); + let funding_tx_fees_msats = (transaction.fee * 1000) as u64; + + if funding_tx_fees_msats > amount_msats { + tracing::warn!("Trying to pay fees with an amount smaller than the fees!") + } + + let receiver = config::get_coordinator_info().pubkey; + match ln_dlc::send_spontaneous_payment(funding_tx_fees_msats, receiver) { + Ok(payment_hash) => { + let payment_hash_as_str = hex::encode(payment_hash.0); + // unset the funding transaction marking it as being paid. + self.unset_funding_transaction(); + tracing::info!("Successfully paid funding transaction fees of {funding_tx_fees_msats} to {receiver}. Payment hash: {payment_hash_as_str}"); + } + Err(e) => { + tracing::error!("Failed to pay funding transaction fees of {funding_tx_fees_msats} to {receiver}. Error: {e:#}"); + } + } + } else { + tracing::debug!("No pending funding transaction found!"); + } + } + + // Fetches the channel opening fee from esplora + fn fetch_channel_opening_fee(&self, channel_id: &ChannelId) { + let channel_id_as_str = hex::encode(channel_id); + tracing::debug!("Received new inbound channel with id {channel_id_as_str}"); + + match ln_dlc::get_funding_transaction(channel_id) { + Ok(txid) => { + let transaction: Option = tokio::task::block_in_place(|| { + Handle::current().block_on(async move { + tracing::debug!( + "-------------------> Querying {}", + format!("{}/tx/{txid}", config::get_esplora_endpoint()) + ); + + match reqwest::get(format!("{}tx/{txid}", config::get_esplora_endpoint())) + .await + { + Ok(response) => { + // tracing::debug!("---------------> Response: {}", + // response.text().await.unwrap()); + match response.json().await { + Ok(response) => Some(response), + Err(e) => { + tracing::error!( + "Failed to fetch transaction from esplora. Error: {e:#}" + ); + None + } + } + } + Err(e) => { + tracing::error!( + "Failed to fetch transaction from esplora. Error: {e:#}" + ); + None + } + } + }) + }); + + if let Some(transaction) = transaction { + tracing::debug!( + "Successfully fetched transaction fees of {} for new inbound channel with id {channel_id_as_str}", + transaction.fee + ); + self.set_funding_transaction(transaction); + } + } + Err(e) => tracing::error!("{e:#}"), + } + } + + fn set_funding_transaction(&self, transaction: EsploraTransaction) { + *self + .open_channel_tx + .lock() + .expect("Mutex to not be poisoned") = Some(transaction); + } + + fn unset_funding_transaction(&self) { + *self + .open_channel_tx + .lock() + .expect("Mutex to not be poisoned") = None; + } + + fn get_funding_transaction(&self) -> Option { + self.open_channel_tx + .lock() + .expect("Mutex to not be poisoned") + .clone() + } +} diff --git a/mobile/native/src/event/api.rs b/mobile/native/src/event/api.rs index 01d221fae..8ba736a77 100644 --- a/mobile/native/src/event/api.rs +++ b/mobile/native/src/event/api.rs @@ -50,6 +50,12 @@ impl From for Event { .into(); Event::PriceUpdateNotification(best_price) } + EventInternal::ChannelReady(_) => { + unreachable!("This internal event is not exposed to the UI") + } + EventInternal::PaymentClaimed(_) => { + unreachable!("This internal event is not exposed to the UI") + } } } } diff --git a/mobile/native/src/event/mod.rs b/mobile/native/src/event/mod.rs index 92d9b4572..90e987303 100644 --- a/mobile/native/src/event/mod.rs +++ b/mobile/native/src/event/mod.rs @@ -4,6 +4,7 @@ pub mod subscriber; use crate::api::WalletInfo; use coordinator_commons::TradeParams; +use ln_dlc_node::node::rust_dlc_manager::ChannelId; use orderbook_commons::Prices; use std::hash::Hash; use trade::ContractSymbol; @@ -31,6 +32,8 @@ pub enum EventInternal { PositionUpdateNotification(Position), PositionCloseNotification(ContractSymbol), PriceUpdateNotification(Prices), + ChannelReady(ChannelId), + PaymentClaimed(u64), } impl From for EventType { @@ -46,6 +49,8 @@ impl From for EventType { EventInternal::PositionUpdateNotification(_) => EventType::PositionUpdateNotification, EventInternal::PositionCloseNotification(_) => EventType::PositionClosedNotification, EventInternal::PriceUpdateNotification(_) => EventType::PriceUpdateNotification, + EventInternal::ChannelReady(_) => EventType::ChannelReady, + EventInternal::PaymentClaimed(_) => EventType::PaymentClaimed, } } } @@ -60,4 +65,6 @@ pub enum EventType { PositionUpdateNotification, PositionClosedNotification, PriceUpdateNotification, + ChannelReady, + PaymentClaimed, } diff --git a/mobile/native/src/lib.rs b/mobile/native/src/lib.rs index 1ec93076f..74c47e174 100644 --- a/mobile/native/src/lib.rs +++ b/mobile/native/src/lib.rs @@ -20,3 +20,5 @@ mod orderbook; unused_qualifications )] mod bridge_generated; + +mod channel; diff --git a/mobile/native/src/ln_dlc/lightning_subscriber.rs b/mobile/native/src/ln_dlc/lightning_subscriber.rs new file mode 100644 index 000000000..cdb9355e5 --- /dev/null +++ b/mobile/native/src/ln_dlc/lightning_subscriber.rs @@ -0,0 +1,35 @@ +use crate::event; +use crate::event::EventInternal; +use crate::ln_dlc::node::Node; +use lightning::util::events::Event; +use tokio::sync::watch::Receiver; + +impl Node { + pub async fn listen_for_lightning_events(&self, mut event_receiver: Receiver>) { + loop { + let event = match event_receiver.changed().await { + Ok(()) => { + if let Some(event) = event_receiver.borrow().clone() { + event + } else { + continue; + } + } + Err(e) => { + tracing::error!("Failed to receive event: {e:#}"); + continue; + } + }; + + match event { + Event::ChannelReady { channel_id, .. } => { + event::publish(&EventInternal::ChannelReady(channel_id)) + }, + Event::PaymentClaimed { amount_msat, .. } => { + event::publish(&EventInternal::PaymentClaimed(amount_msat)) + } + _ => tracing::debug!("Ignoring event on the mobile app"), + } + } + } +} diff --git a/mobile/native/src/ln_dlc/mod.rs b/mobile/native/src/ln_dlc/mod.rs index 80e4184f3..600113558 100644 --- a/mobile/native/src/ln_dlc/mod.rs +++ b/mobile/native/src/ln_dlc/mod.rs @@ -17,12 +17,17 @@ use anyhow::Result; use bdk::bitcoin::secp256k1::rand::thread_rng; use bdk::bitcoin::secp256k1::rand::RngCore; use bdk::bitcoin::secp256k1::SecretKey; +use bdk::bitcoin::Txid; use bdk::bitcoin::XOnlyPublicKey; use bdk::BlockTime; use coordinator_commons::TradeParams; use itertools::chain; use itertools::Itertools; +use lightning::ln::PaymentHash; +use lightning::util::events::Event; use lightning_invoice::Invoice; +use ln_dlc_node::node::rust_dlc_manager::subchannel::LNChannelManager; +use ln_dlc_node::node::rust_dlc_manager::ChannelId; use ln_dlc_node::node::LnDlcNodeSettings; use ln_dlc_node::node::NodeInfo; use ln_dlc_node::seed::Bip39Seed; @@ -37,10 +42,13 @@ use std::path::Path; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; +use bitcoin::secp256k1::PublicKey; use time::OffsetDateTime; use tokio::runtime::Runtime; +use tokio::sync::watch; use tokio::task::spawn_blocking; +mod lightning_subscriber; mod node; static NODE: Storage> = Storage::new(); @@ -75,12 +83,23 @@ pub async fn update_node_settings(settings: LnDlcNodeSettings) { node.inner.update_settings(settings).await; } -// TODO: should we also wrap the oracle as `NodeInfo`. It would fit the required attributes pubkey -// and address. pub fn get_oracle_pubkey() -> XOnlyPublicKey { NODE.get().inner.oracle_pk() } +pub fn get_funding_transaction(channel_id: &ChannelId) -> Result { + let node = NODE.get(); + let channel_details = node.inner.channel_manager.get_channel_details(channel_id); + + if let Some(channel_details) = channel_details { + if let Some(funding_txo) = channel_details.funding_txo { + return Ok(funding_txo.txid); + } + } + + bail!("Could not find funding transaction for {:?}", channel_id); +} + /// Lazily creates a multi threaded runtime with the the number of worker threads corresponding to /// the number of available cores. pub fn get_or_create_tokio_runtime() -> Result<&'static Runtime> { @@ -133,6 +152,8 @@ pub fn run(data_dir: String, seed_dir: String, runtime: &Runtime) -> Result<()> let seed_path = seed_dir.join("seed"); let seed = Bip39Seed::initialize(&seed_path)?; + let (event_sender, event_receiver) = watch::channel::>(None); + let node = Arc::new(ln_dlc_node::node::Node::new_app( "10101", network, @@ -144,9 +165,15 @@ pub fn run(data_dir: String, seed_dir: String, runtime: &Runtime) -> Result<()> seed, ephemeral_randomness, config::get_oracle_info(), + event_sender, )?); let node = Arc::new(Node { inner: node }); + runtime.spawn({ + let node = node.clone(); + async move { node.listen_for_lightning_events(event_receiver).await } + }); + runtime.spawn({ let node = node.clone(); async move { node.keep_connected(config::get_coordinator_info()).await } @@ -417,6 +444,12 @@ pub fn send_payment(invoice: &str) -> Result<()> { NODE.get().inner.send_payment(&invoice) } +pub fn send_spontaneous_payment(amount_msats: u64, receiver: PublicKey) -> Result { + NODE.get() + .inner + .send_spontaneous_payment(amount_msats, receiver) +} + pub async fn trade(trade_params: TradeParams) -> Result<(), (FailureReason, anyhow::Error)> { let client = reqwest_client(); let response = client