Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Musig2RoundFinalizeError caused by concurrency problem #493

Merged
merged 37 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
c4a1452
add testcase to reproduce issue
chenyukang Jan 16, 2025
a8e99a8
fix test
chenyukang Jan 16, 2025
df3f3be
add more log
chenyukang Jan 16, 2025
88d3d3b
add testcase to reproduce issue 480
chenyukang Jan 17, 2025
566aff0
simply assertion
chenyukang Jan 17, 2025
5353710
add testcase middle hop set higher fee rate to make payment fail
chenyukang Jan 17, 2025
8c50f16
merge
chenyukang Jan 17, 2025
d434e02
keep fail tcl in tlc state
chenyukang Jan 18, 2025
3ffd019
add tlc status RemoveApplyConfirmed?
chenyukang Jan 18, 2025
e3750e2
clean up failed tlcs
chenyukang Jan 19, 2025
6f19693
cleanup
chenyukang Jan 19, 2025
2afd7b5
continue after fee error
chenyukang Jan 20, 2025
f102d0d
debug now
chenyukang Jan 20, 2025
19fc720
debug mut and avoid duplicate remove relay
chenyukang Jan 21, 2025
d6b47c1
debug remove tail tlc
chenyukang Jan 21, 2025
ff081db
add clean_up_failed_tcls
chenyukang Jan 21, 2025
bc3ad2e
debug invalid onion packet issue
chenyukang Jan 22, 2025
3a0f8a4
fix invalid onion packet issue
chenyukang Jan 22, 2025
d255901
code refactor for testing
chenyukang Jan 22, 2025
9cbe6f9
clean up
chenyukang Jan 22, 2025
b08b65e
code refactor for cleanup tlcs
chenyukang Jan 22, 2025
d1d8e0d
merge develop and resolve conflicts
chenyukang Jan 22, 2025
e21cfda
cleanup logs
chenyukang Jan 22, 2025
51feda5
add testcase for issue 475
chenyukang Jan 22, 2025
682eb38
add more tests for 475
chenyukang Jan 22, 2025
6f436fb
fix #475, musig error caused by failed tlcs
chenyukang Jan 22, 2025
1649af1
cleanup
chenyukang Jan 22, 2025
edff4f0
code refactor and cleanup
chenyukang Jan 23, 2025
07d054d
fix possible fail test with optimization enabled
chenyukang Jan 23, 2025
d8d420d
add migration for tlc state change
chenyukang Jan 23, 2025
1dc65e0
add more test for send each other
chenyukang Jan 24, 2025
38efe61
add filter failed tlc for tlc balance
chenyukang Jan 24, 2025
c5afa2f
fix failed test case
chenyukang Jan 24, 2025
a78226b
fix get_oldest_failed_tlcs
chenyukang Jan 24, 2025
d665a40
code refactoring for check insert tlc
chenyukang Jan 24, 2025
77577ab
refactoring test for payments
chenyukang Jan 26, 2025
6eb9d26
fix index error for fee not enough
chenyukang Jan 26, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions migrate/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

128 changes: 110 additions & 18 deletions src/fiber/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ use tokio::sync::oneshot;
use super::{graph::ChannelUpdateInfo, types::ForwardTlcResult};
use std::{
collections::HashSet,
fmt::{self, Debug},
fmt::{self, Debug, Display},
sync::Arc,
time::{SystemTime, UNIX_EPOCH},
u128,
u128, u64,
};

use super::types::{ChannelUpdateChannelFlags, ChannelUpdateMessageFlags, UpdateTlcInfo};
Expand Down Expand Up @@ -144,6 +144,22 @@ pub enum ChannelCommand {
ReloadState(ReloadParams),
}

impl Display for ChannelCommand {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ChannelCommand::TxCollaborationCommand(_) => write!(f, "TxCollaborationCommand"),
ChannelCommand::CommitmentSigned() => write!(f, "CommitmentSigned"),
ChannelCommand::AddTlc(_, _) => write!(f, "AddTlc"),
ChannelCommand::RemoveTlc(_, _) => write!(f, "RemoveTlc"),
ChannelCommand::Shutdown(_, _) => write!(f, "Shutdown"),
ChannelCommand::Update(_, _) => write!(f, "Update"),
ChannelCommand::ForwardTlcResult(_) => write!(f, "ForwardTlcResult"),
#[cfg(test)]
ChannelCommand::ReloadState(_) => write!(f, "ReloadState"),
}
}
}

