Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add HTLC endorsement/confidence #2884

Merged
merged 4 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ sealed trait HasOptionalReplyToCommand extends Command { def replyTo_opt: Option
sealed trait ForbiddenCommandDuringSplice extends Command
sealed trait ForbiddenCommandDuringQuiescence extends Command

final case class CMD_ADD_HTLC(replyTo: ActorRef, amount: MilliSatoshi, paymentHash: ByteVector32, cltvExpiry: CltvExpiry, onion: OnionRoutingPacket, nextBlindingKey_opt: Option[PublicKey], origin: Origin.Hot, commit: Boolean = false) extends HasReplyToCommand with ForbiddenCommandDuringSplice with ForbiddenCommandDuringQuiescence
final case class CMD_ADD_HTLC(replyTo: ActorRef, amount: MilliSatoshi, paymentHash: ByteVector32, cltvExpiry: CltvExpiry, onion: OnionRoutingPacket, nextBlindingKey_opt: Option[PublicKey], confidence: Double, origin: Origin.Hot, commit: Boolean = false) extends HasReplyToCommand with ForbiddenCommandDuringSplice with ForbiddenCommandDuringQuiescence
t-bast marked this conversation as resolved.
Show resolved Hide resolved
sealed trait HtlcSettlementCommand extends HasOptionalReplyToCommand with ForbiddenCommandDuringSplice with ForbiddenCommandDuringQuiescence { def id: Long }
final case class CMD_FULFILL_HTLC(id: Long, r: ByteVector32, commit: Boolean = false, replyTo_opt: Option[ActorRef] = None) extends HtlcSettlementCommand
final case class CMD_FAIL_HTLC(id: Long, reason: Either[ByteVector, FailureMessage], delay_opt: Option[FiniteDuration] = None, commit: Boolean = false, replyTo_opt: Option[ActorRef] = None) extends HtlcSettlementCommand
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -854,7 +854,7 @@ case class Commitments(params: ChannelParams,
return Left(HtlcValueTooSmall(params.channelId, minimum = htlcMinimum, actual = cmd.amount))
}

val add = UpdateAddHtlc(channelId, changes.localNextHtlcId, cmd.amount, cmd.paymentHash, cmd.cltvExpiry, cmd.onion, cmd.nextBlindingKey_opt)
val add = UpdateAddHtlc(channelId, changes.localNextHtlcId, cmd.amount, cmd.paymentHash, cmd.cltvExpiry, cmd.onion, cmd.nextBlindingKey_opt, cmd.confidence)
// we increment the local htlc index and add an entry to the origins map
val changes1 = changes.addLocalProposal(add).copy(localNextHtlcId = changes.localNextHtlcId + 1)
val originChannels1 = originChannels + (add.id -> cmd.origin)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ object Monitoring {
PaymentNodeOutAmount.withoutTags().record(bucket, amount.truncateToSatoshi.toLong)
PaymentNodeOut.withoutTags().record(bucket)
}

private val RelayConfidence = Kamon.histogram("payment.relay.confidence", "Confidence (in percent) that the relayed HTLC will be fulfilled")
def relayFulfill(confidence: Double) = RelayConfidence.withTag("status", "fulfill").record((confidence * 100).toLong)
def relayFail(confidence: Double) = RelayConfidence.withTag("status", "fail").record((confidence * 100).toLong)
}

