Skip to content

Commit

Permalink
monitoring funding tx mining
Browse files Browse the repository at this point in the history
  • Loading branch information
crisdut committed Mar 24, 2023
1 parent b6b6128 commit b61bcd2
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 21 deletions.
13 changes: 12 additions & 1 deletion src/bus/ctl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ pub enum CtlMsg {
/// Reports changes in the mining status for previously requested transaction tracked by an
/// on-chain service
#[display("tx_found({0})")]
TxFound(TxStatus),
TxFound(TxConfirmation),

// Routing & payments
/// Request to channel daemon to perform payment using provided route
Expand Down Expand Up @@ -260,6 +260,17 @@ pub struct TxStatus {
pub block_pos: Option<BlockPos>,
}

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Display)]
#[derive(NetworkEncode, NetworkDecode)]
#[display("{txid}, ...")]
pub struct TxConfirmation {
/// Id of a transaction previously requested to be tracked
pub txid: Txid,

/// number of block confirmations
pub confirmations: u32,
}

#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Display, NetworkEncode, NetworkDecode)]
#[display("{client}, {status}")]
pub struct Report {
Expand Down
8 changes: 8 additions & 0 deletions src/channeld/automata/accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,14 @@ fn finish_signed(event: Event<BusMsg>, runtime: &mut Runtime) -> Result<ChannelA
event.endpoints,
LnMsg::FundingSigned(FundingSigned { channel_id, signature: funding.signature }),
)?;

debug!("Waiting for funding transaction {} to be mined", funding.funding_txid);
let core = runtime.state.channel.constructor();
runtime.send_ctl(event.endpoints, ServiceId::Watch, CtlMsg::Track {
txid: funding.funding_txid,
depth: core.common_params().minimum_depth,
})?;

