Skip to content

Commit

Permalink
Avoid herd effect watching local channels
Browse files Browse the repository at this point in the history
When we restart, we set watches on our funding transactions. But we don't
actually need to watch them immediately, we just need enough time to react
to our peer broadcasting their commitment. We use long `cltv_delta` delays
to guarantee funds safety, so we can spread out the watches across several
blocks to reduce the start-up load. It essentially is the same thing as
receiving mempool transactions or blocks after a delay, which is something
that our threat model already takes into account.
  • Loading branch information
t-bast committed Aug 29, 2023
1 parent ce7be93 commit 0eb0524
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 29 deletions.
1 change: 1 addition & 0 deletions eclair-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ eclair {
// expiry-delta-blocks.
fulfill-safety-before-timeout-blocks = 24
min-final-expiry-delta-blocks = 30 // Bolt 11 invoice's min_final_cltv_expiry; must be strictly greater than fulfill-safety-before-timeout-blocks
max-restart-watch-delay = 60 seconds // we add a random delay before watching funding transactions after restart
max-block-processing-delay = 30 seconds // we add a random delay before processing blocks, capped at this value, to prevent herd effect
max-tx-publish-retry-delay = 60 seconds // we add a random delay before retrying failed transaction publication

Expand Down
3 changes: 2 additions & 1 deletion eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ case class NodeParams(nodeKeyManager: NodeKeyManager,
def currentFeerates: FeeratesPerKw = feerates.get()

/** Only to be used in tests. */
def setFeerates(value: FeeratesPerKw) = feerates.set(value)
def setFeerates(value: FeeratesPerKw): Unit = feerates.set(value)

/** Returns the features that should be used in our init message with the given peer. */
def initFeaturesFor(nodeId: PublicKey): Features[InitFeature] = overrideInitFeatures.getOrElse(nodeId, features).initFeatures()
Expand Down Expand Up @@ -505,6 +505,7 @@ object NodeParams extends Logging {
maxExpiryDelta = maxExpiryDelta,
fulfillSafetyBeforeTimeout = fulfillSafetyBeforeTimeout,
minFinalExpiryDelta = minFinalExpiryDelta,
maxRestartWatchDelay = FiniteDuration(config.getDuration("channel.max-restart-watch-delay").getSeconds, TimeUnit.SECONDS),
maxBlockProcessingDelay = FiniteDuration(config.getDuration("channel.max-block-processing-delay").getSeconds, TimeUnit.SECONDS),
maxTxPublishRetryDelay = FiniteDuration(config.getDuration("channel.max-tx-publish-retry-delay").getSeconds, TimeUnit.SECONDS),
unhandledExceptionStrategy = unhandledExceptionStrategy,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ object Channel {
maxExpiryDelta: CltvExpiryDelta,
fulfillSafetyBeforeTimeout: CltvExpiryDelta,
minFinalExpiryDelta: CltvExpiryDelta,
maxRestartWatchDelay: FiniteDuration,
maxBlockProcessingDelay: FiniteDuration,
maxTxPublishRetryDelay: FiniteDuration,
unhandledExceptionStrategy: UnhandledExceptionStrategy,
Expand Down Expand Up @@ -251,8 +252,13 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
context.system.eventStream.publish(ChannelRestored(self, data.channelId, peer, remoteNodeId, data))
txPublisher ! SetChannelId(remoteNodeId, data.channelId)

// we watch all unconfirmed funding txs, whatever our state is
// (there can be multiple funding txs due to rbf, and they can be unconfirmed in any state due to zero-conf)
// We watch all unconfirmed funding txs, whatever our state is.
// There can be multiple funding txs due to rbf, and they can be unconfirmed in any state due to zero-conf.
// To avoid a herd effect on restart, we had a delay before watching funding txs.
val herdDelay_opt = nodeParams.channelConf.maxRestartWatchDelay.toSeconds match {
case maxRestartWatchDelay if maxRestartWatchDelay > 0 => Some((1 + Random.nextLong(maxRestartWatchDelay)).seconds)
case _ => None
}
data match {
case _: ChannelDataWithoutCommitments => ()
case data: ChannelDataWithCommitments => data.commitments.all.foreach { commitment =>
Expand All @@ -271,24 +277,24 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
// once, because at the next restore, the status of the funding tx will be "confirmed".
()
}
watchFundingConfirmed(commitment.fundingTxId, Some(singleFundingMinDepth(data)))
watchFundingConfirmed(commitment.fundingTxId, Some(singleFundingMinDepth(data)), herdDelay_opt)
case fundingTx: LocalFundingStatus.DualFundedUnconfirmedFundingTx =>
publishFundingTx(fundingTx)
val minDepth_opt = data.commitments.params.minDepthDualFunding(nodeParams.channelConf.minDepthBlocks, fundingTx.sharedTx.tx)
watchFundingConfirmed(fundingTx.sharedTx.txId, minDepth_opt)
watchFundingConfirmed(fundingTx.sharedTx.txId, minDepth_opt, herdDelay_opt)
case fundingTx: LocalFundingStatus.ZeroconfPublishedFundingTx =>
// those are zero-conf channels, the min-depth isn't critical, we use the default
watchFundingConfirmed(fundingTx.tx.txid, Some(nodeParams.channelConf.minDepthBlocks.toLong))
watchFundingConfirmed(fundingTx.tx.txid, Some(nodeParams.channelConf.minDepthBlocks.toLong), herdDelay_opt)
case _: LocalFundingStatus.ConfirmedFundingTx =>
data match {
case closing: DATA_CLOSING if Closing.nothingAtStake(closing) || Closing.isClosingTypeAlreadyKnown(closing).isDefined =>
// no need to do anything
()
case closing: DATA_CLOSING =>
// in all other cases we need to be ready for any type of closing
watchFundingSpent(commitment, closing.spendingTxs.map(_.txid).toSet)
watchFundingSpent(commitment, closing.spendingTxs.map(_.txid).toSet, herdDelay_opt)
case _ =>
watchFundingSpent(commitment)
watchFundingSpent(commitment, Set.empty, herdDelay_opt)
}
}
}
Expand Down Expand Up @@ -564,7 +570,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
// That's why we move on immediately to the next step, and will update our unsigned funding tx when we
// receive their tx_sigs.
val minDepth_opt = d.commitments.params.minDepthDualFunding(nodeParams.channelConf.minDepthBlocks, signingSession1.fundingTx.sharedTx.tx)
watchFundingConfirmed(signingSession.fundingTx.txId, minDepth_opt)
watchFundingConfirmed(signingSession.fundingTx.txId, minDepth_opt, delay_opt = None)
val commitments1 = d.commitments.add(signingSession1.commitment)
val d1 = d.copy(commitments = commitments1, spliceStatus = SpliceStatus.NoSplice)
stay() using d1 storing() sending signingSession1.localSigs calling endQuiescence(d1)
Expand Down Expand Up @@ -1078,7 +1084,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
stay() using d.copy(spliceStatus = SpliceStatus.SpliceAborted) sending TxAbort(d.channelId, f.getMessage)
case Right(signingSession1) =>
val minDepth_opt = d.commitments.params.minDepthDualFunding(nodeParams.channelConf.minDepthBlocks, signingSession1.fundingTx.sharedTx.tx)
watchFundingConfirmed(signingSession.fundingTx.txId, minDepth_opt)
watchFundingConfirmed(signingSession.fundingTx.txId, minDepth_opt, delay_opt = None)
val commitments1 = d.commitments.add(signingSession1.commitment)
val d1 = d.copy(commitments = commitments1, spliceStatus = SpliceStatus.NoSplice)
log.info("publishing funding tx for channelId={} fundingTxId={}", d.channelId, signingSession1.fundingTx.sharedTx.txId)
Expand All @@ -1095,7 +1101,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
val fundingStatus = LocalFundingStatus.ZeroconfPublishedFundingTx(w.tx)
d.commitments.updateLocalFundingStatus(w.tx.txid, fundingStatus) match {
case Right((commitments1, _)) =>
watchFundingConfirmed(w.tx.txid, Some(nodeParams.channelConf.minDepthBlocks))
watchFundingConfirmed(w.tx.txid, Some(nodeParams.channelConf.minDepthBlocks), delay_opt = None)
maybeEmitEventsPostSplice(d.shortIds, d.commitments, commitments1)
stay() using d.copy(commitments = commitments1) storing() sending SpliceLocked(d.channelId, w.tx.hash)
case Left(_) => stay()
Expand Down Expand Up @@ -2163,7 +2169,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
d.commitments.updateLocalFundingStatus(w.tx.txid, fundingStatus) match {
case Right((commitments1, _)) =>
log.info(s"zero-conf funding txid=${w.tx.txid} has been published")
watchFundingConfirmed(w.tx.txid, Some(nodeParams.channelConf.minDepthBlocks))
watchFundingConfirmed(w.tx.txid, Some(nodeParams.channelConf.minDepthBlocks), delay_opt = None)
val d1 = d match {
// NB: we discard remote's stashed channel_ready, they will send it back at reconnection
case d: DATA_WAIT_FOR_FUNDING_CONFIRMED =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ trait ChannelOpenDualFunded extends DualFundingHandlers with ErrorHandlers {
// That's why we move on immediately to the next step, and will update our unsigned funding tx when we
// receive their tx_sigs.
val minDepth_opt = d.channelParams.minDepthDualFunding(nodeParams.channelConf.minDepthBlocks, signingSession1.fundingTx.sharedTx.tx)
watchFundingConfirmed(d.signingSession.fundingTx.txId, minDepth_opt)
watchFundingConfirmed(d.signingSession.fundingTx.txId, minDepth_opt, delay_opt = None)
val commitments = Commitments(
params = d.channelParams,
changes = CommitmentChanges.init(),
Expand All @@ -403,7 +403,7 @@ trait ChannelOpenDualFunded extends DualFundingHandlers with ErrorHandlers {
goto(CLOSED) sending Error(d.channelId, f.getMessage)
case Right(signingSession) =>
val minDepth_opt = d.channelParams.minDepthDualFunding(nodeParams.channelConf.minDepthBlocks, signingSession.fundingTx.sharedTx.tx)
watchFundingConfirmed(d.signingSession.fundingTx.txId, minDepth_opt)
watchFundingConfirmed(d.signingSession.fundingTx.txId, minDepth_opt, delay_opt = None)
val commitments = Commitments(
params = d.channelParams,
changes = CommitmentChanges.init(),
Expand Down Expand Up @@ -468,7 +468,7 @@ trait ChannelOpenDualFunded extends DualFundingHandlers with ErrorHandlers {
stay() using d.copy(rbfStatus = RbfStatus.RbfAborted) sending TxAbort(d.channelId, f.getMessage)
case Right(signingSession1) =>
val minDepth_opt = d.commitments.params.minDepthDualFunding(nodeParams.channelConf.minDepthBlocks, signingSession1.fundingTx.sharedTx.tx)
watchFundingConfirmed(signingSession.fundingTx.txId, minDepth_opt)
watchFundingConfirmed(signingSession.fundingTx.txId, minDepth_opt, delay_opt = None)
val commitments1 = d.commitments.add(signingSession1.commitment)
val d1 = DATA_WAIT_FOR_DUAL_FUNDING_CONFIRMED(commitments1, d.localPushAmount, d.remotePushAmount, d.waitingSince, d.lastChecked, RbfStatus.NoRbf, d.deferred)
stay() using d1 storing() sending signingSession1.localSigs calling publishFundingTx(signingSession1.fundingTx)
Expand Down Expand Up @@ -615,7 +615,7 @@ trait ChannelOpenDualFunded extends DualFundingHandlers with ErrorHandlers {
stay() using d.copy(rbfStatus = RbfStatus.RbfWaitingForSigs(signingSession1))
case signingSession1: InteractiveTxSigningSession.SendingSigs =>
val minDepth_opt = d.commitments.params.minDepthDualFunding(nodeParams.channelConf.minDepthBlocks, signingSession1.fundingTx.sharedTx.tx)
watchFundingConfirmed(signingSession.fundingTx.txId, minDepth_opt)
watchFundingConfirmed(signingSession.fundingTx.txId, minDepth_opt, delay_opt = None)
val commitments1 = d.commitments.add(signingSession1.commitment)
val d1 = DATA_WAIT_FOR_DUAL_FUNDING_CONFIRMED(commitments1, d.localPushAmount, d.remotePushAmount, d.waitingSince, d.lastChecked, RbfStatus.NoRbf, d.deferred)
stay() using d1 storing() sending signingSession1.localSigs
Expand Down Expand Up @@ -677,7 +677,7 @@ trait ChannelOpenDualFunded extends DualFundingHandlers with ErrorHandlers {
d.commitments.updateLocalFundingStatus(w.tx.txid, fundingStatus) match {
case Right((commitments1, _)) =>
// we still watch the funding tx for confirmation even if we can use the zero-conf channel right away
watchFundingConfirmed(w.tx.txid, Some(nodeParams.channelConf.minDepthBlocks))
watchFundingConfirmed(w.tx.txid, Some(nodeParams.channelConf.minDepthBlocks), delay_opt = None)
val realScidStatus = RealScidStatus.Unknown
val shortIds = createShortIds(d.channelId, realScidStatus)
val channelReady = createChannelReady(shortIds, d.commitments.params)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ trait ChannelOpenSingleFunded extends SingleFundingHandlers with ErrorHandlers {
context.system.eventStream.publish(ChannelSignatureReceived(self, commitments))
// NB: we don't send a ChannelSignatureSent for the first commit
log.info(s"waiting for them to publish the funding tx for channelId=$channelId fundingTxid=${commitment.fundingTxId}")
watchFundingConfirmed(commitment.fundingTxId, params.minDepthFundee(nodeParams.channelConf.minDepthBlocks, fundingAmount))
watchFundingConfirmed(commitment.fundingTxId, params.minDepthFundee(nodeParams.channelConf.minDepthBlocks, fundingAmount), delay_opt = None)
goto(WAIT_FOR_FUNDING_CONFIRMED) using DATA_WAIT_FOR_FUNDING_CONFIRMED(commitments, nodeParams.currentBlockHeight, None, Right(fundingSigned)) storing() sending fundingSigned
}
}
Expand Down Expand Up @@ -340,7 +340,7 @@ trait ChannelOpenSingleFunded extends SingleFundingHandlers with ErrorHandlers {
val blockHeight = nodeParams.currentBlockHeight
context.system.eventStream.publish(ChannelSignatureReceived(self, commitments))
log.info(s"publishing funding tx fundingTxid=${commitment.fundingTxId}")
watchFundingConfirmed(commitment.fundingTxId, params.minDepthFunder)
watchFundingConfirmed(commitment.fundingTxId, params.minDepthFunder, delay_opt = None)
// we will publish the funding tx only after the channel state has been written to disk because we want to
// make sure we first persist the commitment that returns back the funds to us in case of problem
goto(WAIT_FOR_FUNDING_CONFIRMED) using DATA_WAIT_FOR_FUNDING_CONFIRMED(commitments, blockHeight, None, Left(fundingCreated)) storing() calling publishFundingTx(d.channelId, fundingTx, fundingTxFee, d.replyTo)
Expand Down Expand Up @@ -394,7 +394,7 @@ trait ChannelOpenSingleFunded extends SingleFundingHandlers with ErrorHandlers {
case Right((commitments1, _)) =>
log.info("funding txid={} was successfully published for zero-conf channelId={}", w.tx.txid, d.channelId)
// we still watch the funding tx for confirmation even if we can use the zero-conf channel right away
watchFundingConfirmed(w.tx.txid, Some(nodeParams.channelConf.minDepthBlocks))
watchFundingConfirmed(w.tx.txid, Some(nodeParams.channelConf.minDepthBlocks), delay_opt = None)
val realScidStatus = RealScidStatus.Unknown
val shortIds = createShortIds(d.channelId, realScidStatus)
val channelReady = createChannelReady(shortIds, d.commitments.params)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package fr.acinq.eclair.channel.fsm

import akka.actor.typed.scaladsl.adapter.actorRefAdapter
import akka.actor.typed.scaladsl.adapter.{TypedActorRefOps, actorRefAdapter}
import com.softwaremill.quicklens.ModifyPimp
import fr.acinq.bitcoin.ScriptFlags
import fr.acinq.bitcoin.scalacompat.{ByteVector32, Transaction}
Expand All @@ -29,7 +29,7 @@ import fr.acinq.eclair.channel.fsm.Channel.{ANNOUNCEMENTS_MINCONF, BroadcastChan
import fr.acinq.eclair.router.Announcements
import fr.acinq.eclair.wire.protocol.{AnnouncementSignatures, ChannelReady, ChannelReadyTlv, TlvStream}

import scala.concurrent.duration.DurationInt
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.util.{Failure, Success, Try}

/**
Expand All @@ -40,17 +40,25 @@ trait CommonFundingHandlers extends CommonHandlers {

this: Channel =>

def watchFundingSpent(commitment: Commitment, additionalKnownSpendingTxs: Set[ByteVector32] = Set.empty): Unit = {
def watchFundingSpent(commitment: Commitment, additionalKnownSpendingTxs: Set[ByteVector32], delay_opt: Option[FiniteDuration]): Unit = {
val knownSpendingTxs = Set(commitment.localCommit.commitTxAndRemoteSig.commitTx.tx.txid, commitment.remoteCommit.txid) ++ commitment.nextRemoteCommit_opt.map(_.commit.txid).toSet ++ additionalKnownSpendingTxs
blockchain ! WatchFundingSpent(self, commitment.commitInput.outPoint.txid, commitment.commitInput.outPoint.index.toInt, knownSpendingTxs)
val watch = WatchFundingSpent(self, commitment.commitInput.outPoint.txid, commitment.commitInput.outPoint.index.toInt, knownSpendingTxs)
delay_opt match {
case Some(delay) => context.system.scheduler.scheduleOnce(delay, blockchain.toClassic, watch)
case None => blockchain ! watch
}
}

def watchFundingConfirmed(fundingTxId: ByteVector32, minDepth_opt: Option[Long]): Unit = {
minDepth_opt match {
case Some(fundingMinDepth) => blockchain ! WatchFundingConfirmed(self, fundingTxId, fundingMinDepth)
def watchFundingConfirmed(fundingTxId: ByteVector32, minDepth_opt: Option[Long], delay_opt: Option[FiniteDuration]): Unit = {
val watch = minDepth_opt match {
case Some(fundingMinDepth) => WatchFundingConfirmed(self, fundingTxId, fundingMinDepth)
// When using 0-conf, we make sure that the transaction was successfully published, otherwise there is a risk
// of accidentally double-spending it later (e.g. restarting bitcoind would remove the utxo locks).
case None => blockchain ! WatchPublished(self, fundingTxId)
case None => WatchPublished(self, fundingTxId)
}
delay_opt match {
case Some(delay) => context.system.scheduler.scheduleOnce(delay, blockchain.toClassic, watch)
case None => blockchain ! watch
}
}

Expand All @@ -73,7 +81,7 @@ trait CommonFundingHandlers extends CommonHandlers {
d.commitments.updateLocalFundingStatus(w.tx.txid, fundingStatus).map {
case (commitments1, commitment) =>
// first of all, we watch the funding tx that is now confirmed
watchFundingSpent(commitment)
watchFundingSpent(commitment, Set.empty, None)
// in the dual-funding case we can forget all other transactions, they have been double spent by the tx that just confirmed
rollbackDualFundingTxs(d.commitments.active // note how we use the unpruned original commitments
.filter(c => c.fundingTxIndex == commitment.fundingTxIndex && c.fundingTxId != commitment.fundingTxId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ object TestConstants {
maxExpiryDelta = CltvExpiryDelta(2016),
fulfillSafetyBeforeTimeout = CltvExpiryDelta(6),
minFinalExpiryDelta = CltvExpiryDelta(18),
maxRestartWatchDelay = 0 millis,
maxBlockProcessingDelay = 10 millis,
maxTxPublishRetryDelay = 10 millis,
htlcMinimum = 0 msat,
Expand Down Expand Up @@ -274,6 +275,7 @@ object TestConstants {
maxExpiryDelta = CltvExpiryDelta(2016),
fulfillSafetyBeforeTimeout = CltvExpiryDelta(6),
minFinalExpiryDelta = CltvExpiryDelta(18),
maxRestartWatchDelay = 5 millis,
maxBlockProcessingDelay = 10 millis,
maxTxPublishRetryDelay = 10 millis,
htlcMinimum = 1000 msat,
Expand Down

0 comments on commit 0eb0524

Please sign in to comment.