diff --git a/boltzr/protos/boltzr.proto b/boltzr/protos/boltzr.proto index d967ad46..07800bdc 100644 --- a/boltzr/protos/boltzr.proto +++ b/boltzr/protos/boltzr.proto @@ -11,6 +11,7 @@ service BoltzR { rpc GetMessages (GetMessagesRequest) returns (stream GetMessagesResponse); rpc SwapUpdate (stream SwapUpdateRequest) returns (stream SwapUpdateResponse); + rpc SendSwapUpdate (SendSwapUpdateRequest) returns (stream SendSwapUpdateResponse); rpc StartWebHookRetries (StartWebHookRetriesRequest) returns (StartWebHookRetriesResponse); rpc CreateWebHook (CreateWebHookRequest) returns (CreateWebHookResponse); @@ -92,6 +93,11 @@ message SwapUpdateResponse { repeated string ids = 1; } +message SendSwapUpdateRequest {} +message SendSwapUpdateResponse { + SwapUpdate update = 1; +} + message StartWebHookRetriesRequest {} message StartWebHookRetriesResponse {} diff --git a/boltzr/src/db/helpers/swap.rs b/boltzr/src/db/helpers/swap.rs index c5fc77e4..d57b024c 100644 --- a/boltzr/src/db/helpers/swap.rs +++ b/boltzr/src/db/helpers/swap.rs @@ -2,12 +2,20 @@ use crate::db::helpers::{BoxedCondition, QueryResponse}; use crate::db::models::Swap; use crate::db::schema::swaps; use crate::db::Pool; -use diesel::{QueryDsl, RunQueryDsl, SelectableHelper}; +use crate::swap::SwapUpdate; +use diesel::prelude::*; +use diesel::{update, QueryDsl, RunQueryDsl, SelectableHelper}; pub type SwapCondition = BoxedCondition; pub trait SwapHelper { fn get_all(&self, condition: SwapCondition) -> QueryResponse>; + fn update_status( + &self, + id: &str, + status: SwapUpdate, + failure_reason: Option, + ) -> QueryResponse; } #[derive(Clone, Debug)] @@ -28,4 +36,26 @@ impl SwapHelper for SwapHelperDatabase { .filter(condition) .load(&mut self.pool.get()?)?) } + + fn update_status( + &self, + id: &str, + status: SwapUpdate, + failure_reason: Option, + ) -> QueryResponse { + if let Some(failure_reason) = failure_reason { + Ok(update(swaps::dsl::swaps) + .filter(swaps::dsl::id.eq(id)) + .set(( + swaps::dsl::status.eq(status.to_string()), + swaps::dsl::failureReason.eq(failure_reason), + )) + .execute(&mut self.pool.get()?)?) + } else { + Ok(update(swaps::dsl::swaps) + .filter(swaps::dsl::id.eq(id)) + .set(swaps::dsl::status.eq(status.to_string())) + .execute(&mut self.pool.get()?)?) + } + } } diff --git a/boltzr/src/db/models/mod.rs b/boltzr/src/db/models/mod.rs index 19466b49..d0d3515f 100644 --- a/boltzr/src/db/models/mod.rs +++ b/boltzr/src/db/models/mod.rs @@ -30,4 +30,5 @@ pub trait SomeSwap { pub trait LightningSwap { fn chain_symbol(&self) -> anyhow::Result; + fn lightning_symbol(&self) -> anyhow::Result; } diff --git a/boltzr/src/db/models/reverse_swap.rs b/boltzr/src/db/models/reverse_swap.rs index 493cc614..8eb1a9ec 100644 --- a/boltzr/src/db/models/reverse_swap.rs +++ b/boltzr/src/db/models/reverse_swap.rs @@ -38,6 +38,15 @@ impl LightningSwap for ReverseSwap { pair.quote }) } + + fn lightning_symbol(&self) -> anyhow::Result { + let pair = split_pair(&self.pair)?; + Ok(if self.orderSide == OrderSide::Buy as i32 { + pair.quote + } else { + pair.base + }) + } } #[cfg(test)] @@ -72,6 +81,14 @@ mod test { assert_eq!(swap.chain_symbol().unwrap(), expected); } + #[rstest] + #[case(OrderSide::Buy, "BTC")] + #[case(OrderSide::Sell, "L-BTC")] + fn test_lightning_symbol(#[case] side: OrderSide, #[case] expected: &str) { + let swap = create_swap(Some(side)); + assert_eq!(swap.lightning_symbol().unwrap(), expected); + } + fn create_swap(order_side: Option) -> ReverseSwap { ReverseSwap { transactionId: None, diff --git a/boltzr/src/db/models/swap.rs b/boltzr/src/db/models/swap.rs index 7f041647..2aeb077c 100644 --- a/boltzr/src/db/models/swap.rs +++ b/boltzr/src/db/models/swap.rs @@ -11,6 +11,8 @@ pub struct Swap { pub pair: String, pub orderSide: i32, pub status: String, + pub failureReason: Option, + pub invoice: Option, pub lockupAddress: String, } @@ -37,6 +39,15 @@ impl LightningSwap for Swap { pair.base }) } + + fn lightning_symbol(&self) -> anyhow::Result { + let pair = split_pair(&self.pair)?; + Ok(if self.orderSide == OrderSide::Buy as i32 { + pair.base + } else { + pair.quote + }) + } } #[cfg(test)] @@ -71,6 +82,14 @@ mod test { assert_eq!(swap.chain_symbol().unwrap(), expected); } + #[rstest] + #[case(OrderSide::Buy, "L-BTC")] + #[case(OrderSide::Sell, "BTC")] + fn test_lightning_symbol(#[case] side: OrderSide, #[case] expected: &str) { + let swap = create_swap(Some(side)); + assert_eq!(swap.lightning_symbol().unwrap(), expected); + } + fn create_swap(order_side: Option) -> Swap { Swap { id: "swap id".to_string(), @@ -78,6 +97,8 @@ mod test { lockupAddress: "".to_string(), status: "transaction.mempool".to_string(), orderSide: order_side.unwrap_or(OrderSide::Buy) as i32, + invoice: None, + failureReason: None, } } } diff --git a/boltzr/src/db/schema.rs b/boltzr/src/db/schema.rs index 13bf0899..db84bb6d 100644 --- a/boltzr/src/db/schema.rs +++ b/boltzr/src/db/schema.rs @@ -11,11 +11,14 @@ diesel::table! { } diesel::table! { + #[allow(non_snake_case)] swaps (id) { id -> Text, pair -> Text, orderSide -> Integer, status -> Text, + failureReason -> Nullable, + invoice -> Nullable, lockupAddress -> Text, } } diff --git a/boltzr/src/grpc/server.rs b/boltzr/src/grpc/server.rs index e92bc966..96fa3f5a 100644 --- a/boltzr/src/grpc/server.rs +++ b/boltzr/src/grpc/server.rs @@ -151,6 +151,7 @@ where #[cfg(test)] mod server_test { use crate::api::ws; + use crate::api::ws::types::SwapStatus; use crate::chain::utils::Transaction; use crate::currencies::Currency; use crate::db::helpers::web_hook::WebHookHelper; @@ -220,6 +221,7 @@ mod server_test { #[async_trait] impl SwapManager for Manager { fn get_currency(&self, symbol: &str) -> Option; + fn listen_to_updates(&self) -> tokio::sync::broadcast::Receiver; async fn scan_mempool( &self, symbols: Option>, @@ -230,7 +232,7 @@ mod server_test { #[tokio::test] async fn test_connect() { let token = CancellationToken::new(); - let (status_tx, _) = tokio::sync::broadcast::channel::>(1); + let (status_tx, _) = tokio::sync::broadcast::channel::>(1); let server = Server::<_, _, crate::notifications::mattermost::Client>::new( token.clone(), diff --git a/boltzr/src/grpc/service.rs b/boltzr/src/grpc/service.rs index 1770675a..a8ab36ba 100644 --- a/boltzr/src/grpc/service.rs +++ b/boltzr/src/grpc/service.rs @@ -5,15 +5,17 @@ use crate::evm::RefundSigner; use crate::grpc::service::boltzr::boltz_r_server::BoltzR; use crate::grpc::service::boltzr::scan_mempool_response::Transactions; use crate::grpc::service::boltzr::sign_evm_refund_request::Contract; +use crate::grpc::service::boltzr::swap_update::{ChannelInfo, FailureDetails, TransactionInfo}; use crate::grpc::service::boltzr::{ bolt11_invoice, bolt12_invoice, decode_invoice_or_offer_response, Bolt11Invoice, Bolt12Invoice, Bolt12Offer, CreateWebHookRequest, CreateWebHookResponse, DecodeInvoiceOrOfferRequest, DecodeInvoiceOrOfferResponse, Feature, FetchInvoiceRequest, FetchInvoiceResponse, GetInfoRequest, GetInfoResponse, GetMessagesRequest, GetMessagesResponse, LogLevel, ScanMempoolRequest, ScanMempoolResponse, SendMessageRequest, SendMessageResponse, - SendWebHookRequest, SendWebHookResponse, SetLogLevelRequest, SetLogLevelResponse, - SignEvmRefundRequest, SignEvmRefundResponse, StartWebHookRetriesRequest, - StartWebHookRetriesResponse, SwapUpdateRequest, SwapUpdateResponse, + SendSwapUpdateRequest, SendSwapUpdateResponse, SendWebHookRequest, SendWebHookResponse, + SetLogLevelRequest, SetLogLevelResponse, SignEvmRefundRequest, SignEvmRefundResponse, + StartWebHookRetriesRequest, StartWebHookRetriesResponse, SwapUpdate, SwapUpdateRequest, + SwapUpdateResponse, }; use crate::grpc::status_fetcher::StatusFetcher; use crate::lightning::invoice::Invoice; @@ -258,6 +260,54 @@ where Ok(Response::new(Box::pin(ReceiverStream::new(rx)))) } + type SendSwapUpdateStream = + Pin> + Send>>; + + #[instrument(name = "grpc::send_swap_update", skip_all)] + async fn send_swap_update( + &self, + request: Request, + ) -> Result, Status> { + extract_parent_context(&request); + + let mut update_rx = self.manager.listen_to_updates(); + + let (tx, rx) = mpsc::channel(128); + tokio::spawn(async move { + while let Ok(update) = update_rx.recv().await { + if let Err(err) = tx + .send(Ok(SendSwapUpdateResponse { + update: Some(SwapUpdate { + id: update.id, + status: update.status, + failure_reason: update.failure_reason, + zero_conf_rejected: update.zero_conf_rejected, + channel_info: update.channel_info.map(|info| ChannelInfo { + funding_transaction_id: info.funding_transaction_id, + funding_transaction_vout: info.funding_transaction_vout, + }), + failure_details: update.failure_details.map(|dt| FailureDetails { + actual: dt.actual, + expected: dt.expected, + }), + transaction_info: update.transaction.map(|tx| TransactionInfo { + id: tx.id, + hex: tx.hex, + eta: tx.eta, + }), + }), + })) + .await + { + debug!("send_swap_update stream closed: {}", err); + break; + } + } + }); + + Ok(Response::new(Box::pin(ReceiverStream::new(rx)))) + } + #[instrument(name = "grpc::start_web_hook_retries", skip_all)] async fn start_web_hook_retries( &self, @@ -727,6 +777,7 @@ mod test { #[async_trait] impl SwapManager for Manager { fn get_currency(&self, symbol: &str) -> Option; + fn listen_to_updates(&self) -> tokio::sync::broadcast::Receiver; async fn scan_mempool( &self, symbols: Option>, diff --git a/boltzr/src/lightning/cln/mod.rs b/boltzr/src/lightning/cln/mod.rs index c32d4f64..6350c912 100644 --- a/boltzr/src/lightning/cln/mod.rs +++ b/boltzr/src/lightning/cln/mod.rs @@ -9,7 +9,7 @@ use async_trait::async_trait; use serde::{Deserialize, Serialize}; use std::fs; use tonic::transport::{Certificate, Channel, ClientTlsConfig, Identity}; -use tracing::{info, instrument}; +use tracing::{debug, info, instrument}; #[allow(clippy::enum_variant_names)] mod cln_rpc { @@ -67,7 +67,7 @@ impl Cln { let res = self .cln .fetch_invoice(FetchinvoiceRequest { - offer, + offer: offer.clone(), amount_msat: Some(Amount { msat: amount_msat }), timeout: None, quantity: None, @@ -79,6 +79,10 @@ impl Cln { }) .await .map_err(Self::parse_error)?; + debug!( + "Fetched invoice for {}msat for offer {}", + amount_msat, offer + ); Ok(res.into_inner().invoice) } diff --git a/boltzr/src/lightning/invoice.rs b/boltzr/src/lightning/invoice.rs index bdef756e..e7611651 100644 --- a/boltzr/src/lightning/invoice.rs +++ b/boltzr/src/lightning/invoice.rs @@ -34,7 +34,15 @@ pub enum Invoice { } impl Invoice { - fn is_for_network(&self, network: wallet::Network) -> bool { + pub fn is_expired(&self) -> bool { + match self { + Invoice::Bolt11(invoice) => invoice.is_expired(), + Invoice::Offer(offer) => offer.is_expired(), + Invoice::Bolt12(invoice) => invoice.is_expired(), + } + } + + pub fn is_for_network(&self, network: wallet::Network) -> bool { let chain_hash = Self::network_to_chain_hash(network); match self { @@ -112,6 +120,7 @@ mod test { }; use crate::wallet; use bech32::FromBase32; + use rstest::*; use std::str::FromStr; const BOLT12_OFFER: &str = "lno1qgsqvgnwgcg35z6ee2h3yczraddm72xrfua9uve2rlrm9deu7xyfzrc2q3skgumxzcssyeyreggqmet8r4k6krvd3knppsx6c8v5g7tj8hcuq8lleta9ve5n"; @@ -212,4 +221,13 @@ mod test { ) ); } + + #[rstest] + #[case(BOLT11_INVOICE, true)] + #[case(BOLT12_INVOICE, true)] + #[case(BOLT12_OFFER, false)] + fn test_invoice_is_expired(#[case] invoice: &str, #[case] expected: bool) { + let res = decode(wallet::Network::Regtest, invoice).unwrap(); + assert_eq!(res.is_expired(), expected); + } } diff --git a/boltzr/src/main.rs b/boltzr/src/main.rs index 604bc4bc..73fc3c7d 100644 --- a/boltzr/src/main.rs +++ b/boltzr/src/main.rs @@ -195,11 +195,17 @@ async fn main() { let (swap_status_update_tx, _swap_status_update_rx) = tokio::sync::broadcast::channel::>(128); + let swap_manager = Arc::new(Manager::new( + cancellation_token.clone(), + currencies, + db_pool.clone(), + )); + let mut grpc_server = grpc::server::Server::new( cancellation_token.clone(), config.sidecar.grpc, log_reload_handler, - Arc::new(Manager::new(currencies, db_pool.clone())), + swap_manager.clone(), swap_status_update_tx.clone(), Box::new(db::helpers::web_hook::WebHookHelperDatabase::new(db_pool)), web_hook_caller, @@ -253,6 +259,10 @@ async fn main() { } }); + let swap_manager_handler = tokio::spawn(async move { + swap_manager.start().await; + }); + ctrlc::set_handler(move || { info!("Got shutdown signal"); cancellation_token.cancel(); @@ -269,6 +279,7 @@ async fn main() { api_handle.await.unwrap(); grpc_handle.await.unwrap(); status_ws_handler.await.unwrap(); + swap_manager_handler.await.unwrap(); notification_listener_handle.await.unwrap(); #[cfg(feature = "metrics")] diff --git a/boltzr/src/swap/expiry_checker.rs b/boltzr/src/swap/expiry_checker.rs new file mode 100644 index 00000000..27dce8e3 --- /dev/null +++ b/boltzr/src/swap/expiry_checker.rs @@ -0,0 +1,266 @@ +use crate::api::ws::types::SwapStatus; +use crate::currencies::Currencies; +use crate::db::helpers::swap::SwapHelper; +use crate::db::models::LightningSwap; +use crate::lightning::invoice; +use crate::swap::{serialize_swap_updates, SwapUpdate}; +use anyhow::anyhow; +use diesel::{BoolExpressionMethods, ExpressionMethods}; +use std::sync::Arc; +use std::time::Duration; +use tokio_util::sync::CancellationToken; +use tracing::{error, info, instrument, trace}; + +const CHECK_INTERVAL_SECONDS: u64 = 60; +const EXPIRED_INVOICE_FAILURE_REASON: &str = "invoice expired"; + +pub struct InvoiceExpiryChecker { + cancellation_token: CancellationToken, + update_tx: tokio::sync::broadcast::Sender, + + currencies: Arc, + swap_repo: Arc, +} + +impl InvoiceExpiryChecker { + pub fn new( + cancellation_token: CancellationToken, + update_tx: tokio::sync::broadcast::Sender, + currencies: Arc, + swap_repo: Arc, + ) -> Self { + InvoiceExpiryChecker { + swap_repo, + update_tx, + currencies, + cancellation_token, + } + } + + pub async fn start(&self) { + let duration = Duration::from_secs(CHECK_INTERVAL_SECONDS); + info!( + "Checking for expired invoices of Submarine Swaps every {:#?}", + duration + ); + let mut interval = tokio::time::interval(duration); + + tokio::select! { + _ = tokio::time::sleep(duration) => {}, + _ = self.cancellation_token.cancelled() => { + return; + } + } + + loop { + tokio::select! { + _ = interval.tick() => {}, + _ = self.cancellation_token.cancelled() => { + break; + } + } + + if let Err(err) = self.check().await { + error!( + "Checking Submarine Swaps for expired invoices failed: {}", + err + ); + } + } + } + + #[instrument(name = "InvoiceExpiryChecker::check", skip_all)] + pub async fn check(&self) -> anyhow::Result<()> { + let swaps = self.swap_repo.get_all(Box::new( + crate::db::schema::swaps::dsl::status + .ne_all(serialize_swap_updates(&[ + SwapUpdate::SwapExpired, + SwapUpdate::InvoicePending, + SwapUpdate::InvoiceFailedToPay, + SwapUpdate::TransactionClaimed, + SwapUpdate::TransactionLockupFailed, + SwapUpdate::TransactionClaimPending, + ])) + .and(crate::db::schema::swaps::dsl::invoice.is_not_null()), + ))?; + trace!( + "Checking for expired invoices of {} Submarine Swaps", + swaps.len() + ); + + for swap in swaps { + let invoice = match &swap.invoice { + Some(invoice) => invoice, + None => continue, + }; + let network = match self.currencies.get(&swap.lightning_symbol()?) { + Some(cur) => cur.network, + None => { + return Err(anyhow!( + "currency {} not configured", + swap.lightning_symbol()? + )); + } + }; + + if !invoice::decode(network, invoice)?.is_expired() { + continue; + } + + info!( + "Failing Submarine Swap {} because its invoice expired already", + swap.id + ); + + let status = SwapUpdate::InvoiceFailedToPay; + let failure_reason = EXPIRED_INVOICE_FAILURE_REASON; + + self.swap_repo + .update_status(&swap.id, status, Some(failure_reason.to_string()))?; + + self.update_tx.send(SwapStatus { + id: swap.id, + status: status.to_string(), + failure_reason: Some(failure_reason.to_string()), + transaction: None, + channel_info: None, + failure_details: None, + zero_conf_rejected: None, + })?; + } + + Ok(()) + } +} + +#[cfg(test)] +mod test { + use crate::api::ws::types::SwapStatus; + use crate::currencies::{Currencies, Currency}; + use crate::db::helpers::swap::{SwapCondition, SwapHelper}; + use crate::db::helpers::QueryResponse; + use crate::db::models::Swap; + use crate::swap::expiry_checker::{InvoiceExpiryChecker, EXPIRED_INVOICE_FAILURE_REASON}; + use crate::swap::SwapUpdate; + use crate::wallet::{Bitcoin, Network}; + use mockall::{mock, predicate}; + use std::sync::{Arc, OnceLock}; + use tokio_util::sync::CancellationToken; + + mock! { + SwapHelper {} + + impl Clone for SwapHelper { + fn clone(&self) -> Self; + } + + impl SwapHelper for SwapHelper { + fn get_all(&self, condition: SwapCondition) -> QueryResponse>; + fn update_status( + &self, + id: &str, + status: SwapUpdate, + failure_reason: Option, + ) -> QueryResponse; + } + } + + fn get_currencies() -> Currencies { + static CURRENCIES: OnceLock = OnceLock::new(); + CURRENCIES + .get_or_init(|| { + Currencies::from([( + String::from("BTC"), + Currency { + network: Network::Regtest, + wallet: Arc::new(Bitcoin::new(Network::Regtest)), + chain: Some(Arc::new(Box::new( + crate::chain::chain_client::test::get_client(), + ))), + cln: None, + lnd: None, + }, + )]) + }) + .clone() + } + + #[tokio::test] + async fn test_check_ignore_non_expired() { + let mut swap = MockSwapHelper::new(); + swap.expect_get_all().returning(|_| { + Ok(vec![ + Swap { + id: "id".to_string(), + pair: "L-BTC/BTC".to_string(), + orderSide: 1, + status: "invoice.set".to_string(), + invoice: Some("lno1qgsqvgnwgcg35z6ee2h3yczraddm72xrfua9uve2rlrm9deu7xyfzrc2q3skgumxzcssyeyreggqmet8r4k6krvd3knppsx6c8v5g7tj8hcuq8lleta9ve5n".to_string()), + failureReason: None, + lockupAddress: "".to_string(), + } + ]) + }); + + let (tx, _) = tokio::sync::broadcast::channel(1); + let checker = InvoiceExpiryChecker::new( + CancellationToken::new(), + tx, + Arc::new(get_currencies()), + Arc::new(swap), + ); + + checker.check().await.unwrap(); + } + + #[tokio::test] + async fn test_check_expired_invoice() { + let swap_id = "expired"; + + let mut swap = MockSwapHelper::new(); + swap.expect_get_all().returning(|_| { + Ok(vec![ + Swap { + id: swap_id.to_string(), + pair: "L-BTC/BTC".to_string(), + orderSide: 1, + status: "invoice.set".to_string(), + invoice: Some("lnbcrt1230p1pnwzkshsp584p434kjslfl030shwps75nvy4leq5k6psvdxn4kzsxjnptlmr3spp5nxqauehzqkx3xswjtrgx9lh5pqjxkyx0kszj0nc4m4jn7uk9gc5qdq8v9ekgesxqyjw5qcqp29qxpqysgqu6ft6p8c36khp082xng2xzmta25nlg803qjncal3fhzw8eshrsdyevhlgs970a09n95r3gtvqvvyk24vyv4506cu6cxl8ytaywrjkhcp468qnl".to_string()), + failureReason: None, + lockupAddress: "".to_string(), + } + ]) + }); + swap.expect_update_status() + .with( + predicate::eq(swap_id), + predicate::eq(SwapUpdate::InvoiceFailedToPay), + predicate::eq(Some(EXPIRED_INVOICE_FAILURE_REASON.to_string())), + ) + .returning(|_, _, _| Ok(1)); + + let (tx, mut rx) = tokio::sync::broadcast::channel(1); + let checker = InvoiceExpiryChecker::new( + CancellationToken::new(), + tx, + Arc::new(get_currencies()), + Arc::new(swap), + ); + + checker.check().await.unwrap(); + + let emitted = rx.recv().await.unwrap(); + assert_eq!( + emitted, + SwapStatus { + id: swap_id.to_string(), + status: SwapUpdate::InvoiceFailedToPay.to_string(), + failure_reason: Some(EXPIRED_INVOICE_FAILURE_REASON.to_string()), + zero_conf_rejected: None, + transaction: None, + failure_details: None, + channel_info: None, + } + ); + } +} diff --git a/boltzr/src/swap/filters.rs b/boltzr/src/swap/filters.rs index 9fdf3c20..78307c6d 100644 --- a/boltzr/src/swap/filters.rs +++ b/boltzr/src/swap/filters.rs @@ -41,6 +41,7 @@ fn get_swap_filters( SwapUpdate::InvoicePending, SwapUpdate::InvoiceFailedToPay, SwapUpdate::TransactionClaimed, + SwapUpdate::TransactionClaimPending, ])), ))?; @@ -231,6 +232,12 @@ mod test { impl SwapHelper for SwapHelper { fn get_all(&self, condition: SwapCondition) -> QueryResponse>; + fn update_status( + &self, + id: &str, + status: SwapUpdate, + failure_reason: Option, + ) -> QueryResponse; } } @@ -322,6 +329,8 @@ mod test { status: "".to_string(), pair: "BTC/BTC".to_string(), lockupAddress: address_bitcoin.to_string(), + invoice: None, + failureReason: None, }, Swap { orderSide: 1, @@ -329,6 +338,8 @@ mod test { status: "".to_string(), pair: "L-BTC/BTC".to_string(), lockupAddress: address_elements.to_string(), + invoice: None, + failureReason: None, }, ]) }); @@ -527,6 +538,8 @@ mod test { pair: "".to_string(), status: "".to_string(), lockupAddress: "".to_string(), + invoice: None, + failureReason: None, }, ); assert!(currency.cloned().is_some()); @@ -545,6 +558,8 @@ mod test { pair: "".to_string(), status: "".to_string(), lockupAddress: "".to_string(), + invoice: None, + failureReason: None, } ) .is_none()); @@ -562,6 +577,8 @@ mod test { pair: "".to_string(), status: "".to_string(), lockupAddress: "".to_string(), + invoice: None, + failureReason: None, } ) .is_none()); @@ -576,6 +593,8 @@ mod test { pair: "".to_string(), status: "".to_string(), lockupAddress: "".to_string(), + invoice: None, + failureReason: None, }; let address = "bcrt1pjcv9r3jeug6xmgug6hu0p4lux7r9996yxk9m2xxammfqq4kxdvkqhdu0h5"; @@ -594,6 +613,8 @@ mod test { pair: "".to_string(), status: "".to_string(), lockupAddress: "".to_string(), + invoice: None, + failureReason: None, }; assert_eq!(decode_script(&wallet, &swap, "invalid"), None); diff --git a/boltzr/src/swap/manager.rs b/boltzr/src/swap/manager.rs index 3a93743a..55dea502 100644 --- a/boltzr/src/swap/manager.rs +++ b/boltzr/src/swap/manager.rs @@ -1,20 +1,25 @@ +use crate::api::ws::types::SwapStatus; use crate::chain::utils::Transaction; use crate::currencies::{Currencies, Currency}; use crate::db::helpers::chain_swap::{ChainSwapHelper, ChainSwapHelperDatabase}; use crate::db::helpers::reverse_swap::{ReverseSwapHelper, ReverseSwapHelperDatabase}; use crate::db::helpers::swap::{SwapHelper, SwapHelperDatabase}; use crate::db::Pool; +use crate::swap::expiry_checker::InvoiceExpiryChecker; use crate::swap::filters::get_input_output_filters; use anyhow::{anyhow, Result}; use async_trait::async_trait; use std::collections::HashMap; use std::sync::Arc; +use tokio_util::sync::CancellationToken; use tracing::{debug, info}; #[async_trait] pub trait SwapManager { fn get_currency(&self, symbol: &str) -> Option; + fn listen_to_updates(&self) -> tokio::sync::broadcast::Receiver; + async fn scan_mempool( &self, symbols: Option>, @@ -23,7 +28,10 @@ pub trait SwapManager { #[derive(Clone)] pub struct Manager { - currencies: Currencies, + update_tx: tokio::sync::broadcast::Sender, + + currencies: Arc, + cancellation_token: CancellationToken, swap_repo: Arc, chain_swap_repo: Arc, @@ -31,14 +39,33 @@ pub struct Manager { } impl Manager { - pub fn new(currencies: Currencies, pool: Pool) -> Self { + pub fn new(cancellation_token: CancellationToken, currencies: Currencies, pool: Pool) -> Self { + let (update_tx, _) = tokio::sync::broadcast::channel::(128); + Manager { - currencies, + update_tx, + cancellation_token, + currencies: Arc::new(currencies), swap_repo: Arc::new(SwapHelperDatabase::new(pool.clone())), chain_swap_repo: Arc::new(ChainSwapHelperDatabase::new(pool.clone())), reverse_swap_repo: Arc::new(ReverseSwapHelperDatabase::new(pool)), } } + + pub async fn start(&self) { + let expiry_checker = InvoiceExpiryChecker::new( + self.cancellation_token.clone(), + self.update_tx.clone(), + self.currencies.clone(), + self.swap_repo.clone(), + ); + + tokio::spawn(async move { + expiry_checker.start().await; + }) + .await + .unwrap(); + } } #[async_trait] @@ -47,6 +74,10 @@ impl SwapManager for Manager { self.currencies.get(symbol).cloned() } + fn listen_to_updates(&self) -> tokio::sync::broadcast::Receiver { + self.update_tx.subscribe() + } + async fn scan_mempool( &self, symbols: Option>, diff --git a/boltzr/src/swap/mod.rs b/boltzr/src/swap/mod.rs index 380bc389..2b61a0e3 100644 --- a/boltzr/src/swap/mod.rs +++ b/boltzr/src/swap/mod.rs @@ -1,3 +1,4 @@ +mod expiry_checker; mod filters; pub mod manager; mod status; diff --git a/boltzr/src/swap/status.rs b/boltzr/src/swap/status.rs index e9704b97..3672693d 100644 --- a/boltzr/src/swap/status.rs +++ b/boltzr/src/swap/status.rs @@ -13,8 +13,12 @@ pub enum SwapUpdate { TransactionMempool, #[strum(serialize = "transaction.confirmed")] TransactionConfirmed, + #[strum(serialize = "transaction.claim.pending")] + TransactionClaimPending, #[strum(serialize = "transaction.claimed")] TransactionClaimed, + #[strum(serialize = "transaction.lockupFailed")] + TransactionLockupFailed, #[strum(serialize = "transaction.server.mempool")] TransactionServerMempool, diff --git a/lib/proto/sidecar/boltzr_grpc_pb.d.ts b/lib/proto/sidecar/boltzr_grpc_pb.d.ts index 96084449..7cf2b1d0 100644 --- a/lib/proto/sidecar/boltzr_grpc_pb.d.ts +++ b/lib/proto/sidecar/boltzr_grpc_pb.d.ts @@ -13,6 +13,7 @@ interface IBoltzRService extends grpc.ServiceDefinition; responseDeserialize: grpc.deserialize; } +interface IBoltzRService_ISendSwapUpdate extends grpc.MethodDefinition { + path: "/boltzr.BoltzR/SendSwapUpdate"; + requestStream: false; + responseStream: true; + requestSerialize: grpc.serialize; + requestDeserialize: grpc.deserialize; + responseSerialize: grpc.serialize; + responseDeserialize: grpc.deserialize; +} interface IBoltzRService_IStartWebHookRetries extends grpc.MethodDefinition { path: "/boltzr.BoltzR/StartWebHookRetries"; requestStream: false; @@ -139,6 +149,7 @@ export interface IBoltzRServer extends grpc.UntypedServiceImplementation { sendMessage: grpc.handleUnaryCall; getMessages: grpc.handleServerStreamingCall; swapUpdate: grpc.handleBidiStreamingCall; + sendSwapUpdate: grpc.handleServerStreamingCall; startWebHookRetries: grpc.handleUnaryCall; createWebHook: grpc.handleUnaryCall; sendWebHook: grpc.handleUnaryCall; @@ -163,6 +174,8 @@ export interface IBoltzRClient { swapUpdate(): grpc.ClientDuplexStream; swapUpdate(options: Partial): grpc.ClientDuplexStream; swapUpdate(metadata: grpc.Metadata, options?: Partial): grpc.ClientDuplexStream; + sendSwapUpdate(request: boltzr_pb.SendSwapUpdateRequest, options?: Partial): grpc.ClientReadableStream; + sendSwapUpdate(request: boltzr_pb.SendSwapUpdateRequest, metadata?: grpc.Metadata, options?: Partial): grpc.ClientReadableStream; startWebHookRetries(request: boltzr_pb.StartWebHookRetriesRequest, callback: (error: grpc.ServiceError | null, response: boltzr_pb.StartWebHookRetriesResponse) => void): grpc.ClientUnaryCall; startWebHookRetries(request: boltzr_pb.StartWebHookRetriesRequest, metadata: grpc.Metadata, callback: (error: grpc.ServiceError | null, response: boltzr_pb.StartWebHookRetriesResponse) => void): grpc.ClientUnaryCall; startWebHookRetries(request: boltzr_pb.StartWebHookRetriesRequest, metadata: grpc.Metadata, options: Partial, callback: (error: grpc.ServiceError | null, response: boltzr_pb.StartWebHookRetriesResponse) => void): grpc.ClientUnaryCall; @@ -201,6 +214,8 @@ export class BoltzRClient extends grpc.Client implements IBoltzRClient { public getMessages(request: boltzr_pb.GetMessagesRequest, metadata?: grpc.Metadata, options?: Partial): grpc.ClientReadableStream; public swapUpdate(options?: Partial): grpc.ClientDuplexStream; public swapUpdate(metadata?: grpc.Metadata, options?: Partial): grpc.ClientDuplexStream; + public sendSwapUpdate(request: boltzr_pb.SendSwapUpdateRequest, options?: Partial): grpc.ClientReadableStream; + public sendSwapUpdate(request: boltzr_pb.SendSwapUpdateRequest, metadata?: grpc.Metadata, options?: Partial): grpc.ClientReadableStream; public startWebHookRetries(request: boltzr_pb.StartWebHookRetriesRequest, callback: (error: grpc.ServiceError | null, response: boltzr_pb.StartWebHookRetriesResponse) => void): grpc.ClientUnaryCall; public startWebHookRetries(request: boltzr_pb.StartWebHookRetriesRequest, metadata: grpc.Metadata, callback: (error: grpc.ServiceError | null, response: boltzr_pb.StartWebHookRetriesResponse) => void): grpc.ClientUnaryCall; public startWebHookRetries(request: boltzr_pb.StartWebHookRetriesRequest, metadata: grpc.Metadata, options: Partial, callback: (error: grpc.ServiceError | null, response: boltzr_pb.StartWebHookRetriesResponse) => void): grpc.ClientUnaryCall; diff --git a/lib/proto/sidecar/boltzr_grpc_pb.js b/lib/proto/sidecar/boltzr_grpc_pb.js index 82aa3003..ae7c9e37 100644 --- a/lib/proto/sidecar/boltzr_grpc_pb.js +++ b/lib/proto/sidecar/boltzr_grpc_pb.js @@ -158,6 +158,28 @@ function deserialize_boltzr_SendMessageResponse(buffer_arg) { return boltzr_pb.SendMessageResponse.deserializeBinary(new Uint8Array(buffer_arg)); } +function serialize_boltzr_SendSwapUpdateRequest(arg) { + if (!(arg instanceof boltzr_pb.SendSwapUpdateRequest)) { + throw new Error('Expected argument of type boltzr.SendSwapUpdateRequest'); + } + return Buffer.from(arg.serializeBinary()); +} + +function deserialize_boltzr_SendSwapUpdateRequest(buffer_arg) { + return boltzr_pb.SendSwapUpdateRequest.deserializeBinary(new Uint8Array(buffer_arg)); +} + +function serialize_boltzr_SendSwapUpdateResponse(arg) { + if (!(arg instanceof boltzr_pb.SendSwapUpdateResponse)) { + throw new Error('Expected argument of type boltzr.SendSwapUpdateResponse'); + } + return Buffer.from(arg.serializeBinary()); +} + +function deserialize_boltzr_SendSwapUpdateResponse(buffer_arg) { + return boltzr_pb.SendSwapUpdateResponse.deserializeBinary(new Uint8Array(buffer_arg)); +} + function serialize_boltzr_SendWebHookRequest(arg) { if (!(arg instanceof boltzr_pb.SendWebHookRequest)) { throw new Error('Expected argument of type boltzr.SendWebHookRequest'); @@ -325,6 +347,17 @@ var BoltzRService = exports.BoltzRService = { responseSerialize: serialize_boltzr_SwapUpdateResponse, responseDeserialize: deserialize_boltzr_SwapUpdateResponse, }, + sendSwapUpdate: { + path: '/boltzr.BoltzR/SendSwapUpdate', + requestStream: false, + responseStream: true, + requestType: boltzr_pb.SendSwapUpdateRequest, + responseType: boltzr_pb.SendSwapUpdateResponse, + requestSerialize: serialize_boltzr_SendSwapUpdateRequest, + requestDeserialize: deserialize_boltzr_SendSwapUpdateRequest, + responseSerialize: serialize_boltzr_SendSwapUpdateResponse, + responseDeserialize: deserialize_boltzr_SendSwapUpdateResponse, + }, startWebHookRetries: { path: '/boltzr.BoltzR/StartWebHookRetries', requestStream: false, diff --git a/lib/proto/sidecar/boltzr_pb.d.ts b/lib/proto/sidecar/boltzr_pb.d.ts index c3d74c81..5a95f3bc 100644 --- a/lib/proto/sidecar/boltzr_pb.d.ts +++ b/lib/proto/sidecar/boltzr_pb.d.ts @@ -343,6 +343,46 @@ export namespace SwapUpdateResponse { } } +export class SendSwapUpdateRequest extends jspb.Message { + + serializeBinary(): Uint8Array; + toObject(includeInstance?: boolean): SendSwapUpdateRequest.AsObject; + static toObject(includeInstance: boolean, msg: SendSwapUpdateRequest): SendSwapUpdateRequest.AsObject; + static extensions: {[key: number]: jspb.ExtensionFieldInfo}; + static extensionsBinary: {[key: number]: jspb.ExtensionFieldBinaryInfo}; + static serializeBinaryToWriter(message: SendSwapUpdateRequest, writer: jspb.BinaryWriter): void; + static deserializeBinary(bytes: Uint8Array): SendSwapUpdateRequest; + static deserializeBinaryFromReader(message: SendSwapUpdateRequest, reader: jspb.BinaryReader): SendSwapUpdateRequest; +} + +export namespace SendSwapUpdateRequest { + export type AsObject = { + } +} + +export class SendSwapUpdateResponse extends jspb.Message { + + hasUpdate(): boolean; + clearUpdate(): void; + getUpdate(): SwapUpdate | undefined; + setUpdate(value?: SwapUpdate): SendSwapUpdateResponse; + + serializeBinary(): Uint8Array; + toObject(includeInstance?: boolean): SendSwapUpdateResponse.AsObject; + static toObject(includeInstance: boolean, msg: SendSwapUpdateResponse): SendSwapUpdateResponse.AsObject; + static extensions: {[key: number]: jspb.ExtensionFieldInfo}; + static extensionsBinary: {[key: number]: jspb.ExtensionFieldBinaryInfo}; + static serializeBinaryToWriter(message: SendSwapUpdateResponse, writer: jspb.BinaryWriter): void; + static deserializeBinary(bytes: Uint8Array): SendSwapUpdateResponse; + static deserializeBinaryFromReader(message: SendSwapUpdateResponse, reader: jspb.BinaryReader): SendSwapUpdateResponse; +} + +export namespace SendSwapUpdateResponse { + export type AsObject = { + update?: SwapUpdate.AsObject, + } +} + export class StartWebHookRetriesRequest extends jspb.Message { serializeBinary(): Uint8Array; diff --git a/lib/proto/sidecar/boltzr_pb.js b/lib/proto/sidecar/boltzr_pb.js index 44a319d2..54c8ec0c 100644 --- a/lib/proto/sidecar/boltzr_pb.js +++ b/lib/proto/sidecar/boltzr_pb.js @@ -46,6 +46,8 @@ goog.exportSymbol('proto.boltzr.ScanMempoolResponse', null, global); goog.exportSymbol('proto.boltzr.ScanMempoolResponse.Transactions', null, global); goog.exportSymbol('proto.boltzr.SendMessageRequest', null, global); goog.exportSymbol('proto.boltzr.SendMessageResponse', null, global); +goog.exportSymbol('proto.boltzr.SendSwapUpdateRequest', null, global); +goog.exportSymbol('proto.boltzr.SendSwapUpdateResponse', null, global); goog.exportSymbol('proto.boltzr.SendWebHookRequest', null, global); goog.exportSymbol('proto.boltzr.SendWebHookResponse', null, global); goog.exportSymbol('proto.boltzr.SetLogLevelRequest', null, global); @@ -355,6 +357,48 @@ if (goog.DEBUG && !COMPILED) { */ proto.boltzr.SwapUpdateResponse.displayName = 'proto.boltzr.SwapUpdateResponse'; } +/** + * Generated by JsPbCodeGenerator. + * @param {Array=} opt_data Optional initial data array, typically from a + * server response, or constructed directly in Javascript. The array is used + * in place and becomes part of the constructed object. It is not cloned. + * If no data is provided, the constructed object will be empty, but still + * valid. + * @extends {jspb.Message} + * @constructor + */ +proto.boltzr.SendSwapUpdateRequest = function(opt_data) { + jspb.Message.initialize(this, opt_data, 0, -1, null, null); +}; +goog.inherits(proto.boltzr.SendSwapUpdateRequest, jspb.Message); +if (goog.DEBUG && !COMPILED) { + /** + * @public + * @override + */ + proto.boltzr.SendSwapUpdateRequest.displayName = 'proto.boltzr.SendSwapUpdateRequest'; +} +/** + * Generated by JsPbCodeGenerator. + * @param {Array=} opt_data Optional initial data array, typically from a + * server response, or constructed directly in Javascript. The array is used + * in place and becomes part of the constructed object. It is not cloned. + * If no data is provided, the constructed object will be empty, but still + * valid. + * @extends {jspb.Message} + * @constructor + */ +proto.boltzr.SendSwapUpdateResponse = function(opt_data) { + jspb.Message.initialize(this, opt_data, 0, -1, null, null); +}; +goog.inherits(proto.boltzr.SendSwapUpdateResponse, jspb.Message); +if (goog.DEBUG && !COMPILED) { + /** + * @public + * @override + */ + proto.boltzr.SendSwapUpdateResponse.displayName = 'proto.boltzr.SendSwapUpdateResponse'; +} /** * Generated by JsPbCodeGenerator. * @param {Array=} opt_data Optional initial data array, typically from a @@ -3090,6 +3134,258 @@ proto.boltzr.SwapUpdateResponse.prototype.clearIdsList = function() { +if (jspb.Message.GENERATE_TO_OBJECT) { +/** + * Creates an object representation of this proto. + * Field names that are reserved in JavaScript and will be renamed to pb_name. + * Optional fields that are not set will be set to undefined. + * To access a reserved field use, foo.pb_, eg, foo.pb_default. + * For the list of reserved names please see: + * net/proto2/compiler/js/internal/generator.cc#kKeyword. + * @param {boolean=} opt_includeInstance Deprecated. whether to include the + * JSPB instance for transitional soy proto support: + * http://goto/soy-param-migration + * @return {!Object} + */ +proto.boltzr.SendSwapUpdateRequest.prototype.toObject = function(opt_includeInstance) { + return proto.boltzr.SendSwapUpdateRequest.toObject(opt_includeInstance, this); +}; + + +/** + * Static version of the {@see toObject} method. + * @param {boolean|undefined} includeInstance Deprecated. Whether to include + * the JSPB instance for transitional soy proto support: + * http://goto/soy-param-migration + * @param {!proto.boltzr.SendSwapUpdateRequest} msg The msg instance to transform. + * @return {!Object} + * @suppress {unusedLocalVariables} f is only used for nested messages + */ +proto.boltzr.SendSwapUpdateRequest.toObject = function(includeInstance, msg) { + var f, obj = { + + }; + + if (includeInstance) { + obj.$jspbMessageInstance = msg; + } + return obj; +}; +} + + +/** + * Deserializes binary data (in protobuf wire format). + * @param {jspb.ByteSource} bytes The bytes to deserialize. + * @return {!proto.boltzr.SendSwapUpdateRequest} + */ +proto.boltzr.SendSwapUpdateRequest.deserializeBinary = function(bytes) { + var reader = new jspb.BinaryReader(bytes); + var msg = new proto.boltzr.SendSwapUpdateRequest; + return proto.boltzr.SendSwapUpdateRequest.deserializeBinaryFromReader(msg, reader); +}; + + +/** + * Deserializes binary data (in protobuf wire format) from the + * given reader into the given message object. + * @param {!proto.boltzr.SendSwapUpdateRequest} msg The message object to deserialize into. + * @param {!jspb.BinaryReader} reader The BinaryReader to use. + * @return {!proto.boltzr.SendSwapUpdateRequest} + */ +proto.boltzr.SendSwapUpdateRequest.deserializeBinaryFromReader = function(msg, reader) { + while (reader.nextField()) { + if (reader.isEndGroup()) { + break; + } + var field = reader.getFieldNumber(); + switch (field) { + default: + reader.skipField(); + break; + } + } + return msg; +}; + + +/** + * Serializes the message to binary data (in protobuf wire format). + * @return {!Uint8Array} + */ +proto.boltzr.SendSwapUpdateRequest.prototype.serializeBinary = function() { + var writer = new jspb.BinaryWriter(); + proto.boltzr.SendSwapUpdateRequest.serializeBinaryToWriter(this, writer); + return writer.getResultBuffer(); +}; + + +/** + * Serializes the given message to binary data (in protobuf wire + * format), writing to the given BinaryWriter. + * @param {!proto.boltzr.SendSwapUpdateRequest} message + * @param {!jspb.BinaryWriter} writer + * @suppress {unusedLocalVariables} f is only used for nested messages + */ +proto.boltzr.SendSwapUpdateRequest.serializeBinaryToWriter = function(message, writer) { + var f = undefined; +}; + + + + + +if (jspb.Message.GENERATE_TO_OBJECT) { +/** + * Creates an object representation of this proto. + * Field names that are reserved in JavaScript and will be renamed to pb_name. + * Optional fields that are not set will be set to undefined. + * To access a reserved field use, foo.pb_, eg, foo.pb_default. + * For the list of reserved names please see: + * net/proto2/compiler/js/internal/generator.cc#kKeyword. + * @param {boolean=} opt_includeInstance Deprecated. whether to include the + * JSPB instance for transitional soy proto support: + * http://goto/soy-param-migration + * @return {!Object} + */ +proto.boltzr.SendSwapUpdateResponse.prototype.toObject = function(opt_includeInstance) { + return proto.boltzr.SendSwapUpdateResponse.toObject(opt_includeInstance, this); +}; + + +/** + * Static version of the {@see toObject} method. + * @param {boolean|undefined} includeInstance Deprecated. Whether to include + * the JSPB instance for transitional soy proto support: + * http://goto/soy-param-migration + * @param {!proto.boltzr.SendSwapUpdateResponse} msg The msg instance to transform. + * @return {!Object} + * @suppress {unusedLocalVariables} f is only used for nested messages + */ +proto.boltzr.SendSwapUpdateResponse.toObject = function(includeInstance, msg) { + var f, obj = { + update: (f = msg.getUpdate()) && proto.boltzr.SwapUpdate.toObject(includeInstance, f) + }; + + if (includeInstance) { + obj.$jspbMessageInstance = msg; + } + return obj; +}; +} + + +/** + * Deserializes binary data (in protobuf wire format). + * @param {jspb.ByteSource} bytes The bytes to deserialize. + * @return {!proto.boltzr.SendSwapUpdateResponse} + */ +proto.boltzr.SendSwapUpdateResponse.deserializeBinary = function(bytes) { + var reader = new jspb.BinaryReader(bytes); + var msg = new proto.boltzr.SendSwapUpdateResponse; + return proto.boltzr.SendSwapUpdateResponse.deserializeBinaryFromReader(msg, reader); +}; + + +/** + * Deserializes binary data (in protobuf wire format) from the + * given reader into the given message object. + * @param {!proto.boltzr.SendSwapUpdateResponse} msg The message object to deserialize into. + * @param {!jspb.BinaryReader} reader The BinaryReader to use. + * @return {!proto.boltzr.SendSwapUpdateResponse} + */ +proto.boltzr.SendSwapUpdateResponse.deserializeBinaryFromReader = function(msg, reader) { + while (reader.nextField()) { + if (reader.isEndGroup()) { + break; + } + var field = reader.getFieldNumber(); + switch (field) { + case 1: + var value = new proto.boltzr.SwapUpdate; + reader.readMessage(value,proto.boltzr.SwapUpdate.deserializeBinaryFromReader); + msg.setUpdate(value); + break; + default: + reader.skipField(); + break; + } + } + return msg; +}; + + +/** + * Serializes the message to binary data (in protobuf wire format). + * @return {!Uint8Array} + */ +proto.boltzr.SendSwapUpdateResponse.prototype.serializeBinary = function() { + var writer = new jspb.BinaryWriter(); + proto.boltzr.SendSwapUpdateResponse.serializeBinaryToWriter(this, writer); + return writer.getResultBuffer(); +}; + + +/** + * Serializes the given message to binary data (in protobuf wire + * format), writing to the given BinaryWriter. + * @param {!proto.boltzr.SendSwapUpdateResponse} message + * @param {!jspb.BinaryWriter} writer + * @suppress {unusedLocalVariables} f is only used for nested messages + */ +proto.boltzr.SendSwapUpdateResponse.serializeBinaryToWriter = function(message, writer) { + var f = undefined; + f = message.getUpdate(); + if (f != null) { + writer.writeMessage( + 1, + f, + proto.boltzr.SwapUpdate.serializeBinaryToWriter + ); + } +}; + + +/** + * optional SwapUpdate update = 1; + * @return {?proto.boltzr.SwapUpdate} + */ +proto.boltzr.SendSwapUpdateResponse.prototype.getUpdate = function() { + return /** @type{?proto.boltzr.SwapUpdate} */ ( + jspb.Message.getWrapperField(this, proto.boltzr.SwapUpdate, 1)); +}; + + +/** + * @param {?proto.boltzr.SwapUpdate|undefined} value + * @return {!proto.boltzr.SendSwapUpdateResponse} returns this +*/ +proto.boltzr.SendSwapUpdateResponse.prototype.setUpdate = function(value) { + return jspb.Message.setWrapperField(this, 1, value); +}; + + +/** + * Clears the message field making it undefined. + * @return {!proto.boltzr.SendSwapUpdateResponse} returns this + */ +proto.boltzr.SendSwapUpdateResponse.prototype.clearUpdate = function() { + return this.setUpdate(undefined); +}; + + +/** + * Returns whether this field is set. + * @return {boolean} + */ +proto.boltzr.SendSwapUpdateResponse.prototype.hasUpdate = function() { + return jspb.Message.getField(this, 1) != null; +}; + + + + + if (jspb.Message.GENERATE_TO_OBJECT) { /** * Creates an object representation of this proto. diff --git a/lib/service/EventHandler.ts b/lib/service/EventHandler.ts index dc1668c1..c5c56e55 100644 --- a/lib/service/EventHandler.ts +++ b/lib/service/EventHandler.ts @@ -55,7 +55,7 @@ class EventHandler extends TypedEventEmitter<{ }> { constructor( private logger: Logger, - private nursery: SwapNursery, + public nursery: SwapNursery, ) { super(); diff --git a/lib/sidecar/Sidecar.ts b/lib/sidecar/Sidecar.ts index c66c5b33..57eb2c66 100644 --- a/lib/sidecar/Sidecar.ts +++ b/lib/sidecar/Sidecar.ts @@ -1,20 +1,32 @@ -import { ClientDuplexStream, Metadata } from '@grpc/grpc-js'; +import { + ClientDuplexStream, + ClientReadableStream, + Metadata, +} from '@grpc/grpc-js'; import { Status } from '@grpc/grpc-js/build/src/constants'; import child_process from 'node:child_process'; import path from 'path'; -import BaseClient from '../BaseClient'; import type { BaseClientEvents } from '../BaseClient'; +import BaseClient from '../BaseClient'; import { ConfigType } from '../Config'; import Logger, { LogLevel } from '../Logger'; import { sleep } from '../PromiseUtils'; -import { formatError, getVersion } from '../Utils'; +import { + formatError, + getChainCurrency, + getVersion, + splitPairId, + stringify, +} from '../Utils'; import SwapInfos from '../api/SwapInfos'; import { ClientStatus, SwapUpdateEvent } from '../consts/Enums'; +import SwapRepository from '../db/repositories/SwapRepository'; import { satToMsat } from '../lightning/ChannelUtils'; import { grpcOptions, unaryCall } from '../lightning/GrpcUtils'; import { createSsl } from '../lightning/cln/Types'; import { BoltzRClient } from '../proto/sidecar/boltzr_grpc_pb'; import * as sidecarrpc from '../proto/sidecar/boltzr_pb'; +import { SendSwapUpdateRequest } from '../proto/sidecar/boltzr_pb'; import EventHandler, { SwapUpdate } from '../service/EventHandler'; import DecodedInvoice from './DecodedInvoice'; @@ -58,10 +70,12 @@ class Sidecar extends BaseClient< private swapInfos!: SwapInfos; private eventHandler!: EventHandler; - private subscribeSwapUpatesCall?: ClientDuplexStream< + + private subscribeSwapUpdatesCall?: ClientDuplexStream< sidecarrpc.SwapUpdateRequest, sidecarrpc.SwapUpdateResponse >; + private subscribeSendSwapUpdatesCall?: ClientReadableStream; constructor( logger: Logger, @@ -141,7 +155,8 @@ class Sidecar extends BaseClient< }; public disconnect = (): void => { - this.subscribeSwapUpatesCall?.cancel(); + this.subscribeSwapUpdatesCall?.cancel(); + this.subscribeSendSwapUpdatesCall?.cancel(); this.clearReconnectTimer(); @@ -367,13 +382,13 @@ class Sidecar extends BaseClient< return req; }; - if (this.subscribeSwapUpatesCall !== undefined) { - this.subscribeSwapUpatesCall.cancel(); + if (this.subscribeSwapUpdatesCall !== undefined) { + this.subscribeSwapUpdatesCall.cancel(); } - this.subscribeSwapUpatesCall = this.client!.swapUpdate(this.clientMeta); + this.subscribeSwapUpdatesCall = this.client!.swapUpdate(this.clientMeta); - this.subscribeSwapUpatesCall.on( + this.subscribeSwapUpdatesCall.on( 'data', async (data: sidecarrpc.SwapUpdateResponse) => { const status = ( @@ -388,37 +403,129 @@ class Sidecar extends BaseClient< return; } - this.subscribeSwapUpatesCall!.write(serializeSwapUpdate(status)); + this.subscribeSwapUpdatesCall!.write(serializeSwapUpdate(status)); }, ); - this.subscribeSwapUpatesCall.on('error', (err) => { + this.subscribeSwapUpdatesCall.on('error', (err) => { this.logger.warn( `Swap updates streaming call threw error: ${formatError(err)}`, ); - this.subscribeSwapUpatesCall = undefined; + this.subscribeSwapUpdatesCall = undefined; }); - this.subscribeSwapUpatesCall.on('end', () => { + this.subscribeSwapUpdatesCall.on('end', () => { this.eventHandler.removeAllListeners(); - if (this.subscribeSwapUpatesCall !== undefined) { - this.subscribeSwapUpatesCall.cancel(); - this.subscribeSwapUpatesCall = undefined; + if (this.subscribeSwapUpdatesCall !== undefined) { + this.subscribeSwapUpdatesCall.cancel(); + this.subscribeSwapUpdatesCall = undefined; } }); this.eventHandler.on('swap.update', async ({ id, status }) => { - if (this.subscribeSwapUpatesCall === undefined) { + if (this.subscribeSwapUpdatesCall === undefined) { return; } - this.subscribeSwapUpatesCall.write(serializeSwapUpdate([{ id, status }])); + this.subscribeSwapUpdatesCall.write( + serializeSwapUpdate([{ id, status }]), + ); await this.sendWebHook(id, status.status); }); }; + private subscribeSendSwapUpdates = () => { + if (this.subscribeSendSwapUpdatesCall !== undefined) { + this.subscribeSendSwapUpdatesCall.cancel(); + } + + this.subscribeSendSwapUpdatesCall = this.client!.sendSwapUpdate( + new SendSwapUpdateRequest(), + this.clientMeta, + ); + + this.subscribeSendSwapUpdatesCall.on( + 'data', + async (data: sidecarrpc.SendSwapUpdateResponse) => { + const update = data.getUpdate(); + if (update === undefined) { + return; + } + + try { + await this.handleSentSwapUpdate(update); + } catch (e) { + this.logger.error( + `Handling sent swap update (${stringify(data.toObject())}) failed: ${formatError(e)}`, + ); + } + }, + ); + + this.subscribeSendSwapUpdatesCall.on('error', (err) => { + this.logger.warn( + `Send swap updates streaming call threw error: ${formatError(err)}`, + ); + this.subscribeSwapUpdatesCall = undefined; + }); + + this.subscribeSendSwapUpdatesCall.on('end', () => { + if (this.subscribeSwapUpdatesCall !== undefined) { + this.subscribeSwapUpdatesCall.cancel(); + this.subscribeSwapUpdatesCall = undefined; + } + }); + }; + + private handleSentSwapUpdate = async (update: sidecarrpc.SwapUpdate) => { + switch (update.getStatus()) { + case SwapUpdateEvent.InvoiceFailedToPay: { + const swap = await SwapRepository.getSwap({ + id: update.getId(), + }); + if (swap === null) { + this.logger.warn( + `Could not find swap for update with id: ${update.getId()}`, + ); + return; + } + + const { base, quote } = splitPairId(swap.pair); + const chainCurrency = getChainCurrency( + base, + quote, + swap.orderSide, + false, + ); + + const currency = + this.eventHandler.nursery.currencies.get(chainCurrency); + + if (currency !== undefined && currency.chainClient !== undefined) { + const wallet = + this.eventHandler.nursery.walletManager.wallets.get(chainCurrency)!; + currency.chainClient.removeOutputFilter( + wallet.decodeAddress(swap.lockupAddress), + ); + } + + this.eventHandler.nursery.emit( + SwapUpdateEvent.InvoiceFailedToPay, + swap, + ); + return; + } + default: { + this.logger.warn( + `Got swap update that could not be handled: ${stringify(update.toObject())}`, + ); + return; + } + } + }; + public rescanMempool = async (symbols?: string[]) => { const req = new sidecarrpc.ScanMempoolRequest(); if (symbols !== undefined) { @@ -484,6 +591,7 @@ class Sidecar extends BaseClient< if (withSubscriptions) { this.subscribeSwapUpdates(); + this.subscribeSendSwapUpdates(); } this.setClientStatus(ClientStatus.Connected); diff --git a/lib/swap/SwapNursery.ts b/lib/swap/SwapNursery.ts index 27cb6546..3d11fa24 100644 --- a/lib/swap/SwapNursery.ts +++ b/lib/swap/SwapNursery.ts @@ -108,7 +108,7 @@ class SwapNursery extends TypedEventEmitter { private readonly pendingPaymentTracker: PendingPaymentTracker; // Maps - private currencies = new Map(); + public currencies = new Map(); // Locks public readonly lock = new AsyncLock({ @@ -128,7 +128,7 @@ class SwapNursery extends TypedEventEmitter { private nodeSwitch: NodeSwitch, private rateProvider: RateProvider, timeoutDeltaProvider: TimeoutDeltaProvider, - private walletManager: WalletManager, + public walletManager: WalletManager, private swapOutputType: SwapOutputType, private retryInterval: number, blocks: Blocks, diff --git a/test/integration/sidecar/Sidecar.spec.ts b/test/integration/sidecar/Sidecar.spec.ts index 516a8942..9d64afa4 100644 --- a/test/integration/sidecar/Sidecar.spec.ts +++ b/test/integration/sidecar/Sidecar.spec.ts @@ -1,19 +1,20 @@ +import { OrderSide, SwapUpdateEvent } from '../../../lib/consts/Enums'; +import SwapRepository from '../../../lib/db/repositories/SwapRepository'; import * as noderpc from '../../../lib/proto/cln/node_pb'; +import * as sidecarrpc from '../../../lib/proto/sidecar/boltzr_pb'; import { InvoiceType } from '../../../lib/sidecar/DecodedInvoice'; import Sidecar from '../../../lib/sidecar/Sidecar'; import { clnClient } from '../Nodes'; import { sidecar, startSidecar } from './Utils'; describe('Sidecar', () => { + const eventHandler = { on: jest.fn(), removeAllListeners: jest.fn() } as any; + beforeAll(async () => { startSidecar(); await Promise.all([ - sidecar.connect( - { on: jest.fn(), removeAllListeners: jest.fn() } as any, - {} as any, - false, - ), - clnClient.connect(false), + sidecar.connect(eventHandler, {} as any, false), + clnClient.connect(), ]); }); @@ -42,4 +43,70 @@ describe('Sidecar', () => { const decoded = await sidecar.decodeInvoiceOrOffer(invoice); expect(decoded.type).toEqual(InvoiceType.Bolt12Invoice); }); + + describe('handleSentSwapUpdate', () => { + test(`should handle status ${SwapUpdateEvent.InvoiceFailedToPay}`, async () => { + const id = 'failed'; + + const update = new sidecarrpc.SwapUpdate(); + update.setId(id); + update.setStatus(SwapUpdateEvent.InvoiceFailedToPay); + + const swap = { + id, + pair: 'L-BTC/BTC', + lockupAddress: 'bc1', + orderSide: OrderSide.BUY, + }; + SwapRepository.getSwap = jest.fn().mockResolvedValue(swap); + + const chainClient = { removeOutputFilter: jest.fn() }; + const wallet = { + decodeAddress: jest.fn().mockReturnValue('decoded'), + }; + + eventHandler.nursery = { + emit: jest.fn(), + currencies: new Map([ + [ + 'BTC', + { + chainClient, + }, + ], + ]), + walletManager: { + wallets: new Map([['BTC', wallet]]), + }, + }; + + await sidecar['handleSentSwapUpdate'](update); + + expect(wallet.decodeAddress).toHaveBeenCalledTimes(1); + expect(wallet.decodeAddress).toHaveBeenCalledWith(swap.lockupAddress); + + expect(chainClient.removeOutputFilter).toHaveBeenCalledTimes(1); + expect(chainClient.removeOutputFilter).toHaveBeenCalledWith('decoded'); + + expect(eventHandler.nursery.emit).toHaveBeenCalledTimes(1); + expect(eventHandler.nursery.emit).toHaveBeenCalledWith( + SwapUpdateEvent.InvoiceFailedToPay, + swap, + ); + }); + + test.each` + status + ${SwapUpdateEvent.InvoicePaid} + ${SwapUpdateEvent.SwapCreated} + ${SwapUpdateEvent.InvoiceExpired} + ${SwapUpdateEvent.TransactionClaimPending} + `('should ignore status $status', async ({ status }) => { + const update = new sidecarrpc.SwapUpdate(); + update.setId('test'); + update.setStatus(status); + + await sidecar['handleSentSwapUpdate'](update); + }); + }); });