Ok(ChannelAccept::Funded)
}
wrong_msg => {
Expand Down
6 changes: 5 additions & 1 deletion src/channeld/automata/propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,11 @@ fn complete_funding(

let txid = runtime.state.channel.funding().txid();
debug!("Waiting for funding transaction {} to be mined", txid);
runtime.send_ctl(event.endpoints, ServiceId::Watch, CtlMsg::Track { txid, depth: 0 })?;
let core = runtime.state.channel.constructor();
runtime.send_ctl(event.endpoints, ServiceId::Watch, CtlMsg::Track {
txid,
depth: core.common_params().minimum_depth,
})?;

Ok(ChannelPropose::Published)
}
Expand Down
16 changes: 7 additions & 9 deletions src/watchd/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub fn run(config: Config) -> Result<(), Error> {
rx.bind("inproc://electrum-bridge")?;

let (sender, receiver) = mpsc::channel::<ElectrumUpdate>();
let electrum_worker = ElectrumWorker::with(sender, &config.electrum_url, 5)?;
let electrum_worker = ElectrumWorker::with(sender, &config.electrum_url, 15)?;

debug!("Starting electrum watcher thread");
let watcher_runtime = WatcherRuntime::with(receiver, tx)?;
Expand Down Expand Up @@ -88,16 +88,14 @@ impl WatcherRuntime {
// TODO: Forward all electrum notifications over the bridge
// self.send_over_bridge(msg.into()).expect("watcher bridge is halted");
match msg {
ElectrumUpdate::TxBatch(transactions, _) => {
ElectrumUpdate::TxConfirmations(transactions, _) => {
for transaction in transactions {
self.send_over_bridge(BusMsg::Ctl(CtlMsg::TxFound(crate::bus::TxStatus {
txid: transaction.txid(),
block_pos: None,
})))
.expect("unable forward electrum notifications over the bridge");
self.send_over_bridge(BusMsg::Ctl(CtlMsg::TxFound(transaction)))
.expect("unable forward electrum notifications over the bridge");
}
}
ElectrumUpdate::Connecting
ElectrumUpdate::TxBatch(..)
| ElectrumUpdate::Connecting
| ElectrumUpdate::Connected
| ElectrumUpdate::Complete
| ElectrumUpdate::FeeEstimate(..)
Expand Down Expand Up @@ -170,7 +168,7 @@ impl Runtime {
match request {
CtlMsg::TxFound(tx_status) => {
if let Some((required_height, service_id)) = self.track_list.get(&tx_status.txid) {
if *required_height >= tx_status.block_pos.map(|b| b.pos).unwrap_or_default() {
if *required_height <= tx_status.confirmations {
let service_id = service_id.clone();
self.untrack(tx_status.txid);
match self.electrum_worker.untrack_transaction(tx_status.txid) {
Expand Down
56 changes: 46 additions & 10 deletions src/watchd/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@

// TODO: Consider making it part of descriptor wallet onchain library

use std::collections::BTreeMap;
use std::sync::mpsc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

use bitcoin::{Transaction, Txid};
use bitcoin::{Script, Transaction, Txid};
use electrum_client::{Client as ElectrumClient, ElectrumApi, HeaderNotification};

use crate::bus::TxConfirmation;

#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug, Display, Error, From)]
#[display("failed electrum watcher channel")]
#[from(mpsc::SendError<ElectrumCmd>)]
Expand Down Expand Up @@ -48,6 +51,9 @@ pub enum ElectrumUpdate {
#[display("tx_batch(...)")]
TxBatch(Vec<Transaction>, f32),

#[display("tx_confirmations(...)")]
TxConfirmations(Vec<TxConfirmation>, u32),

#[display("channel_disconnected")]
ChannelDisconnected,

Expand Down Expand Up @@ -82,7 +88,6 @@ impl ElectrumWorker {
.spawn(move || loop {
thread::sleep(Duration::from_secs(interval));
sender.send(ElectrumCmd::GetTrasactions).expect("Electrum thread is dead");
sender.send(ElectrumCmd::PopHeader).expect("Electrum thread is dead")
})
.expect("unable to start blockchain watcher pacemaker thread");

Expand Down Expand Up @@ -190,20 +195,53 @@ impl ElectrumProcessor {
&mut self,
txids: &Vec<Txid>,
) -> Result<Option<ElectrumUpdate>, electrum_client::Error> {
if self.tracks.is_empty() {
if self.tracks.is_empty() || txids.is_empty() {
return Ok(None);
}
let transactions = self.client.batch_transaction_get(txids)?;
let scripts: Vec<Script> =
transactions.into_iter().map(|tx| tx.output[0].script_pubkey.clone()).collect();

let hist = self.client.batch_script_get_history(&scripts)?;

let mut items = vec![];
hist.into_iter().for_each(|mut item| items.append(&mut item));

if items.is_empty() {
return Ok(None);
}
self.client.batch_transaction_get(txids).map(|res| Some(ElectrumUpdate::TxBatch(res, 0.0)))

let transactions: BTreeMap<Txid, i32> =
items.into_iter().map(|h| (h.tx_hash, h.height)).collect();

let min_height = transactions.clone().into_iter().map(|(_, h)| h).min();
let min_height = min_height.unwrap_or_default();

let block_headers = self.client.block_headers(min_height as usize, 50)?;
let block_total = block_headers.headers.len() as i32;

let confirmations: BTreeMap<Txid, i32> = transactions
.into_iter()
.filter(|(_, height)| min_height + block_total > height.to_owned())
.collect();

let confirmations: Vec<TxConfirmation> = confirmations
.into_iter()
.map(|(tx_id, height)| TxConfirmation {
txid: tx_id,
confirmations: (min_height + block_total - height) as u32,
})
.collect();

Ok(Some(ElectrumUpdate::TxConfirmations(confirmations.clone(), confirmations.len() as u32)))
}

fn track_transaction(
&mut self,
txid: Txid,
) -> Result<Option<ElectrumUpdate>, electrum_client::Error> {
self.tracks.push(txid);
self.client
.transaction_get(&txid.clone())
.map(|res| Some(ElectrumUpdate::TxBatch([res].to_vec(), 0.0)))
self.get_transactions(&self.tracks.clone())
}

fn untrack_transaction(
Expand All @@ -212,8 +250,6 @@ impl ElectrumProcessor {
) -> Result<Option<ElectrumUpdate>, electrum_client::Error> {
let index = self.tracks.iter().position(|x| *x == txid).unwrap();
self.tracks.remove(index);
self.client
.transaction_get(&txid.clone())
.map(|res| Some(ElectrumUpdate::TxBatch([res].to_vec(), 0.0)))
self.get_transactions(&self.tracks.clone())
}
}

0 comments on commit b61bcd2

Please sign in to comment.