#[cfg(test)]
#[derive(Debug)]
pub struct ReloadParams {
Expand Down Expand Up @@ -716,9 +732,9 @@ where
) {
let previous_balance = state.get_local_balance();
let pending_tlcs = if inbound {
state.tlc_state.received_tlcs.tlcs.iter_mut()
state.tlc_state.received_tlcs.tlcs.iter()
} else {
state.tlc_state.offered_tlcs.tlcs.iter_mut()
state.tlc_state.offered_tlcs.tlcs.iter()
};
let settled_tlcs: Vec<_> = pending_tlcs
.filter(|tlc| {
Expand All @@ -737,6 +753,7 @@ where
.await
.expect("expect remove tlc success");
}

if state.get_local_balance() != previous_balance {
state.update_graph_for_local_channel_change(&self.network);
state.update_graph_for_remote_channel_change(&self.network);
Expand Down Expand Up @@ -775,6 +792,7 @@ where
// There's no shared secret stored in the received TLC, use the one found in the peeled onion packet.
&error.shared_secret,
);

self.register_retryable_tlc_remove(
myself,
state,
Expand Down Expand Up @@ -830,6 +848,7 @@ where
assert!(previous_channel_id != state.get_id());

let remove_reason = remove_reason.clone().backward(&tlc_info.shared_secret);

self.register_retryable_relay_tlc_remove(
myself,
state,
Expand Down Expand Up @@ -1098,6 +1117,10 @@ where
tlc_id: TLCId,
) -> Result<(), ProcessingChannelError> {
let channel_id = state.get_id();
if state.tlc_state.applied_remove_tlcs.contains(&tlc_id) {
return Ok(());
}
state.tlc_state.applied_remove_tlcs.insert(tlc_id);
chenyukang marked this conversation as resolved.
Show resolved Hide resolved
let (tlc_info, remove_reason) = state.remove_tlc_with_reason(tlc_id)?;
if matches!(remove_reason, RemoveTlcReason::RemoveTlcFulfill(_))
&& self.store.get_invoice(&tlc_info.payment_hash).is_some()
Expand Down Expand Up @@ -1188,6 +1211,7 @@ where
)));
}
};
state.clean_up_failed_tlcs();
let (funding_tx_partial_signature, commitment_tx_partial_signature) =
state.build_and_sign_commitment_tx()?;
let commitment_signed = CommitmentSigned {
Expand Down Expand Up @@ -2294,15 +2318,21 @@ where
.await
{
error!(
"Error while processing channel message: {:?} with message: {:?}",
error, message
"{:?} Error while processing channel message: {:?} with message: {:?}",
state.get_local_peer_id(),
error,
message
);
debug_event!(&self.network, &format!("{:?}", error));
}
}
ChannelActorMessage::Command(command) => {
if let Err(err) = self.handle_command(&myself, state, command).await {
error!("Error while processing channel command: {:?}", err);
error!(
"{:?} Error while processing channel command: {:?}",
state.get_local_peer_id(),
err
);
}
}
ChannelActorMessage::Event(e) => {
Expand Down Expand Up @@ -2516,6 +2546,7 @@ pub struct TlcInfo {
/// ^^^^ ^^^^
///
pub previous_tlc: Option<(Hash256, TLCId)>,
pub removed_confirmed_at: Option<u64>,
chenyukang marked this conversation as resolved.
Show resolved Hide resolved
}

// When we are forwarding a TLC, we need to know the previous TLC information.
Expand Down Expand Up @@ -2545,6 +2576,8 @@ impl Debug for TlcInfo {
.field("status", &self.status)
.field("amount", &self.amount)
.field("removed_reason", &self.removed_reason)
.field("payment_hash", &self.payment_hash)
.field("removed_confirmed_at", &self.removed_confirmed_at)
.finish()
}
}
Expand Down Expand Up @@ -2581,6 +2614,20 @@ impl TlcInfo {
self.status.as_inbound_status()
}

pub fn is_remove_comfirmed(&self) -> bool {
match self.status {
TlcStatus::Outbound(OutboundTlcStatus::RemoveAckConfirmed) => true,
TlcStatus::Outbound(OutboundTlcStatus::RemoveWaitAck) => true,
TlcStatus::Inbound(InboundTlcStatus::RemoveAckConfirmed) => true,
_ => false,
}
}

pub fn is_fail_remove_confirmed(&self) -> bool {
matches!(self.removed_reason, Some(RemoveTlcReason::RemoveTlcFail(_)))
&& self.is_remove_comfirmed()
}

