diff --git a/docs/release-notes/eclair-vnext.md b/docs/release-notes/eclair-vnext.md index bb101b59d5..1ac72fda6b 100644 --- a/docs/release-notes/eclair-vnext.md +++ b/docs/release-notes/eclair-vnext.md @@ -37,7 +37,7 @@ To configure, edit `eclair.conf`: ```eclair.conf // We assign reputations to our peers to prioritize payments during congestion. // The reputation is computed as fees paid divided by what should have been paid if all payments were successful. -eclair.peer-reputation { +eclair.relay.peer-reputation { // Set this parameter to false to disable the reputation algorithm and simply relay the incoming endorsement // value, as described by https://github.com/lightning/blips/blob/master/blip-0004.md, enabled = true diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala b/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala index e2dfcf810f..6cf298e1fa 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala @@ -362,7 +362,7 @@ class Setup(val datadir: File, paymentHandler = system.actorOf(SimpleSupervisor.props(PaymentHandler.props(nodeParams, register, offerManager), "payment-handler", SupervisorStrategy.Resume)) triggerer = system.spawn(Behaviors.supervise(AsyncPaymentTriggerer()).onFailure(typed.SupervisorStrategy.resume), name = "async-payment-triggerer") reputationRecorder_opt = if (nodeParams.relayParams.peerReputationConfig.enabled) { - Some(system.spawn(Behaviors.supervise(ReputationRecorder(nodeParams.relayParams.peerReputationConfig, Map.empty)).onFailure(typed.SupervisorStrategy.resume), name = "reputation-recorder")) + Some(system.spawn(Behaviors.supervise(ReputationRecorder(nodeParams.relayParams.peerReputationConfig)).onFailure(typed.SupervisorStrategy.resume), name = "reputation-recorder")) } else { None } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelData.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelData.scala index 05aeb6f985..434c578b03 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelData.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelData.scala @@ -26,7 +26,7 @@ import fr.acinq.eclair.channel.fund.{InteractiveTxBuilder, InteractiveTxSigningS import fr.acinq.eclair.io.Peer import fr.acinq.eclair.transactions.CommitmentSpec import fr.acinq.eclair.transactions.Transactions._ -import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelReady, ChannelReestablish, ChannelUpdate, ClosingSigned, CommitSig, FailureMessage, FundingCreated, FundingSigned, Init, OnionRoutingPacket, OpenChannel, OpenDualFundedChannel, Shutdown, SpliceInit, Stfu, TxSignatures, UpdateAddHtlc, UpdateFailHtlc, UpdateFailMalformedHtlc, UpdateFulfillHtlc} +import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelReady, ChannelReestablish, ChannelUpdate, ClosingSigned, CommitSig, FailureMessage, FundingCreated, FundingSigned, HtlcFailureMessage, Init, OnionRoutingPacket, OpenChannel, OpenDualFundedChannel, Shutdown, SpliceInit, Stfu, TxSignatures, UpdateAddHtlc, UpdateFailHtlc, UpdateFailMalformedHtlc, UpdateFulfillHtlc} import fr.acinq.eclair.{Alias, BlockHeight, CltvExpiry, CltvExpiryDelta, Features, InitFeature, MilliSatoshi, MilliSatoshiLong, RealShortChannelId, TimestampMilli, UInt64} import scodec.bits.ByteVector @@ -228,6 +228,10 @@ final case class CMD_GET_CHANNEL_STATE(replyTo: ActorRef) extends HasReplyToComm final case class CMD_GET_CHANNEL_DATA(replyTo: ActorRef) extends HasReplyToCommand final case class CMD_GET_CHANNEL_INFO(replyTo: akka.actor.typed.ActorRef[RES_GET_CHANNEL_INFO]) extends Command +case class OutgoingHtlcAdded(add: UpdateAddHtlc, upstream: Upstream.Hot) +case class OutgoingHtlcFailed(fail: HtlcFailureMessage) +case class OutgoingHtlcFulfilled(fulfill: UpdateFulfillHtlc) + /* 88888888b. 8888888888 .d8888b. 88888888b. ,ad8888ba, 888b 88 .d8888b. 8888888888 .d8888b. 88 "8b 88 d88P Y88b 88 "8b d8"' `"8b 8888b 88 d88P Y88b 88 d88P Y88b diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala index e55d86d612..525a4be00b 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala @@ -439,6 +439,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with case Right((commitments1, add)) => if (c.commit) self ! CMD_SIGN() context.system.eventStream.publish(AvailableBalanceChanged(self, d.channelId, d.shortIds, commitments1)) + context.system.eventStream.publish(OutgoingHtlcAdded(add, c.origin.upstream)) handleCommandSuccess(c, d.copy(commitments = commitments1)) sending add case Left(cause) => handleAddHtlcCommandError(c, cause, Some(d.channelUpdate)) } @@ -465,6 +466,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with case Right((commitments1, origin, htlc)) => // we forward preimages as soon as possible to the upstream channel because it allows us to pull funds relayer ! RES_ADD_SETTLED(origin, htlc, HtlcResult.RemoteFulfill(fulfill)) + context.system.eventStream.publish(OutgoingHtlcFulfilled(fulfill)) stay() using d.copy(commitments = commitments1) case Left(cause) => handleLocalError(cause, d, Some(fulfill)) } @@ -498,12 +500,14 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with } case Event(fail: UpdateFailHtlc, d: DATA_NORMAL) => + context.system.eventStream.publish(OutgoingHtlcFailed(fail)) d.commitments.receiveFail(fail) match { case Right((commitments1, _, _)) => stay() using d.copy(commitments = commitments1) case Left(cause) => handleLocalError(cause, d, Some(fail)) } case Event(fail: UpdateFailMalformedHtlc, d: DATA_NORMAL) => + context.system.eventStream.publish(OutgoingHtlcFailed(fail)) d.commitments.receiveFailMalformed(fail) match { case Right((commitments1, _, _)) => stay() using d.copy(commitments = commitments1) case Left(cause) => handleLocalError(cause, d, Some(fail)) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelay.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelay.scala index e53deeddab..25d128b1a8 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelay.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelay.scala @@ -30,7 +30,7 @@ import fr.acinq.eclair.payment.Monitoring.{Metrics, Tags} import fr.acinq.eclair.payment.relay.Relayer.{OutgoingChannel, OutgoingChannelParams} import fr.acinq.eclair.payment.{ChannelPaymentRelayed, IncomingPaymentPacket} import fr.acinq.eclair.reputation.ReputationRecorder -import fr.acinq.eclair.reputation.ReputationRecorder.{CancelRelay, GetConfidence, RecordResult} +import fr.acinq.eclair.reputation.ReputationRecorder.GetConfidence import fr.acinq.eclair.wire.protocol.FailureMessageCodecs.createBadOnionFailure import fr.acinq.eclair.wire.protocol.PaymentOnion.IntermediatePayload import fr.acinq.eclair.wire.protocol._ @@ -59,7 +59,7 @@ object ChannelRelay { def apply(nodeParams: NodeParams, register: ActorRef, - reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.ChannelRelayCommand]], + reputationRecorder_opt: Option[typed.ActorRef[GetConfidence]], channels: Map[ByteVector32, Relayer.OutgoingChannel], originNode: PublicKey, relayId: UUID, @@ -73,7 +73,7 @@ object ChannelRelay { val upstream = Upstream.Hot.Channel(r.add.removeUnknownTlvs(), TimestampMilli.now(), originNode) reputationRecorder_opt match { case Some(reputationRecorder) => - reputationRecorder ! GetConfidence(context.messageAdapter[ReputationRecorder.Confidence](confidence => WrappedConfidence(confidence.value)), originNode, r.add.endorsement, relayId, r.relayFeeMsat) + reputationRecorder ! GetConfidence(context.messageAdapter[ReputationRecorder.Confidence](confidence => WrappedConfidence(confidence.value)), upstream, r.relayFeeMsat) case None => val confidence = (r.add.endorsement + 0.5) / 8 context.self ! WrappedConfidence(confidence) @@ -123,7 +123,7 @@ object ChannelRelay { */ class ChannelRelay private(nodeParams: NodeParams, register: ActorRef, - reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.ChannelRelayCommand]], + reputationRecorder_opt: Option[typed.ActorRef[GetConfidence]], channels: Map[ByteVector32, Relayer.OutgoingChannel], r: IncomingPaymentPacket.ChannelRelayPacket, upstream: Upstream.Hot.Channel, @@ -149,7 +149,6 @@ class ChannelRelay private(nodeParams: NodeParams, case RelayFailure(cmdFail) => Metrics.recordPaymentRelayFailed(Tags.FailureType(cmdFail), Tags.RelayType.Channel) context.log.info("rejecting htlc reason={}", cmdFail.reason) - reputationRecorder_opt.foreach(_ ! CancelRelay(upstream.receivedFrom, r.add.endorsement, relayId)) safeSendAndStop(r.add.channelId, cmdFail) case RelaySuccess(selectedChannelId, cmdAdd) => context.log.info("forwarding htlc #{} from channelId={} to channelId={}", r.add.id, r.add.channelId, selectedChannelId) @@ -165,7 +164,6 @@ class ChannelRelay private(nodeParams: NodeParams, context.log.warn(s"couldn't resolve downstream channel $channelId, failing htlc #${upstream.add.id}") val cmdFail = CMD_FAIL_HTLC(upstream.add.id, Right(UnknownNextPeer()), commit = true) Metrics.recordPaymentRelayFailed(Tags.FailureType(cmdFail), Tags.RelayType.Channel) - reputationRecorder_opt.foreach(_ ! CancelRelay(upstream.receivedFrom, r.add.endorsement, relayId)) safeSendAndStop(upstream.add.channelId, cmdFail) case WrappedAddResponse(addFailed: RES_ADD_FAILED[_]) => @@ -178,7 +176,7 @@ class ChannelRelay private(nodeParams: NodeParams, waitForAddSettled(r.channelId) } - def waitForAddSettled(channelId: ByteVector32): Behavior[Command] = + def waitForAddSettled(channelId: ByteVector32): Behavior[Command] = { Behaviors.receiveMessagePartial { case WrappedAddResponse(RES_ADD_SETTLED(_, htlc, fulfill: HtlcResult.Fulfill)) => context.log.info("relaying fulfill to upstream, startedAt={}, endedAt={}, confidence={}, originNode={}, outgoingChannel={}", upstream.receivedAt, TimestampMilli.now(), confidence, upstream.receivedFrom, channelId) @@ -196,6 +194,7 @@ class ChannelRelay private(nodeParams: NodeParams, recordRelayDuration(isSuccess = false) safeSendAndStop(upstream.add.channelId, cmd) } + } def safeSendAndStop(channelId: ByteVector32, cmd: channel.HtlcSettlementCommand): Behavior[Command] = { val toSend = cmd match { @@ -343,7 +342,6 @@ class ChannelRelay private(nodeParams: NodeParams, } private def recordRelayDuration(isSuccess: Boolean): Unit = { - reputationRecorder_opt.foreach(_ ! RecordResult(upstream.receivedFrom, r.add.endorsement, relayId, isSuccess)) Metrics.RelayedPaymentDuration .withTag(Tags.Relay, Tags.RelayType.Channel) .withTag(Tags.Success, isSuccess) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelayer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelayer.scala index d5d4d11b7f..4dae47e66d 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelayer.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelayer.scala @@ -59,7 +59,7 @@ object ChannelRelayer { def apply(nodeParams: NodeParams, register: ActorRef, - reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.ChannelRelayCommand]], + reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.GetConfidence]], channels: Map[ByteVector32, Relayer.OutgoingChannel] = Map.empty, scid2channels: Map[ShortChannelId, ByteVector32] = Map.empty, node2channels: mutable.MultiDict[PublicKey, ByteVector32] = mutable.MultiDict.empty): Behavior[Command] = diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelay.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelay.scala index 8710f9fb8f..bb6fdf4cf0 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelay.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelay.scala @@ -36,7 +36,7 @@ import fr.acinq.eclair.payment.send.MultiPartPaymentLifecycle.{PreimageReceived, import fr.acinq.eclair.payment.send.PaymentInitiator.SendPaymentConfig import fr.acinq.eclair.payment.send.PaymentLifecycle.SendPaymentToNode import fr.acinq.eclair.payment.send._ -import fr.acinq.eclair.reputation.ReputationRecorder +import fr.acinq.eclair.reputation.ReputationRecorder.GetTrampolineConfidence import fr.acinq.eclair.router.Router.RouteParams import fr.acinq.eclair.router.{BalanceTooLow, RouteNotFound} import fr.acinq.eclair.wire.protocol.PaymentOnion.IntermediatePayload @@ -87,7 +87,7 @@ object NodeRelay { def apply(nodeParams: NodeParams, parent: typed.ActorRef[NodeRelayer.Command], register: ActorRef, - reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.TrampolineRelayCommand]], + reputationRecorder_opt: Option[typed.ActorRef[GetTrampolineConfidence]], relayId: UUID, nodeRelayPacket: NodeRelayPacket, outgoingPaymentFactory: OutgoingPaymentFactory, @@ -186,7 +186,7 @@ object NodeRelay { class NodeRelay private(nodeParams: NodeParams, parent: akka.actor.typed.ActorRef[NodeRelayer.Command], register: ActorRef, - reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.TrampolineRelayCommand]], + reputationRecorder_opt: Option[typed.ActorRef[GetTrampolineConfidence]], relayId: UUID, paymentHash: ByteVector32, paymentSecret: ByteVector32, @@ -264,11 +264,8 @@ class NodeRelay private(nodeParams: NodeParams, private def doSend(upstream: Upstream.Hot.Trampoline, nextPayload: IntermediatePayload.NodeRelay, nextPacket_opt: Option[OnionRoutingPacket]): Behavior[Command] = { context.log.debug(s"relaying trampoline payment (amountIn=${upstream.amountIn} expiryIn=${upstream.expiryIn} amountOut=${nextPayload.amountToForward} expiryOut=${nextPayload.outgoingCltv})") val totalFee = upstream.amountIn - nextPayload.amountToForward - val fees = upstream.received.foldLeft(Map.empty[ReputationRecorder.PeerEndorsement, MilliSatoshi])((fees, r) => - fees.updatedWith(ReputationRecorder.PeerEndorsement(r.receivedFrom, r.add.endorsement))(fee => - Some(fee.getOrElse(MilliSatoshi(0)) + r.add.amountMsat * totalFee.toLong / upstream.amountIn.toLong))) reputationRecorder_opt match { - case Some(reputationRecorder) => reputationRecorder ! ReputationRecorder.GetTrampolineConfidence(context.messageAdapter[ReputationRecorder.Confidence](confidence => WrappedConfidence(confidence.value)), fees, relayId) + case Some(reputationRecorder) => reputationRecorder ! GetTrampolineConfidence(context.messageAdapter(confidence => WrappedConfidence(confidence.value)), upstream, totalFee) case None => context.self ! WrappedConfidence((upstream.received.map(_.add.endorsement).min + 0.5) / 8) } Behaviors.receiveMessagePartial { @@ -303,11 +300,6 @@ class NodeRelay private(nodeParams: NodeParams, case WrappedPaymentSent(paymentSent) => context.log.debug("trampoline payment fully resolved downstream") success(upstream, fulfilledUpstream, paymentSent) - val totalFee = upstream.amountIn - paymentSent.amountWithFees - val fees = upstream.received.foldLeft(Map.empty[ReputationRecorder.PeerEndorsement, MilliSatoshi])((fees, r) => - fees.updatedWith(ReputationRecorder.PeerEndorsement(r.receivedFrom, r.add.endorsement))(fee => - Some(fee.getOrElse(MilliSatoshi(0)) + r.add.amountMsat * totalFee.toLong / upstream.amountIn.toLong))) - reputationRecorder_opt.foreach(_ ! ReputationRecorder.RecordTrampolineSuccess(fees, relayId)) recordRelayDuration(startedAt, isSuccess = true) stopping() case WrappedPaymentFailed(PaymentFailed(_, _, failures, _)) => @@ -315,7 +307,6 @@ class NodeRelay private(nodeParams: NodeParams, if (!fulfilledUpstream) { rejectPayment(upstream, translateError(nodeParams, failures, upstream, nextPayload)) } - reputationRecorder_opt.foreach(_ ! ReputationRecorder.RecordTrampolineFailure(upstream.received.map(r => ReputationRecorder.PeerEndorsement(r.receivedFrom, r.add.endorsement)).toSet, relayId)) recordRelayDuration(startedAt, isSuccess = fulfilledUpstream) stopping() } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelayer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelayer.scala index 8db24382b4..b249eba3ef 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelayer.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelayer.scala @@ -61,7 +61,7 @@ object NodeRelayer { */ def apply(nodeParams: NodeParams, register: akka.actor.ActorRef, - reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.TrampolineRelayCommand]], + reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.GetTrampolineConfidence]], outgoingPaymentFactory: NodeRelay.OutgoingPaymentFactory, triggerer: typed.ActorRef[AsyncPaymentTriggerer.Command], router: akka.actor.ActorRef, diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/reputation/Reputation.scala b/eclair-core/src/main/scala/fr/acinq/eclair/reputation/Reputation.scala index f74c0aca67..dbfeb3ad2f 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/reputation/Reputation.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/reputation/Reputation.scala @@ -16,9 +16,10 @@ package fr.acinq.eclair.reputation +import fr.acinq.bitcoin.scalacompat.ByteVector32 +import fr.acinq.eclair.reputation.Reputation.HtlcId import fr.acinq.eclair.{MilliSatoshi, TimestampMilli} -import java.util.UUID import scala.concurrent.duration.FiniteDuration /** @@ -26,7 +27,7 @@ import scala.concurrent.duration.FiniteDuration */ /** - * Local reputation for a given incoming node, that should be track for each incoming endorsement level. + * Local reputation for a given incoming node, that should be tracked for each incoming endorsement level. * * @param pastWeight How much fees we would have collected in the past if all payments had succeeded (exponential moving average). * @param pastScore How much fees we have collected in the past (exponential moving average). @@ -36,51 +37,47 @@ import scala.concurrent.duration.FiniteDuration * @param maxRelayDuration Duration after which payments are penalized for staying pending too long. * @param pendingMultiplier How much to penalize pending payments. */ -case class Reputation(pastWeight: Double, pastScore: Double, lastSettlementAt: TimestampMilli, pending: Map[UUID, Reputation.PendingPayment], halfLife: FiniteDuration, maxRelayDuration: FiniteDuration, pendingMultiplier: Double) { +case class Reputation(pastWeight: Double, pastScore: Double, lastSettlementAt: TimestampMilli, pending: Map[HtlcId, Reputation.PendingPayment], halfLife: FiniteDuration, maxRelayDuration: FiniteDuration, pendingMultiplier: Double) { private def decay(now: TimestampMilli): Double = scala.math.pow(0.5, (now - lastSettlementAt) / halfLife) private def pendingWeight(now: TimestampMilli): Double = pending.values.map(_.weight(now, maxRelayDuration, pendingMultiplier)).sum /** - * Register a payment to relay and estimate the confidence that it will succeed. - * - * @return (updated reputation, confidence) + * Estimate the confidence that a payment will succeed. */ - def attempt(relayId: UUID, fee: MilliSatoshi, now: TimestampMilli = TimestampMilli.now()): (Reputation, Double) = { + def getConfidence(fee: MilliSatoshi, now: TimestampMilli = TimestampMilli.now()): Double = { val d = decay(now) - val newReputation = copy(pending = pending + (relayId -> Reputation.PendingPayment(fee, now))) - val confidence = d * pastScore / (d * pastWeight + newReputation.pendingWeight(now)) - (newReputation, confidence) + d * pastScore / (d * pastWeight + pendingWeight(now) + fee.toLong.toDouble * pendingMultiplier) } /** - * Mark a previously registered payment as failed without trying to relay it (usually because its confidence was too low). + * Register a pending relay. * * @return updated reputation */ - def cancel(relayId: UUID): Reputation = copy(pending = pending - relayId) + def attempt(htlcId: HtlcId, fee: MilliSatoshi, now: TimestampMilli = TimestampMilli.now()): Reputation = + copy(pending = pending + (htlcId -> Reputation.PendingPayment(fee, now))) /** * When a payment is settled, we record whether it succeeded and how long it took. * - * @param feeOverride When relaying trampoline payments, the actual fee is only known when the payment succeeds. This - * is used instead of the fee upper bound that was known when first attempting the relay. * @return updated reputation */ - def record(relayId: UUID, isSuccess: Boolean, feeOverride: Option[MilliSatoshi] = None, now: TimestampMilli = TimestampMilli.now()): Reputation = { - pending.get(relayId) match { + def record(htlcId: HtlcId, isSuccess: Boolean, now: TimestampMilli = TimestampMilli.now()): Reputation = { + pending.get(htlcId) match { case Some(p) => val d = decay(now) - val p1 = p.copy(fee = feeOverride.getOrElse(p.fee)) - val newWeight = d * pastWeight + p1.weight(now, maxRelayDuration, 1.0) - val newScore = d * pastScore + (if (isSuccess) p1.fee.toLong.toDouble else 0) - Reputation(newWeight, newScore, now, pending - relayId, halfLife, maxRelayDuration, pendingMultiplier) + val newWeight = d * pastWeight + p.weight(now, maxRelayDuration, 1.0) + val newScore = d * pastScore + (if (isSuccess) p.fee.toLong.toDouble else 0) + Reputation(newWeight, newScore, now, pending - htlcId, halfLife, maxRelayDuration, pendingMultiplier) case None => this } } } object Reputation { + case class HtlcId(channelId: ByteVector32, id: Long) + /** We're relaying that payment and are waiting for it to settle. */ case class PendingPayment(fee: MilliSatoshi, startedAt: TimestampMilli) { def weight(now: TimestampMilli, minDuration: FiniteDuration, multiplier: Double): Double = { diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/reputation/ReputationRecorder.scala b/eclair-core/src/main/scala/fr/acinq/eclair/reputation/ReputationRecorder.scala index 77982ebfbe..d4893d74bc 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/reputation/ReputationRecorder.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/reputation/ReputationRecorder.scala @@ -16,12 +16,15 @@ package fr.acinq.eclair.reputation -import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.eventstream.EventStream +import akka.actor.typed.scaladsl.{ActorContext, Behaviors} import akka.actor.typed.{ActorRef, Behavior} import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey import fr.acinq.eclair.MilliSatoshi - -import java.util.UUID +import fr.acinq.eclair.channel.Upstream.Hot +import fr.acinq.eclair.channel.{OutgoingHtlcAdded, OutgoingHtlcFailed, OutgoingHtlcFulfilled, Upstream} +import fr.acinq.eclair.reputation.Reputation.HtlcId +import fr.acinq.eclair.wire.protocol.{UpdateFailHtlc, UpdateFailMalformedHtlc} /** * Created by thomash on 21/07/2023. @@ -30,16 +33,11 @@ import java.util.UUID object ReputationRecorder { // @formatter:off sealed trait Command - - sealed trait ChannelRelayCommand extends Command - case class GetConfidence(replyTo: ActorRef[Confidence], originNode: PublicKey, endorsement: Int, relayId: UUID, fee: MilliSatoshi) extends ChannelRelayCommand - case class CancelRelay(originNode: PublicKey, endorsement: Int, relayId: UUID) extends ChannelRelayCommand - case class RecordResult(originNode: PublicKey, endorsement: Int, relayId: UUID, isSuccess: Boolean) extends ChannelRelayCommand - - sealed trait TrampolineRelayCommand extends Command - case class GetTrampolineConfidence(replyTo: ActorRef[Confidence], fees: Map[PeerEndorsement, MilliSatoshi], relayId: UUID) extends TrampolineRelayCommand - case class RecordTrampolineFailure(upstream: Set[PeerEndorsement], relayId: UUID) extends TrampolineRelayCommand - case class RecordTrampolineSuccess(fees: Map[PeerEndorsement, MilliSatoshi], relayId: UUID) extends TrampolineRelayCommand + case class GetConfidence(replyTo: ActorRef[Confidence], upstream: Upstream.Hot.Channel, fee: MilliSatoshi) extends Command + case class GetTrampolineConfidence(replyTo: ActorRef[Confidence], upstream: Upstream.Hot.Trampoline, fee: MilliSatoshi) extends Command + private case class WrappedOutgoingHtlcAdded(added: OutgoingHtlcAdded) extends Command + private case class WrappedOutgoingHtlcFailed(failed: OutgoingHtlcFailed) extends Command + private case class WrappedOutgoingHtlcFulfilled(fulfilled: OutgoingHtlcFulfilled) extends Command // @formatter:on /** @@ -51,40 +49,74 @@ object ReputationRecorder { /** Confidence that the outgoing HTLC will succeed. */ case class Confidence(value: Double) - def apply(reputationConfig: Reputation.Config, reputations: Map[PeerEndorsement, Reputation]): Behavior[Command] = { + def apply(config: Reputation.Config): Behavior[Command] = + Behaviors.setup(context => { + context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedOutgoingHtlcAdded)) + context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedOutgoingHtlcFailed)) + context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedOutgoingHtlcFulfilled)) + new ReputationRecorder(config, context).run(Map.empty, Map.empty) + }) +} + +class ReputationRecorder(config: Reputation.Config, context: ActorContext[ReputationRecorder.Command]) { + + import ReputationRecorder._ + + private def getReputation(reputations: Map[PeerEndorsement, Reputation], channel: Hot.Channel): Reputation = + reputations.getOrElse(PeerEndorsement(channel.receivedFrom, channel.add.endorsement), Reputation.init(config)) + + private def updateReputation(reputations: Map[PeerEndorsement, Reputation], channel: Hot.Channel, newReputation: Reputation): Map[PeerEndorsement, Reputation] = + reputations.updated(PeerEndorsement(channel.receivedFrom, channel.add.endorsement), newReputation) + + def run(reputations: Map[PeerEndorsement, Reputation], pending: Map[HtlcId, Upstream.Hot]): Behavior[Command] = Behaviors.receiveMessage { - case GetConfidence(replyTo, originNode, endorsement, relayId, fee) => - val (updatedReputation, confidence) = reputations.getOrElse(PeerEndorsement(originNode, endorsement), Reputation.init(reputationConfig)).attempt(relayId, fee) + case GetConfidence(replyTo, upstream, fee) => + val confidence = getReputation(reputations, upstream).getConfidence(fee) replyTo ! Confidence(confidence) - ReputationRecorder(reputationConfig, reputations.updated(PeerEndorsement(originNode, endorsement), updatedReputation)) - case CancelRelay(originNode, endorsement, relayId) => - val updatedReputation = reputations.getOrElse(PeerEndorsement(originNode, endorsement), Reputation.init(reputationConfig)).cancel(relayId) - ReputationRecorder(reputationConfig, reputations.updated(PeerEndorsement(originNode, endorsement), updatedReputation)) - case RecordResult(originNode, endorsement, relayId, isSuccess) => - val updatedReputation = reputations.getOrElse(PeerEndorsement(originNode, endorsement), Reputation.init(reputationConfig)).record(relayId, isSuccess) - ReputationRecorder(reputationConfig, reputations.updated(PeerEndorsement(originNode, endorsement), updatedReputation)) - case GetTrampolineConfidence(replyTo, fees, relayId) => - val (confidence, updatedReputations) = fees.foldLeft((1.0, reputations)) { - case ((c, r), (peerEndorsement, fee)) => - val (updatedReputation, confidence) = reputations.getOrElse(peerEndorsement, Reputation.init(reputationConfig)).attempt(relayId, fee) - (c.min(confidence), r.updated(peerEndorsement, updatedReputation)) - } + Behaviors.same + case GetTrampolineConfidence(replyTo, upstream, totalFee) => + val confidence = + upstream.received + .groupMapReduce(r => PeerEndorsement(r.receivedFrom, r.add.endorsement))(_.add.amountMsat)(_ + _) + .map { + case (peerEndorsement, amount) => + val fee = amount * totalFee.toLong / upstream.amountIn.toLong + reputations.getOrElse(peerEndorsement, Reputation.init(config)).getConfidence(fee) + } + .min replyTo ! Confidence(confidence) - ReputationRecorder(reputationConfig, updatedReputations) - case RecordTrampolineFailure(keys, relayId) => - val updatedReputations = keys.foldLeft(reputations) { - case (r, peerEndorsement) => - val updatedReputation = reputations.getOrElse(peerEndorsement, Reputation.init(reputationConfig)).record(relayId, isSuccess = false) - r.updated(peerEndorsement, updatedReputation) + Behaviors.same + case WrappedOutgoingHtlcAdded(OutgoingHtlcAdded(add, upstream)) => + val htlcId = HtlcId(add.channelId, add.id) + upstream match { + case channel: Hot.Channel => + val fee = channel.amountIn - add.amountMsat + val newReputations = updateReputation(reputations, channel, getReputation(reputations, channel).attempt(htlcId, fee)) + run(newReputations, pending + (htlcId -> upstream)) + case trampoline: Hot.Trampoline => + run(???, pending + (htlcId -> upstream)) + case Upstream.Local(id) => Behaviors.same + } + case WrappedOutgoingHtlcFailed(OutgoingHtlcFailed(fail)) => + val htlcId = fail match { + case UpdateFailHtlc(channelId, id, _, _) => HtlcId(channelId, id) + case UpdateFailMalformedHtlc(channelId, id, _, _, _) => HtlcId(channelId, id) + } + pending.get(htlcId) match { + case Some(channel: Hot.Channel) => + val newReputations = updateReputation(reputations, channel, getReputation(reputations, channel).record(htlcId, isSuccess = false)) + run(newReputations, pending - htlcId) + case Some(trampoline: Hot.Trampoline) => ??? + case _ => ??? } - ReputationRecorder(reputationConfig, updatedReputations) - case RecordTrampolineSuccess(fees, relayId) => - val updatedReputations = fees.foldLeft(reputations) { - case (r, (peerEndorsement, fee)) => - val updatedReputation = reputations.getOrElse(peerEndorsement, Reputation.init(reputationConfig)).record(relayId, isSuccess = true, Some(fee)) - r.updated(peerEndorsement, updatedReputation) + case WrappedOutgoingHtlcFulfilled(OutgoingHtlcFulfilled(fulfill)) => + val htlcId = HtlcId(fulfill.channelId, fulfill.id) + pending.get(htlcId) match { + case Some(channel: Hot.Channel) => + val newReputations = updateReputation(reputations, channel, getReputation(reputations, channel).record(htlcId, isSuccess = true)) + run(newReputations, pending - htlcId) + case Some(trampoline: Hot.Trampoline) => ??? + case _ => ??? } - ReputationRecorder(reputationConfig, updatedReputations) } - } }