diff --git a/coordinator/migrations/2023-07-13-010300_trade_fees/down.sql b/coordinator/migrations/2023-07-13-010300_trade_fees/down.sql new file mode 100644 index 000000000..082b5b1dc --- /dev/null +++ b/coordinator/migrations/2023-07-13-010300_trade_fees/down.sql @@ -0,0 +1,7 @@ +-- This file should undo anything in `up.sql` +DELETE FROM + payments +WHERE + payment_hash = '6f9b8c95c2ba7b1857b19f975372308161fedf50feb78a252200135a41875210'; +ALTER TABLE + trades DROP COLUMN IF EXISTS "fee_payment_hash"; diff --git a/coordinator/migrations/2023-07-13-010300_trade_fees/up.sql b/coordinator/migrations/2023-07-13-010300_trade_fees/up.sql new file mode 100644 index 000000000..3d8c9e2ae --- /dev/null +++ b/coordinator/migrations/2023-07-13-010300_trade_fees/up.sql @@ -0,0 +1,26 @@ +-- Your SQL goes here +-- insert a payment with amount zero that is a dummy default payment that corresponds to this actual zero-amount invoice: +-- lnbcrt0m1pj26wczdq50fjhymeqd9h8vmmfvdjsnp4qtwk40kf07d8fzlhdt2s9vqyeczarvk37safua4a0kz7wellkq3vjpp5d7dce9wzhfa3s4a3n7t4xu3ss9slah6sl6mc5ffzqqf45sv82ggqsp54cmyvxklz48ap60guqsln4wuh2ranlap3grg2djt5ykz7005fc6s9qyysgqcqpcxq8z7ps9nqepcz2l2s4qrp6qeks68qry55ylcz542c8a6m6fffmhlmhtn9nnf458ngv83pk0473xfecdk7m7maumqk4jvaymdg7fpgn9tujcehxpqqte97xd +-- We insert this dummy payment so we are able to sum up fees on already existing trades and make the fee payment hash a mandatory field. +INSERT INTO + payments ( + payment_hash, + htlc_status, + amount_msat, + flow, + payment_timestamp, + description + ) +VALUES + ( + '6f9b8c95c2ba7b1857b19f975372308161fedf50feb78a252200135a41875210', + 'Succeeded', + 0, + 'Inbound', + CURRENT_TIMESTAMP, + 'zero amount payment dummy default value' + ); +ALTER TABLE + trades +ADD + COLUMN "fee_payment_hash" TEXT NOT NULL DEFAULT '6f9b8c95c2ba7b1857b19f975372308161fedf50feb78a252200135a41875210' REFERENCES payments(payment_hash); diff --git a/coordinator/src/bin/coordinator.rs b/coordinator/src/bin/coordinator.rs index abcd2c64c..f79dc866e 100644 --- a/coordinator/src/bin/coordinator.rs +++ b/coordinator/src/bin/coordinator.rs @@ -3,7 +3,7 @@ use anyhow::Result; use coordinator::cli::Opts; use coordinator::db; use coordinator::logger; -use coordinator::metrics::collect_metrics; +use coordinator::metrics; use coordinator::metrics::init_meter; use coordinator::node::connection; use coordinator::node::storage::NodeStorage; @@ -17,6 +17,9 @@ use coordinator::settings::Settings; use diesel::r2d2; use diesel::r2d2::ConnectionManager; use diesel::PgConnection; +use hex::FromHex; +use lightning::ln::PaymentHash; +use lightning::util::events::Event; use ln_dlc_node::seed::Bip39Seed; use rand::thread_rng; use rand::RngCore; @@ -27,6 +30,7 @@ use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; use time::OffsetDateTime; +use tokio::sync::watch; use tokio::task::spawn_blocking; use tracing::metadata::LevelFilter; use trade::bitmex_client::BitmexClient; @@ -86,6 +90,8 @@ async fn main() -> Result<()> { let mut conn = pool.get()?; run_migration(&mut conn); + let (node_event_sender, node_event_receiver) = watch::channel::>(None); + let node = Arc::new(ln_dlc_node::node::Node::new_coordinator( "10101.finance", network, @@ -99,6 +105,7 @@ async fn main() -> Result<()> { ephemeral_randomness, settings.ln_dlc.clone(), opts.get_oracle_info(), + node_event_sender, )?); let node = Node::new(node, pool.clone()); @@ -130,12 +137,14 @@ async fn main() -> Result<()> { } }); + metrics::collect_metrics_based_on_node_events(node.clone(), node_event_receiver); + tokio::spawn({ let node = node.clone(); async move { loop { let node = node.clone(); - spawn_blocking(move || collect_metrics(node)) + spawn_blocking(move || metrics::collect(node)) .await .expect("To spawn blocking thread"); tokio::time::sleep(PROCESS_PROMETHEUS_METRICS).await; @@ -218,8 +227,23 @@ async fn main() -> Result<()> { } }; + // Upon collab closing an expired position we cannot charge a fee using an + // invoice. This dummy hash exists in the database to + // represent zero-amount invoices. + let zero_amount_payment_hash_dummy = PaymentHash( + <[u8; 32]>::from_hex( + "6f9b8c95c2ba7b1857b19f975372308161fedf50feb78a252200135a41875210", + ) + .expect("static payment hash to decode"), + ); + match node - .close_position(position, closing_price, channel_id) + .close_position( + position, + closing_price, + channel_id, + zero_amount_payment_hash_dummy, + ) .await { Ok(_) => tracing::info!( diff --git a/coordinator/src/db/trades.rs b/coordinator/src/db/trades.rs index a7a93a0dd..ecfd2df67 100644 --- a/coordinator/src/db/trades.rs +++ b/coordinator/src/db/trades.rs @@ -3,8 +3,11 @@ use crate::orderbook::db::custom_types::Direction; use crate::schema::trades; use anyhow::Result; use autometrics::autometrics; +use bitcoin::hashes::hex::ToHex; use bitcoin::secp256k1::PublicKey; use diesel::prelude::*; +use hex::FromHex; +use lightning::ln::PaymentHash; use std::str::FromStr; use time::OffsetDateTime; @@ -21,6 +24,7 @@ struct Trade { direction: Direction, average_price: f32, timestamp: OffsetDateTime, + fee_payment_hash: String, } #[derive(Insertable, Debug, Clone)] @@ -34,6 +38,7 @@ struct NewTrade { collateral: i64, direction: Direction, average_price: f32, + pub fee_payment_hash: String, } #[autometrics] @@ -48,6 +53,22 @@ pub fn insert( Ok(trade.into()) } +/// Returns the position by trader pub key +#[autometrics] +pub fn is_payment_hash_registered_as_trade_fee( + conn: &mut PgConnection, + payment_hash: PaymentHash, +) -> QueryResult { + let payment_hash = payment_hash.0.to_hex(); + + let trade = trades::table + .filter(trades::fee_payment_hash.eq(payment_hash)) + .first::(conn) + .optional()?; + + Ok(trade.is_some()) +} + impl From for NewTrade { fn from(value: crate::trade::models::NewTrade) -> Self { NewTrade { @@ -59,6 +80,7 @@ impl From for NewTrade { collateral: value.collateral, direction: value.direction.into(), average_price: value.average_price, + fee_payment_hash: value.fee_payment_hash.0.to_hex(), } } } @@ -77,6 +99,9 @@ impl From for crate::trade::models::Trade { direction: value.direction.into(), average_price: value.average_price, timestamp: value.timestamp, + fee_payment_hash: PaymentHash( + <[u8; 32]>::from_hex(value.fee_payment_hash).expect("payment hash to decode"), + ), } } } diff --git a/coordinator/src/metrics.rs b/coordinator/src/metrics.rs index db010abbc..a9cb82920 100644 --- a/coordinator/src/metrics.rs +++ b/coordinator/src/metrics.rs @@ -3,7 +3,9 @@ use crate::node::storage::NodeStorage; use crate::node::Node; use lazy_static::lazy_static; use lightning::ln::channelmanager::ChannelDetails; +use lightning::util::events::Event; use opentelemetry::global; +use opentelemetry::metrics::Counter; use opentelemetry::metrics::Meter; use opentelemetry::metrics::ObservableGauge; use opentelemetry::sdk::export::metrics::aggregation; @@ -15,6 +17,7 @@ use opentelemetry::KeyValue; use opentelemetry_prometheus::PrometheusExporter; use std::sync::Arc; use std::time::Duration; +use tokio::sync::watch; use trade::ContractSymbol; use trade::Direction; @@ -58,6 +61,12 @@ lazy_static! { .i64_observable_gauge("position_margin_sats") .with_description("Current open position margin in sats") .init(); + + // trade fee metrics + pub static ref TRADE_FEES: Counter = METER + .u64_counter("trade_fees_sats") + .with_description("Accumulated trading fees collected") + .init(); } pub fn init_meter() -> PrometheusExporter { @@ -71,7 +80,12 @@ pub fn init_meter() -> PrometheusExporter { opentelemetry_prometheus::exporter(controller).init() } -pub fn collect_metrics(node: Node) { +pub fn increment_fee_counter(increment: u64) { + let cx = opentelemetry::Context::current(); + TRADE_FEES.add(&cx, increment, &[]); +} + +pub fn collect(node: Node) { let cx = opentelemetry::Context::current(); position_metrics(&cx, &node); @@ -243,3 +257,55 @@ fn node_metrics(cx: &Context, inner_node: Arc>, +) { + tokio::spawn(async move { + loop { + match node_event_receiver.changed().await { + Ok(()) => { + let event = node_event_receiver.borrow().clone(); + + if let Some(Event::PaymentClaimed { + amount_msat, + payment_hash, + .. + }) = event + { + tokio::task::spawn_blocking({ + let node = node.clone(); + move || { + let mut conn = match node.pool.get() { + Ok(conn) => conn, + Err(e) => { + tracing::error!("Failed to connect to database during node event post processing event: {e:#}"); + return; + } + }; + + match db::trades::is_payment_hash_registered_as_trade_fee( + &mut conn, + payment_hash, + ) { + Ok(true) => { + increment_fee_counter(amount_msat / 1000); + } + // irrelevant, not a fee + Ok(false) => (), + Err(e) => { + tracing::error!("Failed to load trades from database in node event post processing: {e:#}"); + } + } + } + }).await.expect("To spawn blocking thread"); + } + } + Err(e) => { + tracing::error!("Failed to receive event: {e:#}"); + } + } + } + }); +} diff --git a/coordinator/src/node.rs b/coordinator/src/node.rs index 3f20b8c68..07d089ef9 100644 --- a/coordinator/src/node.rs +++ b/coordinator/src/node.rs @@ -27,6 +27,7 @@ use dlc_manager::payout_curve::RoundingIntervals; use dlc_manager::ChannelId; use dlc_messages::Message; use lightning::ln::channelmanager::ChannelDetails; +use lightning::ln::PaymentHash; use lightning_invoice::Invoice; use ln_dlc_node::node; use ln_dlc_node::node::dlc_message_name; @@ -124,12 +125,7 @@ impl Node { #[autometrics] pub async fn trade(&self, trade_params: &TradeParams) -> Result { - let invoice = self.fee_invoice_taker(trade_params).await?; - let _fee_payment_hash = invoice.payment_hash(); - - // TODO: Save this invoice in the coordinator database - // The identifier is the payment hash - // Potentially safe the order id as meta data + let (fee_payment_hash, invoice) = self.fee_invoice_taker(trade_params).await?; match self.decide_trade_action(&trade_params.pubkey)? { TradeAction::Open => { @@ -137,7 +133,7 @@ impl Node { self.settings.read().await.allow_opening_positions, "Opening positions is disabled" ); - self.open_position(trade_params).await? + self.open_position(trade_params, fee_payment_hash).await? } TradeAction::Close(channel_id) => { let peer_id = trade_params.pubkey; @@ -154,7 +150,7 @@ impl Node { None => bail!("Failed to find open position : {}", trade_params.pubkey), }; - self.close_position(&position, closing_price, channel_id) + self.close_position(&position, closing_price, channel_id, fee_payment_hash) .await? } }; @@ -163,7 +159,11 @@ impl Node { } #[autometrics] - async fn open_position(&self, trade_params: &TradeParams) -> Result<()> { + async fn open_position( + &self, + trade_params: &TradeParams, + fee_payment_hash: PaymentHash, + ) -> Result<()> { let peer_id = trade_params.pubkey; tracing::info!(%peer_id, ?trade_params, "Opening position"); @@ -218,11 +218,15 @@ impl Node { // into the database doesn't, it is more likely to succeed in the new order. // FIXME: Note, we should not create a shadow representation (position) of the DLC struct, // but rather imply the state from the DLC. - self.persist_position_and_trade(trade_params) + self.persist_position_and_trade(trade_params, fee_payment_hash) } // Creates a position and a trade from the trade params - fn persist_position_and_trade(&self, trade_params: &TradeParams) -> Result<()> { + fn persist_position_and_trade( + &self, + trade_params: &TradeParams, + fee_payment_hash: PaymentHash, + ) -> Result<()> { let liquidation_price = liquidation_price(trade_params); let margin_coordinator = margin_coordinator(trade_params); @@ -258,6 +262,7 @@ impl Node { collateral: new_position.collateral, direction: new_position.direction, average_price: average_entry_price, + fee_payment_hash, }, )?; @@ -270,6 +275,7 @@ impl Node { position: &Position, closing_price: Decimal, channel_id: ChannelId, + fee_payment_hash: PaymentHash, ) -> Result<()> { let opening_price = Decimal::try_from(position.average_entry_price)?; @@ -309,6 +315,7 @@ impl Node { collateral: position.collateral, direction: position.direction.opposite(), average_price: closing_price.to_f32().expect("To fit into f32"), + fee_payment_hash, }, )?; diff --git a/coordinator/src/node/order_matching_fee.rs b/coordinator/src/node/order_matching_fee.rs index 3b6e612c3..89da27513 100644 --- a/coordinator/src/node/order_matching_fee.rs +++ b/coordinator/src/node/order_matching_fee.rs @@ -1,7 +1,12 @@ +use crate::db; use crate::node::Node; +use anyhow::Context; use anyhow::Result; +use bitcoin::secp256k1::ThirtyTwoByteHash; use coordinator_commons::TradeParams; +use lightning::ln::PaymentHash; use lightning_invoice::Invoice; +use ln_dlc_node::PaymentInfo; use orderbook_commons::order_matching_fee_taker; use orderbook_commons::FEE_INVOICE_DESCRIPTION_PREFIX_TAKER; @@ -9,7 +14,10 @@ use orderbook_commons::FEE_INVOICE_DESCRIPTION_PREFIX_TAKER; const INVOICE_EXPIRY: u32 = 3600; impl Node { - pub async fn fee_invoice_taker(&self, trade_params: &TradeParams) -> Result { + pub async fn fee_invoice_taker( + &self, + trade_params: &TradeParams, + ) -> Result<(PaymentHash, Invoice)> { let order_id = trade_params.filled_with.order_id; let description = format!("{FEE_INVOICE_DESCRIPTION_PREFIX_TAKER}{order_id}"); @@ -19,6 +27,17 @@ impl Node { ) .to_sat(); - self.inner.create_invoice(fee, description, INVOICE_EXPIRY) + let invoice = self + .inner + .create_invoice(fee, description, INVOICE_EXPIRY)?; + + let fee_payment_hash = PaymentHash((*invoice.payment_hash()).into_32()); + let fee_payment_info = PaymentInfo::from(invoice.clone()); + let mut conn = self.pool.get()?; + + db::payments::insert((fee_payment_hash, fee_payment_info), &mut conn) + .context("Failed to insert payment into database")?; + + Ok((fee_payment_hash, invoice)) } } diff --git a/coordinator/src/schema.rs b/coordinator/src/schema.rs index 8eeb876df..41d89de33 100644 --- a/coordinator/src/schema.rs +++ b/coordinator/src/schema.rs @@ -113,6 +113,7 @@ diesel::table! { direction -> DirectionType, average_price -> Float4, timestamp -> Timestamptz, + fee_payment_hash -> Text, } } diff --git a/coordinator/src/trade/models.rs b/coordinator/src/trade/models.rs index 5ba97da35..20f54f6d6 100644 --- a/coordinator/src/trade/models.rs +++ b/coordinator/src/trade/models.rs @@ -1,4 +1,5 @@ use bitcoin::secp256k1::PublicKey; +use lightning::ln::PaymentHash; use time::OffsetDateTime; use trade::ContractSymbol; use trade::Direction; @@ -13,6 +14,7 @@ pub struct NewTrade { pub collateral: i64, pub direction: Direction, pub average_price: f32, + pub fee_payment_hash: PaymentHash, } #[derive(Debug)] @@ -27,4 +29,5 @@ pub struct Trade { pub direction: Direction, pub average_price: f32, pub timestamp: OffsetDateTime, + pub fee_payment_hash: PaymentHash, } diff --git a/crates/ln-dlc-node/src/lib.rs b/crates/ln-dlc-node/src/lib.rs index d92d64ce7..a7ba87078 100644 --- a/crates/ln-dlc-node/src/lib.rs +++ b/crates/ln-dlc-node/src/lib.rs @@ -1,5 +1,6 @@ use crate::ln::TracingLogger; use crate::node::SubChannelManager; +use bitcoin::hashes::hex::ToHex; use bitcoin::secp256k1::PublicKey; use dlc_custom_signer::CustomKeysManager; use dlc_custom_signer::CustomSigner; @@ -15,6 +16,8 @@ use lightning::routing::gossip::P2PGossipSync; use lightning::routing::router::DefaultRouter; use lightning::routing::scoring::ProbabilisticScorer; use lightning::routing::utxo::UtxoLookup; +use lightning_invoice::Invoice; +use lightning_invoice::InvoiceDescription; use lightning_net_tokio::SocketDescriptor; use lightning_persister::FilesystemPersister; use ln_dlc_wallet::LnDlcWallet; @@ -85,7 +88,7 @@ type RequestedScid = u64; type FakeChannelPaymentRequests = Arc>>; type PendingInterceptedHtlcs = Arc>>; -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct PaymentInfo { pub preimage: Option, pub secret: Option, @@ -114,3 +117,20 @@ impl MillisatAmount { self.0 } } + +impl From for PaymentInfo { + fn from(value: Invoice) -> Self { + Self { + preimage: None, + secret: Some(*value.payment_secret()), + status: HTLCStatus::Pending, + amt_msat: MillisatAmount(value.amount_milli_satoshis()), + flow: PaymentFlow::Inbound, + timestamp: OffsetDateTime::from(value.timestamp()), + description: match value.description() { + InvoiceDescription::Direct(direct) => direct.to_string(), + InvoiceDescription::Hash(hash) => hash.0.to_hex(), + }, + } + } +} diff --git a/crates/ln-dlc-node/src/node/mod.rs b/crates/ln-dlc-node/src/node/mod.rs index a1f19ec21..262a27376 100644 --- a/crates/ln-dlc-node/src/node/mod.rs +++ b/crates/ln-dlc-node/src/node/mod.rs @@ -219,6 +219,7 @@ where ephemeral_randomness: [u8; 32], settings: LnDlcNodeSettings, oracle: OracleInfo, + event_sender: watch::Sender>, ) -> Result { let mut user_config = coordinator_config(); @@ -242,7 +243,7 @@ where user_config, settings, oracle.into(), - None, + Some(event_sender), ) }