fn get_hash(&self) -> ShortHash {
self.payment_hash.as_ref()[..20]
.try_into()
Expand Down Expand Up @@ -2691,6 +2738,7 @@ pub struct TlcState {
pub received_tlcs: PendingTlcs,
pub retryable_tlc_operations: Vec<RetryableTlcOperation>,
pub applied_add_tlcs: HashSet<TLCId>,
pub applied_remove_tlcs: HashSet<TLCId>,
chenyukang marked this conversation as resolved.
Show resolved Hide resolved
pub waiting_ack: bool,
}

Expand Down Expand Up @@ -2787,6 +2835,7 @@ impl TlcState {

pub fn apply_remove_tlc(&mut self, tlc_id: TLCId) {
self.applied_add_tlcs.remove(&tlc_id);
self.applied_remove_tlcs.remove(&tlc_id);
if tlc_id.is_offered() {
self.offered_tlcs.tlcs.retain(|tlc| tlc.tlc_id != tlc_id);
} else {
Expand Down Expand Up @@ -2900,7 +2949,7 @@ impl TlcState {
self.need_another_commitment_signed()
}

pub fn update_for_revoke_and_ack(&mut self) -> bool {
pub fn update_for_revoke_and_ack(&mut self, commitment_numbers: CommitmentNumbers) -> bool {
self.set_waiting_ack(false);
for tlc in self.offered_tlcs.tlcs.iter_mut() {
match tlc.outbound_status() {
Expand All @@ -2912,6 +2961,7 @@ impl TlcState {
}
OutboundTlcStatus::RemoveWaitAck => {
tlc.status = TlcStatus::Outbound(OutboundTlcStatus::RemoveAckConfirmed);
tlc.removed_confirmed_at = Some(commitment_numbers.get_local());
}
_ => {}
}
Expand All @@ -2927,6 +2977,7 @@ impl TlcState {
}
InboundTlcStatus::LocalRemoved => {
tlc.status = TlcStatus::Inbound(InboundTlcStatus::RemoveAckConfirmed);
tlc.removed_confirmed_at = Some(commitment_numbers.get_remote());
}
_ => {}
}
Expand Down Expand Up @@ -4376,6 +4427,7 @@ impl ChannelActorState {
self.get_remote_commitment_number().to_be_bytes().as_slice(),
]
.concat();

let message = blake2b_256(
[
to_local_output.as_slice(),
Expand All @@ -4386,7 +4438,6 @@ impl ChannelActorState {
]
.concat(),
);

sign_ctx.sign(message.as_slice())?
};

Expand Down Expand Up @@ -4555,15 +4606,22 @@ impl ChannelActorState {
)));
}
let payment_hash = tlc.payment_hash;
if let Some(tlc) = self
let tlc_infos: Vec<_> = self
.tlc_state
.all_tlcs()
.find(|tlc| tlc.payment_hash == payment_hash)
{
return Err(ProcessingChannelError::RepeatedProcessing(format!(
"Trying to insert tlc with duplicate payment hash {:?} with tlc {:?}",
payment_hash, tlc
)));
.filter(|tlc| tlc.payment_hash == payment_hash)
.map(|info| info.clone())
.collect();
if !tlc_infos.is_empty() {
if tlc_infos.iter().all(|t| t.is_fail_remove_confirmed()) {
// If all the tlcs with the same payment hash are confirmed to be failed,
// then it's safe to insert the new tlc, the old tlcs will be removed later.
} else {
return Err(ProcessingChannelError::RepeatedProcessing(format!(
"Trying to insert tlc with duplicate payment hash {:?} with tlcs {:?}",
payment_hash, tlc_infos
)));
}
}
if tlc.is_offered() {
let sent_tlc_value = self.get_offered_tlc_balance();
Expand Down Expand Up @@ -4628,8 +4686,8 @@ impl ChannelActorState {

debug!("Updated local balance to {} and remote balance to {} by removing tlc {:?} with reason {:?}",
to_local_amount, to_remote_amount, tlc_id, reason);
self.tlc_state.apply_remove_tlc(tlc_id);
}
self.tlc_state.apply_remove_tlc(tlc_id);
debug!(
"Removed tlc payment_hash {:?} with reason {:?}",
current.payment_hash, reason
Expand All @@ -4638,6 +4696,31 @@ impl ChannelActorState {
Ok((current.clone(), reason))
}

pub fn clean_up_failed_tlcs(&mut self) {
let mut failed_tlcs: Vec<_> = self
.tlc_state
.received_tlcs
.tlcs
.iter()
.chain(self.tlc_state.offered_tlcs.tlcs.iter())
.filter(|tlc| {
matches!(tlc.removed_reason, Some(RemoveTlcReason::RemoveTlcFail(_)))
&& matches!(
tlc.status,
TlcStatus::Inbound(InboundTlcStatus::RemoveAckConfirmed)
| TlcStatus::Outbound(OutboundTlcStatus::RemoveAckConfirmed)
| TlcStatus::Outbound(OutboundTlcStatus::RemoveWaitAck)
)
})
.map(|tlc| (tlc.tlc_id, tlc.removed_confirmed_at.unwrap_or(u64::MAX)))
.collect();

if failed_tlcs.len() >= 3 {
chenyukang marked this conversation as resolved.
Show resolved Hide resolved
failed_tlcs.sort_by(|a, b| a.1.cmp(&b.1));
self.tlc_state.apply_remove_tlc(failed_tlcs[0].0);
}
}

pub fn get_local_channel_public_keys(&self) -> &ChannelBasePublicKeys {
&self.local_channel_public_keys
}
Expand Down Expand Up @@ -5136,6 +5219,7 @@ impl ChannelActorState {
TLCId::Received(prev_tlc.prev_tlc_id),
)
}),
removed_confirmed_at: None,
}
}

Expand All @@ -5155,6 +5239,7 @@ impl ChannelActorState {
created_at: self.get_current_commitment_numbers(),
removed_reason: None,
previous_tlc: None,
removed_confirmed_at: None,
};
Ok(tlc_info)
}
Expand Down Expand Up @@ -5557,6 +5642,7 @@ impl ChannelActorState {
}
};

