Skip to content

Commit

Permalink
feat: check submarine swaps for expired invoices (#762)
Browse files Browse the repository at this point in the history
  • Loading branch information
michael1011 authored Jan 3, 2025
1 parent 850889d commit 9161e6a
Show file tree
Hide file tree
Showing 24 changed files with 1,085 additions and 39 deletions.
6 changes: 6 additions & 0 deletions boltzr/protos/boltzr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ service BoltzR {
rpc GetMessages (GetMessagesRequest) returns (stream GetMessagesResponse);

rpc SwapUpdate (stream SwapUpdateRequest) returns (stream SwapUpdateResponse);
rpc SendSwapUpdate (SendSwapUpdateRequest) returns (stream SendSwapUpdateResponse);

rpc StartWebHookRetries (StartWebHookRetriesRequest) returns (StartWebHookRetriesResponse);
rpc CreateWebHook (CreateWebHookRequest) returns (CreateWebHookResponse);
Expand Down Expand Up @@ -92,6 +93,11 @@ message SwapUpdateResponse {
repeated string ids = 1;
}

message SendSwapUpdateRequest {}
message SendSwapUpdateResponse {
SwapUpdate update = 1;
}

message StartWebHookRetriesRequest {}
message StartWebHookRetriesResponse {}

Expand Down
32 changes: 31 additions & 1 deletion boltzr/src/db/helpers/swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,20 @@ use crate::db::helpers::{BoxedCondition, QueryResponse};
use crate::db::models::Swap;
use crate::db::schema::swaps;
use crate::db::Pool;
use diesel::{QueryDsl, RunQueryDsl, SelectableHelper};
use crate::swap::SwapUpdate;
use diesel::prelude::*;
use diesel::{update, QueryDsl, RunQueryDsl, SelectableHelper};

pub type SwapCondition = BoxedCondition<swaps::table>;

pub trait SwapHelper {
fn get_all(&self, condition: SwapCondition) -> QueryResponse<Vec<Swap>>;
fn update_status(
&self,
id: &str,
status: SwapUpdate,
failure_reason: Option<String>,
) -> QueryResponse<usize>;
}

#[derive(Clone, Debug)]
Expand All @@ -28,4 +36,26 @@ impl SwapHelper for SwapHelperDatabase {
.filter(condition)
.load(&mut self.pool.get()?)?)
}

fn update_status(
&self,
id: &str,
status: SwapUpdate,
failure_reason: Option<String>,
) -> QueryResponse<usize> {
if let Some(failure_reason) = failure_reason {
Ok(update(swaps::dsl::swaps)
.filter(swaps::dsl::id.eq(id))
.set((
swaps::dsl::status.eq(status.to_string()),
swaps::dsl::failureReason.eq(failure_reason),
))
.execute(&mut self.pool.get()?)?)
} else {
Ok(update(swaps::dsl::swaps)
.filter(swaps::dsl::id.eq(id))
.set(swaps::dsl::status.eq(status.to_string()))
.execute(&mut self.pool.get()?)?)
}
}
}
1 change: 1 addition & 0 deletions boltzr/src/db/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@ pub trait SomeSwap {

pub trait LightningSwap {
fn chain_symbol(&self) -> anyhow::Result<String>;
fn lightning_symbol(&self) -> anyhow::Result<String>;
}
17 changes: 17 additions & 0 deletions boltzr/src/db/models/reverse_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@ impl LightningSwap for ReverseSwap {
pair.quote
})
}

fn lightning_symbol(&self) -> anyhow::Result<String> {
let pair = split_pair(&self.pair)?;
Ok(if self.orderSide == OrderSide::Buy as i32 {
pair.quote
} else {
pair.base
})
}
}

#[cfg(test)]
Expand Down Expand Up @@ -72,6 +81,14 @@ mod test {
assert_eq!(swap.chain_symbol().unwrap(), expected);
}

#[rstest]
#[case(OrderSide::Buy, "BTC")]
#[case(OrderSide::Sell, "L-BTC")]
fn test_lightning_symbol(#[case] side: OrderSide, #[case] expected: &str) {
let swap = create_swap(Some(side));
assert_eq!(swap.lightning_symbol().unwrap(), expected);
}

fn create_swap(order_side: Option<OrderSide>) -> ReverseSwap {
ReverseSwap {
transactionId: None,
Expand Down
21 changes: 21 additions & 0 deletions boltzr/src/db/models/swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ pub struct Swap {
pub pair: String,
pub orderSide: i32,
pub status: String,
pub failureReason: Option<String>,
pub invoice: Option<String>,
pub lockupAddress: String,
}

Expand All @@ -37,6 +39,15 @@ impl LightningSwap for Swap {
pair.base
})
}

fn lightning_symbol(&self) -> anyhow::Result<String> {
let pair = split_pair(&self.pair)?;
Ok(if self.orderSide == OrderSide::Buy as i32 {
pair.base
} else {
pair.quote
})
}
}

#[cfg(test)]
Expand Down Expand Up @@ -71,13 +82,23 @@ mod test {
assert_eq!(swap.chain_symbol().unwrap(), expected);
}

