Skip to content

Commit

Permalink
Wake up wallet nodes before relaying messages or payments (#2865)
Browse files Browse the repository at this point in the history
We refactor `NodeRelay.scala` to re-order some steps. The steps are:

1. Fully receive the incoming payment
2. Resolve the next node (unwrap blinded paths if needed)
3. Wake-up the next node if necessary (mobile wallet)
4. Relay outgoing payment

Note that we introduce a wake-up step, that can be extended to include
mobile notifications. We introduce that same wake-up step in channel
relay and message relay. We also allow relaying data to contain a wallet
`node_id` instead of an scid. When that's the case, we start by waking
up that wallet node before we try relaying onion messages or payments.

This wake-up step doesn't contain any logic right now apart from waiting
for the peer to connect, if it isn't connected already. But it can easily be
extended to send a mobile notification to prompt the wallet to connect.
  • Loading branch information
t-bast authored Aug 28, 2024
1 parent c440007 commit fcd88b0
Show file tree
Hide file tree
Showing 32 changed files with 1,011 additions and 576 deletions.
7 changes: 7 additions & 0 deletions eclair-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,13 @@ eclair {
max-no-channels = 250 // maximum number of incoming connections from peers that do not have any channels with us
}

// When relaying payments or messages to mobile peers who are disconnected, we may try to wake them up using a mobile
// notification system, or we attempt connecting to the last known address.
peer-wake-up {
enabled = false
timeout = 60 seconds
}

auto-reconnect = true
initial-random-reconnect-delay = 5 seconds // we add a random delay before the first reconnection attempt, capped by this value
max-reconnect-interval = 1 hour // max interval between two reconnection attempts, after the exponential backoff period
Expand Down
11 changes: 8 additions & 3 deletions eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import fr.acinq.eclair.crypto.Noise.KeyPair
import fr.acinq.eclair.crypto.keymanager.{ChannelKeyManager, NodeKeyManager, OnChainKeyManager}
import fr.acinq.eclair.db._
import fr.acinq.eclair.io.MessageRelay.{RelayAll, RelayChannelsOnly, RelayPolicy}
import fr.acinq.eclair.io.PeerConnection
import fr.acinq.eclair.io.{PeerConnection, PeerReadyNotifier}
import fr.acinq.eclair.message.OnionMessages.OnionMessageConfig
import fr.acinq.eclair.payment.relay.Relayer.{AsyncPaymentsParams, RelayFees, RelayParams}
import fr.acinq.eclair.router.Announcements.AddressException
Expand Down Expand Up @@ -87,7 +87,8 @@ case class NodeParams(nodeKeyManager: NodeKeyManager,
blockchainWatchdogSources: Seq[String],
onionMessageConfig: OnionMessageConfig,
purgeInvoicesInterval: Option[FiniteDuration],
revokedHtlcInfoCleanerConfig: RevokedHtlcInfoCleaner.Config) {
revokedHtlcInfoCleanerConfig: RevokedHtlcInfoCleaner.Config,
peerWakeUpConfig: PeerReadyNotifier.WakeUpConfig) {
val privateKey: Crypto.PrivateKey = nodeKeyManager.nodeKey.privateKey

val nodeId: PublicKey = nodeKeyManager.nodeId
Expand Down Expand Up @@ -611,7 +612,11 @@ object NodeParams extends Logging {
revokedHtlcInfoCleanerConfig = RevokedHtlcInfoCleaner.Config(
batchSize = config.getInt("db.revoked-htlc-info-cleaner.batch-size"),
interval = FiniteDuration(config.getDuration("db.revoked-htlc-info-cleaner.interval").getSeconds, TimeUnit.SECONDS)
)
),
peerWakeUpConfig = PeerReadyNotifier.WakeUpConfig(
enabled = config.getBoolean("peer-wake-up.enabled"),
timeout = FiniteDuration(config.getDuration("peer-wake-up.timeout").getSeconds, TimeUnit.SECONDS)
),
)
}
}
3 changes: 2 additions & 1 deletion eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,8 @@ class Setup(val datadir: File,
offerManager = system.spawn(Behaviors.supervise(OfferManager(nodeParams, router, paymentTimeout = 1 minute)).onFailure(typed.SupervisorStrategy.resume), name = "offer-manager")
paymentHandler = system.actorOf(SimpleSupervisor.props(PaymentHandler.props(nodeParams, register, offerManager), "payment-handler", SupervisorStrategy.Resume))
triggerer = system.spawn(Behaviors.supervise(AsyncPaymentTriggerer()).onFailure(typed.SupervisorStrategy.resume), name = "async-payment-triggerer")
relayer = system.actorOf(SimpleSupervisor.props(Relayer.props(nodeParams, router, register, paymentHandler, triggerer, Some(postRestartCleanUpInitialized)), "relayer", SupervisorStrategy.Resume))
peerReadyManager = system.spawn(Behaviors.supervise(PeerReadyManager()).onFailure(typed.SupervisorStrategy.restart), name = "peer-ready-manager")
relayer = system.actorOf(SimpleSupervisor.props(Relayer.props(nodeParams, router, register, paymentHandler, Some(postRestartCleanUpInitialized)), "relayer", SupervisorStrategy.Resume))
_ = relayer ! PostRestartHtlcCleaner.Init(channels)
// Before initializing the switchboard (which re-connects us to the network) and the user-facing parts of the system,
// we want to make sure the handler for post-restart broken HTLCs has finished initializing.
Expand Down
102 changes: 55 additions & 47 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/MessageRelay.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,29 +44,18 @@ object MessageRelay {
policy: RelayPolicy,
replyTo_opt: Option[typed.ActorRef[Status]]) extends Command
case class WrappedPeerInfo(peerInfo: PeerInfoResponse) extends Command
case class WrappedConnectionResult(result: PeerConnection.ConnectionResult) extends Command
case class WrappedOptionalNodeId(nodeId_opt: Option[PublicKey]) extends Command
private case class WrappedConnectionResult(result: PeerConnection.ConnectionResult) extends Command
private case class WrappedOptionalNodeId(nodeId_opt: Option[PublicKey]) extends Command
private case class WrappedPeerReadyResult(result: PeerReadyNotifier.Result) extends Command

sealed trait Status {
val messageId: ByteVector32
}
sealed trait Status { val messageId: ByteVector32 }
case class Sent(messageId: ByteVector32) extends Status
sealed trait Failure extends Status
case class AgainstPolicy(messageId: ByteVector32, policy: RelayPolicy) extends Failure {
override def toString: String = s"Relay prevented by policy $policy"
}
case class ConnectionFailure(messageId: ByteVector32, failure: PeerConnection.ConnectionResult.Failure) extends Failure {
override def toString: String = s"Can't connect to peer: ${failure.toString}"
}
case class Disconnected(messageId: ByteVector32) extends Failure {
override def toString: String = "Peer is not connected"
}
case class UnknownChannel(messageId: ByteVector32, channelId: ShortChannelId) extends Failure {
override def toString: String = s"Unknown channel: $channelId"
}
case class DroppedMessage(messageId: ByteVector32, reason: DropReason) extends Failure {
override def toString: String = s"Message dropped: $reason"
}
case class AgainstPolicy(messageId: ByteVector32, policy: RelayPolicy) extends Failure { override def toString: String = s"Relay prevented by policy $policy" }
case class ConnectionFailure(messageId: ByteVector32, failure: PeerConnection.ConnectionResult.Failure) extends Failure { override def toString: String = s"Can't connect to peer: ${failure.toString}" }
case class Disconnected(messageId: ByteVector32) extends Failure { override def toString: String = "Peer is not connected" }
case class UnknownChannel(messageId: ByteVector32, channelId: ShortChannelId) extends Failure { override def toString: String = s"Unknown channel: $channelId" }
case class DroppedMessage(messageId: ByteVector32, reason: DropReason) extends Failure { override def toString: String = s"Message dropped: $reason" }

sealed trait RelayPolicy
case object RelayChannelsOnly extends RelayPolicy
Expand Down Expand Up @@ -106,15 +95,15 @@ private class MessageRelay(nodeParams: NodeParams,
def queryNextNodeId(msg: OnionMessage, nextNode: Either[ShortChannelId, EncodedNodeId]): Behavior[Command] = {
nextNode match {
case Left(outgoingChannelId) if outgoingChannelId == ShortChannelId.toSelf =>
withNextNodeId(msg, nodeParams.nodeId)
withNextNodeId(msg, EncodedNodeId.WithPublicKey.Plain(nodeParams.nodeId))
case Left(outgoingChannelId) =>
register ! Register.GetNextNodeId(context.messageAdapter(WrappedOptionalNodeId), outgoingChannelId)
waitForNextNodeId(msg, outgoingChannelId)
case Right(EncodedNodeId.ShortChannelIdDir(isNode1, scid)) =>
router ! Router.GetNodeId(context.messageAdapter(WrappedOptionalNodeId), scid, isNode1)
waitForNextNodeId(msg, scid)
case Right(encodedNodeId: EncodedNodeId.WithPublicKey) =>
withNextNodeId(msg, encodedNodeId.publicKey)
withNextNodeId(msg, encodedNodeId)
}
}

Expand All @@ -127,34 +116,39 @@ private class MessageRelay(nodeParams: NodeParams,
Behaviors.stopped
case WrappedOptionalNodeId(Some(nextNodeId)) =>
log.info("found outgoing node {} for channel {}", nextNodeId, channelId)
withNextNodeId(msg, nextNodeId)
withNextNodeId(msg, EncodedNodeId.WithPublicKey.Plain(nextNodeId))
}
}

private def withNextNodeId(msg: OnionMessage, nextNodeId: PublicKey): Behavior[Command] = {
if (nextNodeId == nodeParams.nodeId) {
OnionMessages.process(nodeParams.privateKey, msg) match {
case OnionMessages.DropMessage(reason) =>
Metrics.OnionMessagesNotRelayed.withTag(Tags.Reason, reason.getClass.getSimpleName).increment()
replyTo_opt.foreach(_ ! DroppedMessage(messageId, reason))
Behaviors.stopped
case OnionMessages.SendMessage(nextNode, nextMessage) =>
// We need to repeat the process until we identify the (real) next node, or find out that we're the recipient.
queryNextNodeId(nextMessage, nextNode)
case received: OnionMessages.ReceiveMessage =>
context.system.eventStream ! EventStream.Publish(received)
replyTo_opt.foreach(_ ! Sent(messageId))
Behaviors.stopped
}
} else {
policy match {
case RelayChannelsOnly =>
switchboard ! GetPeerInfo(context.messageAdapter(WrappedPeerInfo), prevNodeId)
waitForPreviousPeerForPolicyCheck(msg, nextNodeId)
case RelayAll =>
switchboard ! Peer.Connect(nextNodeId, None, context.messageAdapter(WrappedConnectionResult).toClassic, isPersistent = false)
waitForConnection(msg, nextNodeId)
}
private def withNextNodeId(msg: OnionMessage, nextNodeId: EncodedNodeId.WithPublicKey): Behavior[Command] = {
nextNodeId match {
case EncodedNodeId.WithPublicKey.Plain(nodeId) if nodeId == nodeParams.nodeId =>
OnionMessages.process(nodeParams.privateKey, msg) match {
case OnionMessages.DropMessage(reason) =>
Metrics.OnionMessagesNotRelayed.withTag(Tags.Reason, reason.getClass.getSimpleName).increment()
replyTo_opt.foreach(_ ! DroppedMessage(messageId, reason))
Behaviors.stopped
case OnionMessages.SendMessage(nextNode, nextMessage) =>
// We need to repeat the process until we identify the (real) next node, or find out that we're the recipient.
queryNextNodeId(nextMessage, nextNode)
case received: OnionMessages.ReceiveMessage =>
context.system.eventStream ! EventStream.Publish(received)
replyTo_opt.foreach(_ ! Sent(messageId))
Behaviors.stopped
}
case EncodedNodeId.WithPublicKey.Plain(nodeId) =>
policy match {
case RelayChannelsOnly =>
switchboard ! GetPeerInfo(context.messageAdapter(WrappedPeerInfo), prevNodeId)
waitForPreviousPeerForPolicyCheck(msg, nodeId)
case RelayAll =>
switchboard ! Peer.Connect(nodeId, None, context.messageAdapter(WrappedConnectionResult).toClassic, isPersistent = false)
waitForConnection(msg, nodeId)
}
case EncodedNodeId.WithPublicKey.Wallet(nodeId) =>
val notifier = context.spawnAnonymous(PeerReadyNotifier(nodeId, timeout_opt = Some(Left(nodeParams.peerWakeUpConfig.timeout))))
notifier ! PeerReadyNotifier.NotifyWhenPeerReady(context.messageAdapter(WrappedPeerReadyResult))
waitForWalletNodeUp(msg, nodeId)
}
}

Expand Down Expand Up @@ -197,4 +191,18 @@ private class MessageRelay(nodeParams: NodeParams,
Behaviors.stopped
}
}

private def waitForWalletNodeUp(msg: OnionMessage, nextNodeId: PublicKey): Behavior[Command] = {
Behaviors.receiveMessagePartial {
case WrappedPeerReadyResult(r: PeerReadyNotifier.PeerReady) =>
log.info("successfully woke up {}: relaying onion message", nextNodeId)
r.peer ! Peer.RelayOnionMessage(messageId, msg, replyTo_opt)
Behaviors.stopped
case WrappedPeerReadyResult(_: PeerReadyNotifier.PeerUnavailable) =>
Metrics.OnionMessagesNotRelayed.withTag(Tags.Reason, Tags.Reasons.ConnectionFailure).increment()
log.info("could not wake up {}: onion message cannot be relayed", nextNodeId)
replyTo_opt.foreach(_ ! Disconnected(messageId))
Behaviors.stopped
}
}
}
Loading

0 comments on commit fcd88b0

Please sign in to comment.