Skip to content

Commit

Permalink
Reputation is recorded from channel events
Browse files Browse the repository at this point in the history
  • Loading branch information
thomash-acinq committed Sep 25, 2024
1 parent da1b1a6 commit 9973d1e
Show file tree
Hide file tree
Showing 13 changed files with 136 additions and 111 deletions.
2 changes: 1 addition & 1 deletion eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ class Setup(val datadir: File,
triggerer = system.spawn(Behaviors.supervise(AsyncPaymentTriggerer()).onFailure(typed.SupervisorStrategy.resume), name = "async-payment-triggerer")
peerReadyManager = system.spawn(Behaviors.supervise(PeerReadyManager()).onFailure(typed.SupervisorStrategy.restart), name = "peer-ready-manager")
reputationRecorder_opt = if (nodeParams.relayParams.peerReputationConfig.enabled) {
Some(system.spawn(Behaviors.supervise(ReputationRecorder(nodeParams.relayParams.peerReputationConfig, Map.empty)).onFailure(typed.SupervisorStrategy.resume), name = "reputation-recorder"))
Some(system.spawn(Behaviors.supervise(ReputationRecorder(nodeParams.relayParams.peerReputationConfig)).onFailure(typed.SupervisorStrategy.resume), name = "reputation-recorder"))
} else {
None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import fr.acinq.eclair.channel.fund.{InteractiveTxBuilder, InteractiveTxSigningS
import fr.acinq.eclair.io.Peer
import fr.acinq.eclair.transactions.CommitmentSpec
import fr.acinq.eclair.transactions.Transactions._
import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelReady, ChannelReestablish, ChannelUpdate, ClosingSigned, CommitSig, FailureMessage, FundingCreated, FundingSigned, Init, LiquidityAds, OnionRoutingPacket, OpenChannel, OpenDualFundedChannel, Shutdown, SpliceInit, Stfu, TxSignatures, UpdateAddHtlc, UpdateFailHtlc, UpdateFailMalformedHtlc, UpdateFulfillHtlc}
import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelReady, ChannelReestablish, ChannelUpdate, ClosingSigned, CommitSig, FailureMessage, FundingCreated, FundingSigned, HtlcFailureMessage, Init, LiquidityAds, OnionRoutingPacket, OpenChannel, OpenDualFundedChannel, Shutdown, SpliceInit, Stfu, TxSignatures, UpdateAddHtlc, UpdateFailHtlc, UpdateFailMalformedHtlc, UpdateFulfillHtlc}
import fr.acinq.eclair.{Alias, BlockHeight, CltvExpiry, CltvExpiryDelta, Features, InitFeature, MilliSatoshi, MilliSatoshiLong, RealShortChannelId, TimestampMilli, UInt64}
import scodec.bits.ByteVector

Expand Down Expand Up @@ -244,6 +244,10 @@ final case class CMD_GET_CHANNEL_STATE(replyTo: ActorRef) extends HasReplyToComm
final case class CMD_GET_CHANNEL_DATA(replyTo: ActorRef) extends HasReplyToCommand
final case class CMD_GET_CHANNEL_INFO(replyTo: akka.actor.typed.ActorRef[RES_GET_CHANNEL_INFO]) extends Command

case class OutgoingHtlcAdded(add: UpdateAddHtlc, upstream: Upstream.Hot, fee: MilliSatoshi)
case class OutgoingHtlcFailed(fail: HtlcFailureMessage)
case class OutgoingHtlcFulfilled(fulfill: UpdateFulfillHtlc)

/*
88888888b. 8888888888 .d8888b. 88888888b. ,ad8888ba, 888b 88 .d8888b. 8888888888 .d8888b.
88 "8b 88 d88P Y88b 88 "8b d8"' `"8b 8888b 88 d88P Y88b 88 d88P Y88b
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
case Right((commitments1, add)) =>
if (c.commit) self ! CMD_SIGN()
context.system.eventStream.publish(AvailableBalanceChanged(self, d.channelId, d.shortIds, commitments1))
context.system.eventStream.publish(OutgoingHtlcAdded(add, c.origin.upstream, nodeFee(d.channelUpdate.relayFees, add.amountMsat)))
handleCommandSuccess(c, d.copy(commitments = commitments1)) sending add
case Left(cause) => handleAddHtlcCommandError(c, cause, Some(d.channelUpdate))
}
Expand All @@ -466,6 +467,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
case Right((commitments1, origin, htlc)) =>
// we forward preimages as soon as possible to the upstream channel because it allows us to pull funds
relayer ! RES_ADD_SETTLED(origin, htlc, HtlcResult.RemoteFulfill(fulfill))
context.system.eventStream.publish(OutgoingHtlcFulfilled(fulfill))
stay() using d.copy(commitments = commitments1)
case Left(cause) => handleLocalError(cause, d, Some(fulfill))
}
Expand Down Expand Up @@ -499,12 +501,14 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
}

case Event(fail: UpdateFailHtlc, d: DATA_NORMAL) =>
context.system.eventStream.publish(OutgoingHtlcFailed(fail))
d.commitments.receiveFail(fail) match {
case Right((commitments1, _, _)) => stay() using d.copy(commitments = commitments1)
case Left(cause) => handleLocalError(cause, d, Some(fail))
}

case Event(fail: UpdateFailMalformedHtlc, d: DATA_NORMAL) =>
context.system.eventStream.publish(OutgoingHtlcFailed(fail))
d.commitments.receiveFailMalformed(fail) match {
case Right((commitments1, _, _)) => stay() using d.copy(commitments = commitments1)
case Left(cause) => handleLocalError(cause, d, Some(fail))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import fr.acinq.eclair.payment.Monitoring.{Metrics, Tags}
import fr.acinq.eclair.payment.relay.Relayer.{OutgoingChannel, OutgoingChannelParams}
import fr.acinq.eclair.payment.{ChannelPaymentRelayed, IncomingPaymentPacket}
import fr.acinq.eclair.reputation.ReputationRecorder
import fr.acinq.eclair.reputation.ReputationRecorder.{CancelRelay, GetConfidence, RecordResult}
import fr.acinq.eclair.reputation.ReputationRecorder.GetConfidence
import fr.acinq.eclair.wire.protocol.FailureMessageCodecs.createBadOnionFailure
import fr.acinq.eclair.wire.protocol.PaymentOnion.IntermediatePayload
import fr.acinq.eclair.wire.protocol._
Expand Down Expand Up @@ -65,7 +65,7 @@ object ChannelRelay {

def apply(nodeParams: NodeParams,
register: ActorRef,
reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.ChannelRelayCommand]],
reputationRecorder_opt: Option[typed.ActorRef[GetConfidence]],
channels: Map[ByteVector32, Relayer.OutgoingChannel],
originNode: PublicKey,
relayId: UUID,
Expand All @@ -79,14 +79,14 @@ object ChannelRelay {
val upstream = Upstream.Hot.Channel(r.add.removeUnknownTlvs(), TimestampMilli.now(), originNode)
reputationRecorder_opt match {
case Some(reputationRecorder) =>
reputationRecorder ! GetConfidence(context.messageAdapter[ReputationRecorder.Confidence](confidence => WrappedConfidence(confidence.value)), originNode, r.add.endorsement, relayId, r.relayFeeMsat)
reputationRecorder ! GetConfidence(context.messageAdapter[ReputationRecorder.Confidence](confidence => WrappedConfidence(confidence.value)), upstream, r.relayFeeMsat)
case None =>
val confidence = (r.add.endorsement + 0.5) / 8
context.self ! WrappedConfidence(confidence)
}
Behaviors.receiveMessagePartial {
case WrappedConfidence(confidence) =>
new ChannelRelay(nodeParams, register, reputationRecorder_opt, channels, r, upstream, confidence, context, relayId).start()
new ChannelRelay(nodeParams, register, channels, r, upstream, confidence, context).start()
}
}
}
Expand Down Expand Up @@ -128,13 +128,11 @@ object ChannelRelay {
*/
class ChannelRelay private(nodeParams: NodeParams,
register: ActorRef,
reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.ChannelRelayCommand]],
channels: Map[ByteVector32, Relayer.OutgoingChannel],
r: IncomingPaymentPacket.ChannelRelayPacket,
upstream: Upstream.Hot.Channel,
confidence: Double,
context: ActorContext[ChannelRelay.Command],
relayId: UUID) {
context: ActorContext[ChannelRelay.Command]) {

import ChannelRelay._

Expand Down Expand Up @@ -200,7 +198,6 @@ class ChannelRelay private(nodeParams: NodeParams,
case RelayFailure(cmdFail) =>
Metrics.recordPaymentRelayFailed(Tags.FailureType(cmdFail), Tags.RelayType.Channel)
context.log.info("rejecting htlc reason={}", cmdFail.reason)
reputationRecorder_opt.foreach(_ ! CancelRelay(upstream.receivedFrom, r.add.endorsement, relayId))
safeSendAndStop(r.add.channelId, cmdFail)
case RelayNeedsFunding(nextNodeId, cmdFail) =>
val cmd = Peer.ProposeOnTheFlyFunding(onTheFlyFundingResponseAdapter, r.amountToForward, r.add.paymentHash, r.outgoingCltv, r.nextPacket, nextBlindingKey_opt, upstream)
Expand All @@ -220,7 +217,6 @@ class ChannelRelay private(nodeParams: NodeParams,
context.log.warn(s"couldn't resolve downstream channel $channelId, failing htlc #${upstream.add.id}")
val cmdFail = CMD_FAIL_HTLC(upstream.add.id, Right(UnknownNextPeer()), commit = true)
Metrics.recordPaymentRelayFailed(Tags.FailureType(cmdFail), Tags.RelayType.Channel)
reputationRecorder_opt.foreach(_ ! CancelRelay(upstream.receivedFrom, r.add.endorsement, relayId))
safeSendAndStop(upstream.add.channelId, cmdFail)

case WrappedAddResponse(addFailed: RES_ADD_FAILED[_]) =>
Expand Down Expand Up @@ -431,11 +427,9 @@ class ChannelRelay private(nodeParams: NodeParams,
featureOk && liquidityIssue && relayParamsOk
}

private def recordRelayDuration(isSuccess: Boolean): Unit = {
reputationRecorder_opt.foreach(_ ! RecordResult(upstream.receivedFrom, r.add.endorsement, relayId, isSuccess))
private def recordRelayDuration(isSuccess: Boolean): Unit =
Metrics.RelayedPaymentDuration
.withTag(Tags.Relay, Tags.RelayType.Channel)
.withTag(Tags.Success, isSuccess)
.record((TimestampMilli.now() - upstream.receivedAt).toMillis, TimeUnit.MILLISECONDS)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ object ChannelRelayer {

def apply(nodeParams: NodeParams,
register: ActorRef,
reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.ChannelRelayCommand]],
reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.GetConfidence]],
channels: Map[ByteVector32, Relayer.OutgoingChannel] = Map.empty,
scid2channels: Map[ShortChannelId, ByteVector32] = Map.empty,
node2channels: mutable.MultiDict[PublicKey, ByteVector32] = mutable.MultiDict.empty): Behavior[Command] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import fr.acinq.eclair.payment.send.PaymentLifecycle.SendPaymentToNode
import fr.acinq.eclair.payment.send._
import fr.acinq.eclair.router.Router.{ChannelHop, HopRelayParams, Route, RouteParams}
import fr.acinq.eclair.reputation.ReputationRecorder
import fr.acinq.eclair.reputation.ReputationRecorder.GetTrampolineConfidence
import fr.acinq.eclair.router.Router.RouteParams
import fr.acinq.eclair.router.{BalanceTooLow, RouteNotFound}
import fr.acinq.eclair.wire.protocol.PaymentOnion.IntermediatePayload
Expand Down Expand Up @@ -91,7 +92,7 @@ object NodeRelay {
def apply(nodeParams: NodeParams,
parent: typed.ActorRef[NodeRelayer.Command],
register: ActorRef,
reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.TrampolineRelayCommand]],
reputationRecorder_opt: Option[typed.ActorRef[GetTrampolineConfidence]],
relayId: UUID,
nodeRelayPacket: NodeRelayPacket,
outgoingPaymentFactory: OutgoingPaymentFactory,
Expand Down Expand Up @@ -215,7 +216,7 @@ object NodeRelay {
class NodeRelay private(nodeParams: NodeParams,
parent: akka.actor.typed.ActorRef[NodeRelayer.Command],
register: ActorRef,
reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.TrampolineRelayCommand]],
reputationRecorder_opt: Option[typed.ActorRef[GetTrampolineConfidence]],
relayId: UUID,
paymentHash: ByteVector32,
paymentSecret: ByteVector32,
Expand Down Expand Up @@ -336,11 +337,8 @@ class NodeRelay private(nodeParams: NodeParams,
// We only make one try when it's a direct payment to a wallet.
val maxPaymentAttempts = if (walletNodeId_opt.isDefined) 1 else nodeParams.maxPaymentAttempts
val totalFee = upstream.amountIn - payloadOut.amountToForward
val fees = upstream.received.foldLeft(Map.empty[ReputationRecorder.PeerEndorsement, MilliSatoshi])((fees, r) =>
fees.updatedWith(ReputationRecorder.PeerEndorsement(r.receivedFrom, r.add.endorsement))(fee =>
Some(fee.getOrElse(MilliSatoshi(0)) + r.add.amountMsat * totalFee.toLong / upstream.amountIn.toLong)))
reputationRecorder_opt match {
case Some(reputationRecorder) => reputationRecorder ! ReputationRecorder.GetTrampolineConfidence(context.messageAdapter[ReputationRecorder.Confidence](confidence => WrappedConfidence(confidence.value)), fees, relayId)
case Some(reputationRecorder) => reputationRecorder ! GetTrampolineConfidence(context.messageAdapter(confidence => WrappedConfidence(confidence.value)), upstream, totalFee)
case None => context.self ! WrappedConfidence((upstream.received.map(_.add.endorsement).min + 0.5) / 8)
}
Behaviors.receiveMessagePartial {
Expand Down Expand Up @@ -396,16 +394,10 @@ class NodeRelay private(nodeParams: NodeParams,
case WrappedPaymentSent(paymentSent) =>
context.log.debug("trampoline payment fully resolved downstream")
success(upstream, fulfilledUpstream, paymentSent)
val totalFee = upstream.amountIn - paymentSent.amountWithFees
val fees = upstream.received.foldLeft(Map.empty[ReputationRecorder.PeerEndorsement, MilliSatoshi])((fees, r) =>
fees.updatedWith(ReputationRecorder.PeerEndorsement(r.receivedFrom, r.add.endorsement))(fee =>
Some(fee.getOrElse(MilliSatoshi(0)) + r.add.amountMsat * totalFee.toLong / upstream.amountIn.toLong)))
reputationRecorder_opt.foreach(_ ! ReputationRecorder.RecordTrampolineSuccess(fees, relayId))
recordRelayDuration(startedAt, isSuccess = true)
stopping()
case _: WrappedPaymentFailed if fulfilledUpstream =>
context.log.warn("trampoline payment failed downstream but was fulfilled upstream")
reputationRecorder_opt.foreach(_ ! ReputationRecorder.RecordTrampolineFailure(upstream.received.map(r => ReputationRecorder.PeerEndorsement(r.receivedFrom, r.add.endorsement)).toSet, relayId))
recordRelayDuration(startedAt, isSuccess = true)
stopping()
case WrappedPaymentFailed(PaymentFailed(_, _, failures, _)) =>
Expand All @@ -415,7 +407,6 @@ class NodeRelay private(nodeParams: NodeParams,
attemptOnTheFlyFunding(upstream, walletNodeId, recipient, nextPayload, failures, startedAt)
case _ =>
rejectPayment(upstream, translateError(nodeParams, failures, upstream, nextPayload))
reputationRecorder_opt.foreach(_ ! ReputationRecorder.RecordTrampolineFailure(upstream.received.map(r => ReputationRecorder.PeerEndorsement(r.receivedFrom, r.add.endorsement)).toSet, relayId))
recordRelayDuration(startedAt, isSuccess = false)
stopping()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ object NodeRelayer {
*/
def apply(nodeParams: NodeParams,
register: akka.actor.ActorRef,
reputationRecorder_opt: Option[ActorRef[ReputationRecorder.TrampolineRelayCommand]],
reputationRecorder_opt: Option[ActorRef[ReputationRecorder.GetTrampolineConfidence]],
outgoingPaymentFactory: NodeRelay.OutgoingPaymentFactory,
router: akka.actor.ActorRef,
children: Map[PaymentKey, ActorRef[NodeRelay.Command]] = Map.empty): Behavior[Command] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,18 @@

package fr.acinq.eclair.reputation

import fr.acinq.bitcoin.scalacompat.ByteVector32
import fr.acinq.eclair.reputation.Reputation.HtlcId
import fr.acinq.eclair.{MilliSatoshi, TimestampMilli}

import java.util.UUID
import scala.concurrent.duration.FiniteDuration

/**
* Created by thomash on 21/07/2023.
*/

/**
* Local reputation for a given incoming node, that should be track for each incoming endorsement level.
* Local reputation for a given incoming node, that should be tracked for each incoming endorsement level.
*
* @param pastWeight How much fees we would have collected in the past if all payments had succeeded (exponential moving average).
* @param pastScore How much fees we have collected in the past (exponential moving average).
Expand All @@ -36,51 +37,47 @@ import scala.concurrent.duration.FiniteDuration
* @param maxRelayDuration Duration after which payments are penalized for staying pending too long.
* @param pendingMultiplier How much to penalize pending payments.
*/
case class Reputation(pastWeight: Double, pastScore: Double, lastSettlementAt: TimestampMilli, pending: Map[UUID, Reputation.PendingPayment], halfLife: FiniteDuration, maxRelayDuration: FiniteDuration, pendingMultiplier: Double) {
case class Reputation(pastWeight: Double, pastScore: Double, lastSettlementAt: TimestampMilli, pending: Map[HtlcId, Reputation.PendingPayment], halfLife: FiniteDuration, maxRelayDuration: FiniteDuration, pendingMultiplier: Double) {
private def decay(now: TimestampMilli): Double = scala.math.pow(0.5, (now - lastSettlementAt) / halfLife)

private def pendingWeight(now: TimestampMilli): Double = pending.values.map(_.weight(now, maxRelayDuration, pendingMultiplier)).sum

/**
* Register a payment to relay and estimate the confidence that it will succeed.
*
* @return (updated reputation, confidence)
* Estimate the confidence that a payment will succeed.
*/
def attempt(relayId: UUID, fee: MilliSatoshi, now: TimestampMilli = TimestampMilli.now()): (Reputation, Double) = {
def getConfidence(fee: MilliSatoshi, now: TimestampMilli = TimestampMilli.now()): Double = {
val d = decay(now)
val newReputation = copy(pending = pending + (relayId -> Reputation.PendingPayment(fee, now)))
val confidence = d * pastScore / (d * pastWeight + newReputation.pendingWeight(now))
(newReputation, confidence)
d * pastScore / (d * pastWeight + pendingWeight(now) + fee.toLong.toDouble * pendingMultiplier)
}

/**
* Mark a previously registered payment as failed without trying to relay it (usually because its confidence was too low).
* Register a pending relay.
*
* @return updated reputation
*/
def cancel(relayId: UUID): Reputation = copy(pending = pending - relayId)
def attempt(htlcId: HtlcId, fee: MilliSatoshi, now: TimestampMilli = TimestampMilli.now()): Reputation =
copy(pending = pending + (htlcId -> Reputation.PendingPayment(fee, now)))

/**
* When a payment is settled, we record whether it succeeded and how long it took.
*
* @param feeOverride When relaying trampoline payments, the actual fee is only known when the payment succeeds. This
* is used instead of the fee upper bound that was known when first attempting the relay.
* @return updated reputation
*/
def record(relayId: UUID, isSuccess: Boolean, feeOverride: Option[MilliSatoshi] = None, now: TimestampMilli = TimestampMilli.now()): Reputation = {
pending.get(relayId) match {
def record(htlcId: HtlcId, isSuccess: Boolean, now: TimestampMilli = TimestampMilli.now()): Reputation = {
pending.get(htlcId) match {
case Some(p) =>
val d = decay(now)
val p1 = p.copy(fee = feeOverride.getOrElse(p.fee))
val newWeight = d * pastWeight + p1.weight(now, maxRelayDuration, 1.0)
val newScore = d * pastScore + (if (isSuccess) p1.fee.toLong.toDouble else 0)
Reputation(newWeight, newScore, now, pending - relayId, halfLife, maxRelayDuration, pendingMultiplier)
val newWeight = d * pastWeight + p.weight(now, maxRelayDuration, if (isSuccess) 1.0 else 0.0)
val newScore = d * pastScore + (if (isSuccess) p.fee.toLong.toDouble else 0)
Reputation(newWeight, newScore, now, pending - htlcId, halfLife, maxRelayDuration, pendingMultiplier)
case None => this
}
}
}

object Reputation {
case class HtlcId(channelId: ByteVector32, id: Long)

/** We're relaying that payment and are waiting for it to settle. */
case class PendingPayment(fee: MilliSatoshi, startedAt: TimestampMilli) {
def weight(now: TimestampMilli, minDuration: FiniteDuration, multiplier: Double): Double = {
Expand Down
Loading

0 comments on commit 9973d1e

Please sign in to comment.