Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
thomash-acinq committed Aug 8, 2024
1 parent 6e8d675 commit 56e580c
Show file tree
Hide file tree
Showing 10 changed files with 115 additions and 89 deletions.
2 changes: 1 addition & 1 deletion docs/release-notes/eclair-vnext.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ To configure, edit `eclair.conf`:
```eclair.conf
// We assign reputations to our peers to prioritize payments during congestion.
// The reputation is computed as fees paid divided by what should have been paid if all payments were successful.
eclair.peer-reputation {
eclair.relay.peer-reputation {
// Set this parameter to false to disable the reputation algorithm and simply relay the incoming endorsement
// value, as described by https://github.com/lightning/blips/blob/master/blip-0004.md,
enabled = true
Expand Down
2 changes: 1 addition & 1 deletion eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ class Setup(val datadir: File,
paymentHandler = system.actorOf(SimpleSupervisor.props(PaymentHandler.props(nodeParams, register, offerManager), "payment-handler", SupervisorStrategy.Resume))
triggerer = system.spawn(Behaviors.supervise(AsyncPaymentTriggerer()).onFailure(typed.SupervisorStrategy.resume), name = "async-payment-triggerer")
reputationRecorder_opt = if (nodeParams.relayParams.peerReputationConfig.enabled) {
Some(system.spawn(Behaviors.supervise(ReputationRecorder(nodeParams.relayParams.peerReputationConfig, Map.empty)).onFailure(typed.SupervisorStrategy.resume), name = "reputation-recorder"))
Some(system.spawn(Behaviors.supervise(ReputationRecorder(nodeParams.relayParams.peerReputationConfig)).onFailure(typed.SupervisorStrategy.resume), name = "reputation-recorder"))
} else {
None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import fr.acinq.eclair.channel.fund.{InteractiveTxBuilder, InteractiveTxSigningS
import fr.acinq.eclair.io.Peer
import fr.acinq.eclair.transactions.CommitmentSpec
import fr.acinq.eclair.transactions.Transactions._
import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelReady, ChannelReestablish, ChannelUpdate, ClosingSigned, CommitSig, FailureMessage, FundingCreated, FundingSigned, Init, OnionRoutingPacket, OpenChannel, OpenDualFundedChannel, Shutdown, SpliceInit, Stfu, TxSignatures, UpdateAddHtlc, UpdateFailHtlc, UpdateFailMalformedHtlc, UpdateFulfillHtlc}
import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelReady, ChannelReestablish, ChannelUpdate, ClosingSigned, CommitSig, FailureMessage, FundingCreated, FundingSigned, HtlcFailureMessage, Init, OnionRoutingPacket, OpenChannel, OpenDualFundedChannel, Shutdown, SpliceInit, Stfu, TxSignatures, UpdateAddHtlc, UpdateFailHtlc, UpdateFailMalformedHtlc, UpdateFulfillHtlc}
import fr.acinq.eclair.{Alias, BlockHeight, CltvExpiry, CltvExpiryDelta, Features, InitFeature, MilliSatoshi, MilliSatoshiLong, RealShortChannelId, TimestampMilli, UInt64}
import scodec.bits.ByteVector

Expand Down Expand Up @@ -228,6 +228,10 @@ final case class CMD_GET_CHANNEL_STATE(replyTo: ActorRef) extends HasReplyToComm
final case class CMD_GET_CHANNEL_DATA(replyTo: ActorRef) extends HasReplyToCommand
final case class CMD_GET_CHANNEL_INFO(replyTo: akka.actor.typed.ActorRef[RES_GET_CHANNEL_INFO]) extends Command

case class OutgoingHtlcAdded(add: UpdateAddHtlc, upstream: Upstream.Hot)
case class OutgoingHtlcFailed(fail: HtlcFailureMessage)
case class OutgoingHtlcFulfilled(fulfill: UpdateFulfillHtlc)

/*
88888888b. 8888888888 .d8888b. 88888888b. ,ad8888ba, 888b 88 .d8888b. 8888888888 .d8888b.
88 "8b 88 d88P Y88b 88 "8b d8"' `"8b 8888b 88 d88P Y88b 88 d88P Y88b
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
case Right((commitments1, add)) =>
if (c.commit) self ! CMD_SIGN()
context.system.eventStream.publish(AvailableBalanceChanged(self, d.channelId, d.shortIds, commitments1))
context.system.eventStream.publish(OutgoingHtlcAdded(add, c.origin.upstream))
handleCommandSuccess(c, d.copy(commitments = commitments1)) sending add
case Left(cause) => handleAddHtlcCommandError(c, cause, Some(d.channelUpdate))
}
Expand All @@ -465,6 +466,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
case Right((commitments1, origin, htlc)) =>
// we forward preimages as soon as possible to the upstream channel because it allows us to pull funds
relayer ! RES_ADD_SETTLED(origin, htlc, HtlcResult.RemoteFulfill(fulfill))
context.system.eventStream.publish(OutgoingHtlcFulfilled(fulfill))
stay() using d.copy(commitments = commitments1)
case Left(cause) => handleLocalError(cause, d, Some(fulfill))
}
Expand Down Expand Up @@ -498,12 +500,14 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
}

