Skip to content

Commit

Permalink
Save payment fee in database and report on prometheus counter
Browse files Browse the repository at this point in the history
  • Loading branch information
da-kami committed Jul 17, 2023
1 parent 3a1a93a commit c70f478
Show file tree
Hide file tree
Showing 11 changed files with 218 additions and 19 deletions.
7 changes: 7 additions & 0 deletions coordinator/migrations/2023-07-13-010300_trade_fees/down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- This file should undo anything in `up.sql`
DELETE FROM
payments
WHERE
payment_hash = '6f9b8c95c2ba7b1857b19f975372308161fedf50feb78a252200135a41875210';
ALTER TABLE
trades DROP COLUMN IF EXISTS "fee_payment_hash";
26 changes: 26 additions & 0 deletions coordinator/migrations/2023-07-13-010300_trade_fees/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
-- Your SQL goes here
-- insert a payment with amount zero that is a dummy default payment that corresponds to this actual zero-amount invoice:
-- lnbcrt0m1pj26wczdq50fjhymeqd9h8vmmfvdjsnp4qtwk40kf07d8fzlhdt2s9vqyeczarvk37safua4a0kz7wellkq3vjpp5d7dce9wzhfa3s4a3n7t4xu3ss9slah6sl6mc5ffzqqf45sv82ggqsp54cmyvxklz48ap60guqsln4wuh2ranlap3grg2djt5ykz7005fc6s9qyysgqcqpcxq8z7ps9nqepcz2l2s4qrp6qeks68qry55ylcz542c8a6m6fffmhlmhtn9nnf458ngv83pk0473xfecdk7m7maumqk4jvaymdg7fpgn9tujcehxpqqte97xd
-- We insert this dummy payment so we are able to sum up fees on already existing trades and make the fee payment hash a mandatory field.
INSERT INTO
payments (
payment_hash,
htlc_status,
amount_msat,
flow,
payment_timestamp,
description
)
VALUES
(
'6f9b8c95c2ba7b1857b19f975372308161fedf50feb78a252200135a41875210',
'Succeeded',
0,
'Inbound',
CURRENT_TIMESTAMP,
'zero amount payment dummy default value'
);
ALTER TABLE
trades
ADD
COLUMN "fee_payment_hash" TEXT NOT NULL DEFAULT '6f9b8c95c2ba7b1857b19f975372308161fedf50feb78a252200135a41875210' REFERENCES payments(payment_hash);
30 changes: 27 additions & 3 deletions coordinator/src/bin/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use anyhow::Result;
use coordinator::cli::Opts;
use coordinator::db;
use coordinator::logger;
use coordinator::metrics::collect_metrics;
use coordinator::metrics;
use coordinator::metrics::init_meter;
use coordinator::node::connection;
use coordinator::node::storage::NodeStorage;
Expand All @@ -17,6 +17,9 @@ use coordinator::settings::Settings;
use diesel::r2d2;
use diesel::r2d2::ConnectionManager;
use diesel::PgConnection;
use hex::FromHex;
use lightning::ln::PaymentHash;
use lightning::util::events::Event;
use ln_dlc_node::seed::Bip39Seed;
use rand::thread_rng;
use rand::RngCore;
Expand All @@ -27,6 +30,7 @@ use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use time::OffsetDateTime;
use tokio::sync::watch;
use tokio::task::spawn_blocking;
use tracing::metadata::LevelFilter;
use trade::bitmex_client::BitmexClient;
Expand Down Expand Up @@ -86,6 +90,8 @@ async fn main() -> Result<()> {
let mut conn = pool.get()?;
run_migration(&mut conn);

let (node_event_sender, node_event_receiver) = watch::channel::<Option<Event>>(None);

let node = Arc::new(ln_dlc_node::node::Node::new_coordinator(
"10101.finance",
network,
Expand All @@ -99,6 +105,7 @@ async fn main() -> Result<()> {
ephemeral_randomness,
settings.ln_dlc.clone(),
opts.get_oracle_info(),
node_event_sender,
)?);

let node = Node::new(node, pool.clone());
Expand Down Expand Up @@ -130,12 +137,14 @@ async fn main() -> Result<()> {
}
});

metrics::collect_metrics_based_on_node_events(node.clone(), node_event_receiver);