#[rstest]
#[case(OrderSide::Buy, "L-BTC")]
#[case(OrderSide::Sell, "BTC")]
fn test_lightning_symbol(#[case] side: OrderSide, #[case] expected: &str) {
let swap = create_swap(Some(side));
assert_eq!(swap.lightning_symbol().unwrap(), expected);
}

fn create_swap(order_side: Option<OrderSide>) -> Swap {
Swap {
id: "swap id".to_string(),
pair: "L-BTC/BTC".to_string(),
lockupAddress: "".to_string(),
status: "transaction.mempool".to_string(),
orderSide: order_side.unwrap_or(OrderSide::Buy) as i32,
invoice: None,
failureReason: None,
}
}
}
3 changes: 3 additions & 0 deletions boltzr/src/db/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,14 @@ diesel::table! {
}

diesel::table! {
#[allow(non_snake_case)]
swaps (id) {
id -> Text,
pair -> Text,
orderSide -> Integer,
status -> Text,
failureReason -> Nullable<Text>,
invoice -> Nullable<Text>,
lockupAddress -> Text,
}
}
Expand Down
4 changes: 3 additions & 1 deletion boltzr/src/grpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ where
#[cfg(test)]
mod server_test {
use crate::api::ws;
use crate::api::ws::types::SwapStatus;
use crate::chain::utils::Transaction;
use crate::currencies::Currency;
use crate::db::helpers::web_hook::WebHookHelper;
Expand Down Expand Up @@ -220,6 +221,7 @@ mod server_test {
#[async_trait]
impl SwapManager for Manager {
fn get_currency(&self, symbol: &str) -> Option<Currency>;
fn listen_to_updates(&self) -> tokio::sync::broadcast::Receiver<SwapStatus>;
async fn scan_mempool(
&self,
symbols: Option<Vec<String>>,
Expand All @@ -230,7 +232,7 @@ mod server_test {
#[tokio::test]
async fn test_connect() {
let token = CancellationToken::new();
let (status_tx, _) = tokio::sync::broadcast::channel::<Vec<ws::types::SwapStatus>>(1);
let (status_tx, _) = tokio::sync::broadcast::channel::<Vec<SwapStatus>>(1);

let server = Server::<_, _, crate::notifications::mattermost::Client<Commands>>::new(
token.clone(),
Expand Down
57 changes: 54 additions & 3 deletions boltzr/src/grpc/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,17 @@ use crate::evm::RefundSigner;
use crate::grpc::service::boltzr::boltz_r_server::BoltzR;
use crate::grpc::service::boltzr::scan_mempool_response::Transactions;
use crate::grpc::service::boltzr::sign_evm_refund_request::Contract;
use crate::grpc::service::boltzr::swap_update::{ChannelInfo, FailureDetails, TransactionInfo};
use crate::grpc::service::boltzr::{
bolt11_invoice, bolt12_invoice, decode_invoice_or_offer_response, Bolt11Invoice, Bolt12Invoice,
Bolt12Offer, CreateWebHookRequest, CreateWebHookResponse, DecodeInvoiceOrOfferRequest,
DecodeInvoiceOrOfferResponse, Feature, FetchInvoiceRequest, FetchInvoiceResponse,
GetInfoRequest, GetInfoResponse, GetMessagesRequest, GetMessagesResponse, LogLevel,
ScanMempoolRequest, ScanMempoolResponse, SendMessageRequest, SendMessageResponse,
SendWebHookRequest, SendWebHookResponse, SetLogLevelRequest, SetLogLevelResponse,
SignEvmRefundRequest, SignEvmRefundResponse, StartWebHookRetriesRequest,
StartWebHookRetriesResponse, SwapUpdateRequest, SwapUpdateResponse,
SendSwapUpdateRequest, SendSwapUpdateResponse, SendWebHookRequest, SendWebHookResponse,
SetLogLevelRequest, SetLogLevelResponse, SignEvmRefundRequest, SignEvmRefundResponse,
StartWebHookRetriesRequest, StartWebHookRetriesResponse, SwapUpdate, SwapUpdateRequest,
SwapUpdateResponse,
};
use crate::grpc::status_fetcher::StatusFetcher;
use crate::lightning::invoice::Invoice;
Expand Down Expand Up @@ -258,6 +260,54 @@ where
Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
}

type SendSwapUpdateStream =
Pin<Box<dyn Stream<Item = Result<SendSwapUpdateResponse, Status>> + Send>>;

#[instrument(name = "grpc::send_swap_update", skip_all)]
async fn send_swap_update(
&self,
request: Request<SendSwapUpdateRequest>,
) -> Result<Response<Self::SendSwapUpdateStream>, Status> {
extract_parent_context(&request);

let mut update_rx = self.manager.listen_to_updates();

let (tx, rx) = mpsc::channel(128);
tokio::spawn(async move {
while let Ok(update) = update_rx.recv().await {
if let Err(err) = tx
.send(Ok(SendSwapUpdateResponse {
update: Some(SwapUpdate {
id: update.id,
status: update.status,
failure_reason: update.failure_reason,
zero_conf_rejected: update.zero_conf_rejected,
channel_info: update.channel_info.map(|info| ChannelInfo {
funding_transaction_id: info.funding_transaction_id,
funding_transaction_vout: info.funding_transaction_vout,
}),
failure_details: update.failure_details.map(|dt| FailureDetails {
actual: dt.actual,
expected: dt.expected,
}),
transaction_info: update.transaction.map(|tx| TransactionInfo {
id: tx.id,
hex: tx.hex,
eta: tx.eta,
}),
}),
}))
.await
{
debug!("send_swap_update stream closed: {}", err);
break;
}
}
});

Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
}

#[instrument(name = "grpc::start_web_hook_retries", skip_all)]
async fn start_web_hook_retries(
&self,
Expand Down Expand Up @@ -727,6 +777,7 @@ mod test {
#[async_trait]
impl SwapManager for Manager {
fn get_currency(&self, symbol: &str) -> Option<Currency>;
fn listen_to_updates(&self) -> tokio::sync::broadcast::Receiver<crate::api::ws::types::SwapStatus>;
async fn scan_mempool(
&self,
symbols: Option<Vec<String>>,
Expand Down
8 changes: 6 additions & 2 deletions boltzr/src/lightning/cln/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::fs;
use tonic::transport::{Certificate, Channel, ClientTlsConfig, Identity};
use tracing::{info, instrument};
use tracing::{debug, info, instrument};

#[allow(clippy::enum_variant_names)]
mod cln_rpc {
Expand Down Expand Up @@ -67,7 +67,7 @@ impl Cln {
let res = self
.cln
.fetch_invoice(FetchinvoiceRequest {
offer,
offer: offer.clone(),
amount_msat: Some(Amount { msat: amount_msat }),
timeout: None,
quantity: None,
Expand All @@ -79,6 +79,10 @@ impl Cln {
})
.await
.map_err(Self::parse_error)?;
debug!(
"Fetched invoice for {}msat for offer {}",
amount_msat, offer
);

Ok(res.into_inner().invoice)
}
Expand Down
20 changes: 19 additions & 1 deletion boltzr/src/lightning/invoice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,15 @@ pub enum Invoice {
}

impl Invoice {
fn is_for_network(&self, network: wallet::Network) -> bool {
pub fn is_expired(&self) -> bool {
match self {
Invoice::Bolt11(invoice) => invoice.is_expired(),
Invoice::Offer(offer) => offer.is_expired(),
Invoice::Bolt12(invoice) => invoice.is_expired(),
}
}

pub fn is_for_network(&self, network: wallet::Network) -> bool {
let chain_hash = Self::network_to_chain_hash(network);

match self {
Expand Down Expand Up @@ -112,6 +120,7 @@ mod test {
};
use crate::wallet;
use bech32::FromBase32;
use rstest::*;
use std::str::FromStr;

const BOLT12_OFFER: &str = "lno1qgsqvgnwgcg35z6ee2h3yczraddm72xrfua9uve2rlrm9deu7xyfzrc2q3skgumxzcssyeyreggqmet8r4k6krvd3knppsx6c8v5g7tj8hcuq8lleta9ve5n";
Expand Down Expand Up @@ -212,4 +221,13 @@ mod test {
)
);
}

#[rstest]
#[case(BOLT11_INVOICE, true)]
#[case(BOLT12_INVOICE, true)]
#[case(BOLT12_OFFER, false)]
fn test_invoice_is_expired(#[case] invoice: &str, #[case] expected: bool) {
let res = decode(wallet::Network::Regtest, invoice).unwrap();
assert_eq!(res.is_expired(), expected);
}
}
13 changes: 12 additions & 1 deletion boltzr/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,17 @@ async fn main() {
let (swap_status_update_tx, _swap_status_update_rx) =
tokio::sync::broadcast::channel::<Vec<ws::types::SwapStatus>>(128);

let swap_manager = Arc::new(Manager::new(
cancellation_token.clone(),
currencies,
db_pool.clone(),
));

let mut grpc_server = grpc::server::Server::new(
cancellation_token.clone(),
config.sidecar.grpc,
log_reload_handler,
Arc::new(Manager::new(currencies, db_pool.clone())),
swap_manager.clone(),
swap_status_update_tx.clone(),
Box::new(db::helpers::web_hook::WebHookHelperDatabase::new(db_pool)),
web_hook_caller,
Expand Down Expand Up @@ -253,6 +259,10 @@ async fn main() {
}
});

let swap_manager_handler = tokio::spawn(async move {
swap_manager.start().await;
});

ctrlc::set_handler(move || {
info!("Got shutdown signal");
cancellation_token.cancel();
Expand All @@ -269,6 +279,7 @@ async fn main() {
api_handle.await.unwrap();
grpc_handle.await.unwrap();
status_ws_handler.await.unwrap();
swap_manager_handler.await.unwrap();
notification_listener_handle.await.unwrap();

#[cfg(feature = "metrics")]
Expand Down
Loading

0 comments on commit 9161e6a

Please sign in to comment.