self.clean_up_failed_tlcs();
let (commitment_tx, settlement_data) = self.verify_and_complete_tx(
commitment_signed.funding_tx_partial_signature,
commitment_signed.commitment_tx_partial_signature,
Expand Down Expand Up @@ -5880,6 +5966,7 @@ impl ChannelActorState {
]
.concat(),
);

let aggregated_signature =
sign_ctx.sign_and_aggregate(message.as_slice(), revocation_partial_signature)?;
RevocationData {
Expand Down Expand Up @@ -5912,6 +5999,7 @@ impl ChannelActorState {
]
.concat(),
);

let aggregated_signature =
sign_ctx.sign_and_aggregate(message.as_slice(), commitment_tx_partial_signature)?;

Expand All @@ -5928,7 +6016,9 @@ impl ChannelActorState {
self.increment_local_commitment_number();
self.append_remote_commitment_point(next_per_commitment_point);

let need_commitment_signed = self.tlc_state.update_for_revoke_and_ack();
let need_commitment_signed = self
.tlc_state
.update_for_revoke_and_ack(self.get_current_commitment_numbers());
network
.send_message(NetworkActorMessage::new_notification(
NetworkServiceEvent::RevokeAndAckReceived(
Expand Down Expand Up @@ -6736,6 +6826,7 @@ impl ChannelActorState {
}
}
}

let to_local_value = self.to_local_amount + received_fullfilled - offered_pending;
let to_remote_value = self.to_remote_amount + offered_fullfilled - received_pending;

Expand Down Expand Up @@ -6788,6 +6879,7 @@ impl ChannelActorState {
.pack(),
)
.build();

let to_remote_output_data = Bytes::default();
if for_remote {
(
Expand Down
15 changes: 13 additions & 2 deletions src/fiber/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,10 @@ impl InternalResult {

let Some(index) = error_index else {
error!("Error index not found in the route: {:?}", tlc_err);
return need_to_retry;
// if the error node is not in the route,
// and we can not penalize the source node (which is ourself)
// it's better to stop the payment session
return false;
};

let len = nodes.len();
Expand Down Expand Up @@ -254,7 +257,15 @@ impl InternalResult {
TlcErrorCode::PermanentChannelFailure => {
self.fail_pair(nodes, index + 1);
}
TlcErrorCode::FeeInsufficient | TlcErrorCode::IncorrectTlcExpiry => {
TlcErrorCode::FeeInsufficient => {
need_to_retry = true;
eprintln!("need_to_retry: {:?}, index: {:?}", need_to_retry, index);
chenyukang marked this conversation as resolved.
Show resolved Hide resolved
self.fail_pair_balanced(nodes, index);
if index > 1 {
self.succeed_range_pairs(nodes, 0, index - 1);
}
}
TlcErrorCode::IncorrectTlcExpiry => {
need_to_retry = false;
if index == 1 {
self.fail_node(nodes, 1);
Expand Down
Loading
Loading