tokio::spawn({
let node = node.clone();
async move {
loop {
let node = node.clone();
spawn_blocking(move || collect_metrics(node))
spawn_blocking(move || metrics::collect(node))
.await
.expect("To spawn blocking thread");
tokio::time::sleep(PROCESS_PROMETHEUS_METRICS).await;
Expand Down Expand Up @@ -218,8 +227,23 @@ async fn main() -> Result<()> {
}
};

// Upon collab closing an expired position we cannot charge a fee using an
// invoice. This dummy hash exists in the database to
// represent zero-amount invoices.
let zero_amount_payment_hash_dummy = PaymentHash(
<[u8; 32]>::from_hex(
"6f9b8c95c2ba7b1857b19f975372308161fedf50feb78a252200135a41875210",
)
.expect("static payment hash to decode"),
);

match node
.close_position(position, closing_price, channel_id)
.close_position(
position,
closing_price,
channel_id,
zero_amount_payment_hash_dummy,
)
.await
{
Ok(_) => tracing::info!(
Expand Down
25 changes: 25 additions & 0 deletions coordinator/src/db/trades.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ use crate::orderbook::db::custom_types::Direction;
use crate::schema::trades;
use anyhow::Result;
use autometrics::autometrics;
use bitcoin::hashes::hex::ToHex;
use bitcoin::secp256k1::PublicKey;
use diesel::prelude::*;
use hex::FromHex;
use lightning::ln::PaymentHash;
use std::str::FromStr;
use time::OffsetDateTime;

Expand All @@ -21,6 +24,7 @@ struct Trade {
direction: Direction,
average_price: f32,
timestamp: OffsetDateTime,
fee_payment_hash: String,
}

#[derive(Insertable, Debug, Clone)]
Expand All @@ -34,6 +38,7 @@ struct NewTrade {
collateral: i64,
direction: Direction,
average_price: f32,
pub fee_payment_hash: String,
}

#[autometrics]
Expand All @@ -48,6 +53,22 @@ pub fn insert(
Ok(trade.into())
}

/// Returns the position by trader pub key
#[autometrics]
pub fn is_payment_hash_registered_as_trade_fee(
conn: &mut PgConnection,
payment_hash: PaymentHash,
) -> QueryResult<bool> {
let payment_hash = payment_hash.0.to_hex();

let trade = trades::table
.filter(trades::fee_payment_hash.eq(payment_hash))
.first::<Trade>(conn)
.optional()?;

Ok(trade.is_some())
}

impl From<crate::trade::models::NewTrade> for NewTrade {
fn from(value: crate::trade::models::NewTrade) -> Self {
NewTrade {
Expand All @@ -59,6 +80,7 @@ impl From<crate::trade::models::NewTrade> for NewTrade {
collateral: value.collateral,
direction: value.direction.into(),
average_price: value.average_price,
fee_payment_hash: value.fee_payment_hash.0.to_hex(),
}
}
}
Expand All @@ -77,6 +99,9 @@ impl From<Trade> for crate::trade::models::Trade {
direction: value.direction.into(),
average_price: value.average_price,
timestamp: value.timestamp,
fee_payment_hash: PaymentHash(
<[u8; 32]>::from_hex(value.fee_payment_hash).expect("payment hash to decode"),
),
}
}
}
68 changes: 67 additions & 1 deletion coordinator/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use crate::node::storage::NodeStorage;
use crate::node::Node;
use lazy_static::lazy_static;
use lightning::ln::channelmanager::ChannelDetails;
use lightning::util::events::Event;
use opentelemetry::global;
use opentelemetry::metrics::Counter;
use opentelemetry::metrics::Meter;
use opentelemetry::metrics::ObservableGauge;
use opentelemetry::sdk::export::metrics::aggregation;
Expand All @@ -15,6 +17,7 @@ use opentelemetry::KeyValue;
use opentelemetry_prometheus::PrometheusExporter;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;
use trade::ContractSymbol;
use trade::Direction;

Expand Down Expand Up @@ -58,6 +61,12 @@ lazy_static! {
.i64_observable_gauge("position_margin_sats")
.with_description("Current open position margin in sats")
.init();

// trade fee metrics
pub static ref TRADE_FEES: Counter<u64> = METER
.u64_counter("trade_fees_sats")
.with_description("Accumulated trading fees collected")
.init();
}

pub fn init_meter() -> PrometheusExporter {
Expand All @@ -71,7 +80,12 @@ pub fn init_meter() -> PrometheusExporter {
opentelemetry_prometheus::exporter(controller).init()
}

pub fn collect_metrics(node: Node) {
pub fn increment_fee_counter(increment: u64) {
let cx = opentelemetry::Context::current();
TRADE_FEES.add(&cx, increment, &[]);
}

pub fn collect(node: Node) {
let cx = opentelemetry::Context::current();
position_metrics(&cx, &node);

Expand Down Expand Up @@ -243,3 +257,55 @@ fn node_metrics(cx: &Context, inner_node: Arc<ln_dlc_node::node::Node<NodeStorag
}
}
}

pub fn collect_metrics_based_on_node_events(
node: Node,
mut node_event_receiver: watch::Receiver<Option<Event>>,
) {
tokio::spawn(async move {
loop {
match node_event_receiver.changed().await {
Ok(()) => {
let event = node_event_receiver.borrow().clone();

if let Some(Event::PaymentClaimed {
amount_msat,
payment_hash,
..
}) = event
{
tokio::task::spawn_blocking({
let node = node.clone();
move || {
let mut conn = match node.pool.get() {
Ok(conn) => conn,
Err(e) => {
tracing::error!("Failed to connect to database during node event post processing event: {e:#}");
return;
}
};

match db::trades::is_payment_hash_registered_as_trade_fee(
&mut conn,
payment_hash,
) {
Ok(true) => {
increment_fee_counter(amount_msat / 1000);
}
// irrelevant, not a fee
Ok(false) => (),
Err(e) => {
tracing::error!("Failed to load trades from database in node event post processing: {e:#}");
}
}
}
}).await.expect("To spawn blocking thread");
}
}
Err(e) => {
tracing::error!("Failed to receive event: {e:#}");
}
}
}
});
}
29 changes: 18 additions & 11 deletions coordinator/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use dlc_manager::payout_curve::RoundingIntervals;
use dlc_manager::ChannelId;
use dlc_messages::Message;
use lightning::ln::channelmanager::ChannelDetails;
use lightning::ln::PaymentHash;
use lightning_invoice::Invoice;
use ln_dlc_node::node;
use ln_dlc_node::node::dlc_message_name;
Expand Down Expand Up @@ -124,20 +125,15 @@ impl Node {

#[autometrics]
pub async fn trade(&self, trade_params: &TradeParams) -> Result<Invoice> {
let invoice = self.fee_invoice_taker(trade_params).await?;
let _fee_payment_hash = invoice.payment_hash();

// TODO: Save this invoice in the coordinator database
// The identifier is the payment hash
// Potentially safe the order id as meta data
let (fee_payment_hash, invoice) = self.fee_invoice_taker(trade_params).await?;

match self.decide_trade_action(&trade_params.pubkey)? {
TradeAction::Open => {
ensure!(
self.settings.read().await.allow_opening_positions,
"Opening positions is disabled"
);
self.open_position(trade_params).await?
self.open_position(trade_params, fee_payment_hash).await?
}
TradeAction::Close(channel_id) => {
let peer_id = trade_params.pubkey;
Expand All @@ -154,7 +150,7 @@ impl Node {
None => bail!("Failed to find open position : {}", trade_params.pubkey),
};

self.close_position(&position, closing_price, channel_id)
self.close_position(&position, closing_price, channel_id, fee_payment_hash)
.await?
}
};
Expand All @@ -163,7 +159,11 @@ impl Node {
}

#[autometrics]
async fn open_position(&self, trade_params: &TradeParams) -> Result<()> {
async fn open_position(
&self,
trade_params: &TradeParams,
fee_payment_hash: PaymentHash,
) -> Result<()> {
let peer_id = trade_params.pubkey;
tracing::info!(%peer_id, ?trade_params, "Opening position");

Expand Down Expand Up @@ -218,11 +218,15 @@ impl Node {
// into the database doesn't, it is more likely to succeed in the new order.
// FIXME: Note, we should not create a shadow representation (position) of the DLC struct,
// but rather imply the state from the DLC.
self.persist_position_and_trade(trade_params)
self.persist_position_and_trade(trade_params, fee_payment_hash)
}

// Creates a position and a trade from the trade params
fn persist_position_and_trade(&self, trade_params: &TradeParams) -> Result<()> {
fn persist_position_and_trade(
&self,
trade_params: &TradeParams,
fee_payment_hash: PaymentHash,
) -> Result<()> {
let liquidation_price = liquidation_price(trade_params);
let margin_coordinator = margin_coordinator(trade_params);

Expand Down Expand Up @@ -258,6 +262,7 @@ impl Node {
collateral: new_position.collateral,
direction: new_position.direction,
average_price: average_entry_price,
fee_payment_hash,
},
)?;

Expand All @@ -270,6 +275,7 @@ impl Node {
position: &Position,
closing_price: Decimal,
channel_id: ChannelId,
fee_payment_hash: PaymentHash,
) -> Result<()> {
let opening_price = Decimal::try_from(position.average_entry_price)?;

Expand Down Expand Up @@ -309,6 +315,7 @@ impl Node {
collateral: position.collateral,
direction: position.direction.opposite(),
average_price: closing_price.to_f32().expect("To fit into f32"),
fee_payment_hash,
},
)?;

Expand Down
Loading

0 comments on commit c70f478

Please sign in to comment.