case Event(fail: UpdateFailHtlc, d: DATA_NORMAL) =>
context.system.eventStream.publish(OutgoingHtlcFailed(fail))
d.commitments.receiveFail(fail) match {
case Right((commitments1, _, _)) => stay() using d.copy(commitments = commitments1)
case Left(cause) => handleLocalError(cause, d, Some(fail))
}

case Event(fail: UpdateFailMalformedHtlc, d: DATA_NORMAL) =>
context.system.eventStream.publish(OutgoingHtlcFailed(fail))
d.commitments.receiveFailMalformed(fail) match {
case Right((commitments1, _, _)) => stay() using d.copy(commitments = commitments1)
case Left(cause) => handleLocalError(cause, d, Some(fail))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import fr.acinq.eclair.payment.Monitoring.{Metrics, Tags}
import fr.acinq.eclair.payment.relay.Relayer.{OutgoingChannel, OutgoingChannelParams}
import fr.acinq.eclair.payment.{ChannelPaymentRelayed, IncomingPaymentPacket}
import fr.acinq.eclair.reputation.ReputationRecorder
import fr.acinq.eclair.reputation.ReputationRecorder.{CancelRelay, GetConfidence, RecordResult}
import fr.acinq.eclair.reputation.ReputationRecorder.GetConfidence
import fr.acinq.eclair.wire.protocol.FailureMessageCodecs.createBadOnionFailure
import fr.acinq.eclair.wire.protocol.PaymentOnion.IntermediatePayload
import fr.acinq.eclair.wire.protocol._
Expand Down Expand Up @@ -59,7 +59,7 @@ object ChannelRelay {

def apply(nodeParams: NodeParams,
register: ActorRef,
reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.ChannelRelayCommand]],
reputationRecorder_opt: Option[typed.ActorRef[GetConfidence]],
channels: Map[ByteVector32, Relayer.OutgoingChannel],
originNode: PublicKey,
relayId: UUID,
Expand All @@ -73,7 +73,7 @@ object ChannelRelay {
val upstream = Upstream.Hot.Channel(r.add.removeUnknownTlvs(), TimestampMilli.now(), originNode)
reputationRecorder_opt match {
case Some(reputationRecorder) =>
reputationRecorder ! GetConfidence(context.messageAdapter[ReputationRecorder.Confidence](confidence => WrappedConfidence(confidence.value)), originNode, r.add.endorsement, relayId, r.relayFeeMsat)
reputationRecorder ! GetConfidence(context.messageAdapter[ReputationRecorder.Confidence](confidence => WrappedConfidence(confidence.value)), upstream, r.relayFeeMsat)
case None =>
val confidence = (r.add.endorsement + 0.5) / 8
context.self ! WrappedConfidence(confidence)
Expand Down Expand Up @@ -123,7 +123,7 @@ object ChannelRelay {
*/
class ChannelRelay private(nodeParams: NodeParams,
register: ActorRef,
reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.ChannelRelayCommand]],
reputationRecorder_opt: Option[typed.ActorRef[GetConfidence]],
channels: Map[ByteVector32, Relayer.OutgoingChannel],
r: IncomingPaymentPacket.ChannelRelayPacket,
upstream: Upstream.Hot.Channel,
Expand All @@ -149,7 +149,6 @@ class ChannelRelay private(nodeParams: NodeParams,
case RelayFailure(cmdFail) =>
Metrics.recordPaymentRelayFailed(Tags.FailureType(cmdFail), Tags.RelayType.Channel)
context.log.info("rejecting htlc reason={}", cmdFail.reason)
reputationRecorder_opt.foreach(_ ! CancelRelay(upstream.receivedFrom, r.add.endorsement, relayId))
safeSendAndStop(r.add.channelId, cmdFail)
case RelaySuccess(selectedChannelId, cmdAdd) =>
context.log.info("forwarding htlc #{} from channelId={} to channelId={}", r.add.id, r.add.channelId, selectedChannelId)
Expand All @@ -165,7 +164,6 @@ class ChannelRelay private(nodeParams: NodeParams,
context.log.warn(s"couldn't resolve downstream channel $channelId, failing htlc #${upstream.add.id}")
val cmdFail = CMD_FAIL_HTLC(upstream.add.id, Right(UnknownNextPeer()), commit = true)
Metrics.recordPaymentRelayFailed(Tags.FailureType(cmdFail), Tags.RelayType.Channel)
reputationRecorder_opt.foreach(_ ! CancelRelay(upstream.receivedFrom, r.add.endorsement, relayId))
safeSendAndStop(upstream.add.channelId, cmdFail)

case WrappedAddResponse(addFailed: RES_ADD_FAILED[_]) =>
Expand All @@ -178,7 +176,7 @@ class ChannelRelay private(nodeParams: NodeParams,
waitForAddSettled(r.channelId)
}

def waitForAddSettled(channelId: ByteVector32): Behavior[Command] =
def waitForAddSettled(channelId: ByteVector32): Behavior[Command] = {
Behaviors.receiveMessagePartial {
case WrappedAddResponse(RES_ADD_SETTLED(_, htlc, fulfill: HtlcResult.Fulfill)) =>
context.log.info("relaying fulfill to upstream, startedAt={}, endedAt={}, confidence={}, originNode={}, outgoingChannel={}", upstream.receivedAt, TimestampMilli.now(), confidence, upstream.receivedFrom, channelId)
Expand All @@ -196,6 +194,7 @@ class ChannelRelay private(nodeParams: NodeParams,
recordRelayDuration(isSuccess = false)
safeSendAndStop(upstream.add.channelId, cmd)
}
}

def safeSendAndStop(channelId: ByteVector32, cmd: channel.HtlcSettlementCommand): Behavior[Command] = {
val toSend = cmd match {
Expand Down Expand Up @@ -343,7 +342,6 @@ class ChannelRelay private(nodeParams: NodeParams,
}

private def recordRelayDuration(isSuccess: Boolean): Unit = {
reputationRecorder_opt.foreach(_ ! RecordResult(upstream.receivedFrom, r.add.endorsement, relayId, isSuccess))
Metrics.RelayedPaymentDuration
.withTag(Tags.Relay, Tags.RelayType.Channel)
.withTag(Tags.Success, isSuccess)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ object ChannelRelayer {

def apply(nodeParams: NodeParams,
register: ActorRef,
reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.ChannelRelayCommand]],
reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.GetConfidence]],
channels: Map[ByteVector32, Relayer.OutgoingChannel] = Map.empty,
scid2channels: Map[ShortChannelId, ByteVector32] = Map.empty,
node2channels: mutable.MultiDict[PublicKey, ByteVector32] = mutable.MultiDict.empty): Behavior[Command] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import fr.acinq.eclair.payment.send.MultiPartPaymentLifecycle.{PreimageReceived,
import fr.acinq.eclair.payment.send.PaymentInitiator.SendPaymentConfig
import fr.acinq.eclair.payment.send.PaymentLifecycle.SendPaymentToNode
import fr.acinq.eclair.payment.send._
import fr.acinq.eclair.reputation.ReputationRecorder
import fr.acinq.eclair.reputation.ReputationRecorder.GetTrampolineConfidence
import fr.acinq.eclair.router.Router.RouteParams
import fr.acinq.eclair.router.{BalanceTooLow, RouteNotFound}
import fr.acinq.eclair.wire.protocol.PaymentOnion.IntermediatePayload
Expand Down Expand Up @@ -87,7 +87,7 @@ object NodeRelay {
def apply(nodeParams: NodeParams,
parent: typed.ActorRef[NodeRelayer.Command],
register: ActorRef,
reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.TrampolineRelayCommand]],
reputationRecorder_opt: Option[typed.ActorRef[GetTrampolineConfidence]],
relayId: UUID,
nodeRelayPacket: NodeRelayPacket,
outgoingPaymentFactory: OutgoingPaymentFactory,
Expand Down Expand Up @@ -186,7 +186,7 @@ object NodeRelay {
class NodeRelay private(nodeParams: NodeParams,
parent: akka.actor.typed.ActorRef[NodeRelayer.Command],
register: ActorRef,
reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.TrampolineRelayCommand]],
reputationRecorder_opt: Option[typed.ActorRef[GetTrampolineConfidence]],
relayId: UUID,
paymentHash: ByteVector32,
paymentSecret: ByteVector32,
Expand Down Expand Up @@ -264,11 +264,8 @@ class NodeRelay private(nodeParams: NodeParams,
private def doSend(upstream: Upstream.Hot.Trampoline, nextPayload: IntermediatePayload.NodeRelay, nextPacket_opt: Option[OnionRoutingPacket]): Behavior[Command] = {
context.log.debug(s"relaying trampoline payment (amountIn=${upstream.amountIn} expiryIn=${upstream.expiryIn} amountOut=${nextPayload.amountToForward} expiryOut=${nextPayload.outgoingCltv})")
val totalFee = upstream.amountIn - nextPayload.amountToForward
val fees = upstream.received.foldLeft(Map.empty[ReputationRecorder.PeerEndorsement, MilliSatoshi])((fees, r) =>
fees.updatedWith(ReputationRecorder.PeerEndorsement(r.receivedFrom, r.add.endorsement))(fee =>
Some(fee.getOrElse(MilliSatoshi(0)) + r.add.amountMsat * totalFee.toLong / upstream.amountIn.toLong)))
reputationRecorder_opt match {
case Some(reputationRecorder) => reputationRecorder ! ReputationRecorder.GetTrampolineConfidence(context.messageAdapter[ReputationRecorder.Confidence](confidence => WrappedConfidence(confidence.value)), fees, relayId)
case Some(reputationRecorder) => reputationRecorder ! GetTrampolineConfidence(context.messageAdapter(confidence => WrappedConfidence(confidence.value)), upstream, totalFee)
case None => context.self ! WrappedConfidence((upstream.received.map(_.add.endorsement).min + 0.5) / 8)
}
Behaviors.receiveMessagePartial {
Expand Down Expand Up @@ -303,19 +300,13 @@ class NodeRelay private(nodeParams: NodeParams,
case WrappedPaymentSent(paymentSent) =>
context.log.debug("trampoline payment fully resolved downstream")
success(upstream, fulfilledUpstream, paymentSent)
val totalFee = upstream.amountIn - paymentSent.amountWithFees
val fees = upstream.received.foldLeft(Map.empty[ReputationRecorder.PeerEndorsement, MilliSatoshi])((fees, r) =>
fees.updatedWith(ReputationRecorder.PeerEndorsement(r.receivedFrom, r.add.endorsement))(fee =>
Some(fee.getOrElse(MilliSatoshi(0)) + r.add.amountMsat * totalFee.toLong / upstream.amountIn.toLong)))
reputationRecorder_opt.foreach(_ ! ReputationRecorder.RecordTrampolineSuccess(fees, relayId))
recordRelayDuration(startedAt, isSuccess = true)
stopping()
case WrappedPaymentFailed(PaymentFailed(_, _, failures, _)) =>
context.log.debug(s"trampoline payment failed downstream")
if (!fulfilledUpstream) {
rejectPayment(upstream, translateError(nodeParams, failures, upstream, nextPayload))
}
reputationRecorder_opt.foreach(_ ! ReputationRecorder.RecordTrampolineFailure(upstream.received.map(r => ReputationRecorder.PeerEndorsement(r.receivedFrom, r.add.endorsement)).toSet, relayId))
recordRelayDuration(startedAt, isSuccess = fulfilledUpstream)
stopping()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ object NodeRelayer {
*/
def apply(nodeParams: NodeParams,
register: akka.actor.ActorRef,
reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.TrampolineRelayCommand]],
reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.GetTrampolineConfidence]],
outgoingPaymentFactory: NodeRelay.OutgoingPaymentFactory,
triggerer: typed.ActorRef[AsyncPaymentTriggerer.Command],
router: akka.actor.ActorRef,
Expand Down
Loading

0 comments on commit 56e580c

Please sign in to comment.