object Tags {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,12 +285,12 @@ object OutgoingPaymentPacket {
}

/** Build the command to add an HTLC for the given recipient using the provided route. */
def buildOutgoingPayment(origin: Origin.Hot, paymentHash: ByteVector32, route: Route, recipient: Recipient): Either[OutgoingPaymentError, OutgoingPaymentPacket] = {
def buildOutgoingPayment(origin: Origin.Hot, paymentHash: ByteVector32, route: Route, recipient: Recipient, confidence: Double): Either[OutgoingPaymentError, OutgoingPaymentPacket] = {
for {
payment <- recipient.buildPayloads(paymentHash, route)
onion <- buildOnion(payment.payloads, paymentHash, Some(PaymentOnionCodecs.paymentOnionPayloadLength)) // BOLT 2 requires that associatedData == paymentHash
} yield {
val cmd = CMD_ADD_HTLC(origin.replyTo, payment.amount, paymentHash, payment.expiry, onion.packet, payment.outerBlinding_opt, origin, commit = true)
val cmd = CMD_ADD_HTLC(origin.replyTo, payment.amount, paymentHash, payment.expiry, onion.packet, payment.outerBlinding_opt, confidence, origin, commit = true)
OutgoingPaymentPacket(cmd, route.hops.head.shortChannelId, onion.sharedSecrets)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@

package fr.acinq.eclair.payment.relay

import akka.actor.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.scaladsl.adapter.TypedActorRefOps
import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
import akka.actor.{ActorRef, typed}
import fr.acinq.bitcoin.scalacompat.ByteVector32
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.eclair.channel._
Expand Down Expand Up @@ -54,7 +54,12 @@ object ChannelRelay {
case class RelaySuccess(selectedChannelId: ByteVector32, cmdAdd: CMD_ADD_HTLC) extends RelayResult
// @formatter:on

def apply(nodeParams: NodeParams, register: ActorRef, channels: Map[ByteVector32, Relayer.OutgoingChannel], originNode: PublicKey, relayId: UUID, r: IncomingPaymentPacket.ChannelRelayPacket): Behavior[Command] =
def apply(nodeParams: NodeParams,
register: ActorRef,
channels: Map[ByteVector32, Relayer.OutgoingChannel],
originNode:PublicKey,
relayId: UUID,
r: IncomingPaymentPacket.ChannelRelayPacket): Behavior[Command] =
Behaviors.setup { context =>
Behaviors.withMdc(Logs.mdc(
category_opt = Some(Logs.LogCategory.PAYMENT),
Expand All @@ -63,7 +68,8 @@ object ChannelRelay {
nodeAlias_opt = Some(nodeParams.alias))) {
val upstream = Upstream.Hot.Channel(r.add.removeUnknownTlvs(), TimestampMilli.now(), originNode)
context.self ! DoRelay
new ChannelRelay(nodeParams, register, channels, r, upstream, context).relay(Seq.empty)
val confidence = (r.add.endorsement + 0.5) / 8
t-bast marked this conversation as resolved.
Show resolved Hide resolved
new ChannelRelay(nodeParams, register, channels, r, upstream, confidence, context).relay(Seq.empty)
}
}

Expand Down Expand Up @@ -107,6 +113,7 @@ class ChannelRelay private(nodeParams: NodeParams,
channels: Map[ByteVector32, Relayer.OutgoingChannel],
r: IncomingPaymentPacket.ChannelRelayPacket,
upstream: Upstream.Hot.Channel,
confidence: Double,
context: ActorContext[ChannelRelay.Command]) {

import ChannelRelay._
Expand All @@ -127,7 +134,7 @@ class ChannelRelay private(nodeParams: NodeParams,
case RelayFailure(cmdFail) =>
Metrics.recordPaymentRelayFailed(Tags.FailureType(cmdFail), Tags.RelayType.Channel)
context.log.info("rejecting htlc reason={}", cmdFail.reason)
safeSendAndStop(r.add.channelId, cmdFail)
safeSendAndStop(r.add.channelId, cmdFail, None)
case RelaySuccess(selectedChannelId, cmdAdd) =>
context.log.info("forwarding htlc #{} from channelId={} to channelId={}", r.add.id, r.add.channelId, selectedChannelId)
register ! Register.Forward(forwardFailureAdapter, selectedChannelId, cmdAdd)
Expand All @@ -142,36 +149,39 @@ class ChannelRelay private(nodeParams: NodeParams,
context.log.warn(s"couldn't resolve downstream channel $channelId, failing htlc #${upstream.add.id}")
val cmdFail = CMD_FAIL_HTLC(upstream.add.id, Right(UnknownNextPeer()), commit = true)
Metrics.recordPaymentRelayFailed(Tags.FailureType(cmdFail), Tags.RelayType.Channel)
safeSendAndStop(upstream.add.channelId, cmdFail)
safeSendAndStop(upstream.add.channelId, cmdFail, Some(channelId))

case WrappedAddResponse(addFailed: RES_ADD_FAILED[_]) =>
context.log.info("attempt failed with reason={}", addFailed.t.getClass.getSimpleName)
context.self ! DoRelay
relay(previousFailures :+ PreviouslyTried(selectedChannelId, addFailed))

case WrappedAddResponse(_: RES_SUCCESS[_]) =>
case WrappedAddResponse(r: RES_SUCCESS[_]) =>
context.log.debug("sent htlc to the downstream channel")
waitForAddSettled()
waitForAddSettled(confidence, r.channelId)
}

def waitForAddSettled(): Behavior[Command] =
def waitForAddSettled(confidence: Double, channelId: ByteVector32): Behavior[Command] =
t-bast marked this conversation as resolved.
Show resolved Hide resolved
Behaviors.receiveMessagePartial {
case WrappedAddResponse(RES_ADD_SETTLED(_, htlc, fulfill: HtlcResult.Fulfill)) =>
context.log.debug("relaying fulfill to upstream")
Metrics.relayFulfill(confidence)
val cmd = CMD_FULFILL_HTLC(upstream.add.id, fulfill.paymentPreimage, commit = true)
context.system.eventStream ! EventStream.Publish(ChannelPaymentRelayed(upstream.amountIn, htlc.amountMsat, htlc.paymentHash, upstream.add.channelId, htlc.channelId, upstream.receivedAt, TimestampMilli.now()))
recordRelayDuration(isSuccess = true)
safeSendAndStop(upstream.add.channelId, cmd)
safeSendAndStop(upstream.add.channelId, cmd, Some(channelId))

case WrappedAddResponse(RES_ADD_SETTLED(_, _, fail: HtlcResult.Fail)) =>
context.log.debug("relaying fail to upstream")
Metrics.relayFail(confidence)
Metrics.recordPaymentRelayFailed(Tags.FailureType.Remote, Tags.RelayType.Channel)
val cmd = translateRelayFailure(upstream.add.id, fail)
recordRelayDuration(isSuccess = false)
safeSendAndStop(upstream.add.channelId, cmd)
safeSendAndStop(upstream.add.channelId, cmd, Some(channelId))
}

def safeSendAndStop(channelId: ByteVector32, cmd: channel.HtlcSettlementCommand): Behavior[Command] = {
def safeSendAndStop(channelId: ByteVector32, cmd: channel.HtlcSettlementCommand, outgoingChannel_opt: Option[ByteVector32]): Behavior[Command] = {
context.log.info("cmd={}, startedAt={}, endedAt={}, confidence={}, originNode={}, outgoingChannel={}", cmd.getClass.getSimpleName, upstream.receivedAt, TimestampMilli.now(), confidence, upstream.receivedFrom, outgoingChannel_opt)
t-bast marked this conversation as resolved.
Show resolved Hide resolved
val toSend = cmd match {
case _: CMD_FULFILL_HTLC => cmd
case _: CMD_FAIL_HTLC | _: CMD_FAIL_MALFORMED_HTLC => r.payload match {
Expand Down Expand Up @@ -204,7 +214,7 @@ class ChannelRelay private(nodeParams: NodeParams,
*/
def handleRelay(previousFailures: Seq[PreviouslyTried]): RelayResult = {
val alreadyTried = previousFailures.map(_.channelId)
selectPreferredChannel(alreadyTried) match {
selectPreferredChannel(alreadyTried, confidence) match {
case None if previousFailures.nonEmpty =>
// no more channels to try
val error = previousFailures
Expand All @@ -215,7 +225,7 @@ class ChannelRelay private(nodeParams: NodeParams,
.failure
RelayFailure(CMD_FAIL_HTLC(r.add.id, Right(translateLocalError(error.t, error.channelUpdate)), commit = true))
case outgoingChannel_opt =>
relayOrFail(outgoingChannel_opt)
relayOrFail(outgoingChannel_opt, confidence)
}
}

Expand All @@ -234,7 +244,7 @@ class ChannelRelay private(nodeParams: NodeParams,
*
* If no suitable channel is found we default to the originally requested channel.
*/
def selectPreferredChannel(alreadyTried: Seq[ByteVector32]): Option[OutgoingChannel] = {
def selectPreferredChannel(alreadyTried: Seq[ByteVector32], confidence: Double): Option[OutgoingChannel] = {
val requestedShortChannelId = r.payload.outgoingChannelId
context.log.debug("selecting next channel with requestedShortChannelId={}", requestedShortChannelId)
// we filter out channels that we have already tried
Expand All @@ -243,7 +253,7 @@ class ChannelRelay private(nodeParams: NodeParams,
candidateChannels
.values
.map { channel =>
val relayResult = relayOrFail(Some(channel))
val relayResult = relayOrFail(Some(channel), confidence)
context.log.debug(s"candidate channel: channelId=${channel.channelId} availableForSend={} capacity={} channelUpdate={} result={}",
channel.commitments.availableBalanceForSend,
channel.commitments.latest.capacity,
Expand Down Expand Up @@ -291,7 +301,7 @@ class ChannelRelay private(nodeParams: NodeParams,
* channel, because some parameters don't match with our settings for that channel. In that case we directly fail the
* htlc.
*/
def relayOrFail(outgoingChannel_opt: Option[OutgoingChannelParams]): RelayResult = {
def relayOrFail(outgoingChannel_opt: Option[OutgoingChannelParams], confidence: Double): RelayResult = {
outgoingChannel_opt match {
case None =>
RelayFailure(CMD_FAIL_HTLC(r.add.id, Right(UnknownNextPeer()), commit = true))
Expand All @@ -312,7 +322,7 @@ class ChannelRelay private(nodeParams: NodeParams,
case payload: IntermediatePayload.ChannelRelay.Blinded => Some(payload.nextBlinding)
case _: IntermediatePayload.ChannelRelay.Standard => None
}
RelaySuccess(c.channelId, CMD_ADD_HTLC(addResponseAdapter.toClassic, r.amountToForward, r.add.paymentHash, r.outgoingCltv, r.nextPacket, nextBlindingKey_opt, origin, commit = true))
RelaySuccess(c.channelId, CMD_ADD_HTLC(addResponseAdapter.toClassic, r.amountToForward, r.add.paymentHash, r.outgoingCltv, r.nextPacket, nextBlindingKey_opt, confidence, origin, commit = true))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,8 @@ class NodeRelay private(nodeParams: NodeParams,

private def doSend(upstream: Upstream.Hot.Trampoline, nextPayload: IntermediatePayload.NodeRelay, nextPacket_opt: Option[OnionRoutingPacket]): Behavior[Command] = {
context.log.debug(s"relaying trampoline payment (amountIn=${upstream.amountIn} expiryIn=${upstream.expiryIn} amountOut=${nextPayload.amountToForward} expiryOut=${nextPayload.outgoingCltv})")
relay(upstream, nextPayload, nextPacket_opt)
val confidence = (upstream.received.map(_.add.endorsement).min + 0.5) / 8
relay(upstream, nextPayload, nextPacket_opt, confidence)
}

/**
Expand Down Expand Up @@ -316,12 +317,12 @@ class NodeRelay private(nodeParams: NodeParams,
context.messageAdapter[PaymentFailed](WrappedPaymentFailed)
}.toClassic

private def relay(upstream: Upstream.Hot.Trampoline, payloadOut: IntermediatePayload.NodeRelay, packetOut_opt: Option[OnionRoutingPacket]): Behavior[Command] = {
private def relay(upstream: Upstream.Hot.Trampoline, payloadOut: IntermediatePayload.NodeRelay, packetOut_opt: Option[OnionRoutingPacket], confidence: Double): Behavior[Command] = {
val displayNodeId = payloadOut match {
case payloadOut: IntermediatePayload.NodeRelay.Standard => payloadOut.outgoingNodeId
case _: IntermediatePayload.NodeRelay.ToBlindedPaths => randomKey().publicKey
}
val paymentCfg = SendPaymentConfig(relayId, relayId, None, paymentHash, displayNodeId, upstream, None, None, storeInDb = false, publishEvent = false, recordPathFindingMetrics = true)
val paymentCfg = SendPaymentConfig(relayId, relayId, None, paymentHash, displayNodeId, upstream, None, None, storeInDb = false, publishEvent = false, recordPathFindingMetrics = true, confidence)
val routeParams = computeRouteParams(nodeParams, upstream.amountIn, upstream.expiryIn, payloadOut.amountToForward, payloadOut.outgoingCltv)
payloadOut match {
case payloadOut: IntermediatePayload.NodeRelay.Standard =>
Expand Down
Loading
Loading