From 86470793a0c74cb309c7b33dbb563a46ef624787 Mon Sep 17 00:00:00 2001 From: jellymlg Date: Tue, 3 Oct 2023 01:58:03 +0200 Subject: [PATCH 01/12] [WIP] Basic structure for UTXOSetScanner --- .../nodeView/ErgoNodeViewHolder.scala | 1 + .../nodeView/wallet/UTXOSetScanner.scala | 61 +++++++++++++++++++ 2 files changed, 62 insertions(+) create mode 100644 src/main/scala/org/ergoplatform/nodeView/wallet/UTXOSetScanner.scala diff --git a/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala b/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala index fdcc3bb17f..c41eea03cd 100644 --- a/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala +++ b/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala @@ -287,6 +287,7 @@ abstract class ErgoNodeViewHolder[State <: ErgoState[State]](settings: ErgoSetti history().createPersistentProver(store, history(), height, blockId) match { case Success(pp) => log.info(s"Restoring state from prover with digest ${pp.digest} reconstructed for height $height") + // initiate scan history().onUtxoSnapshotApplied(height) val newState = new UtxoState(pp, version = VersionTag @@@ blockId, store, settings) updateNodeView(updatedState = Some(newState.asInstanceOf[State])) diff --git a/src/main/scala/org/ergoplatform/nodeView/wallet/UTXOSetScanner.scala b/src/main/scala/org/ergoplatform/nodeView/wallet/UTXOSetScanner.scala new file mode 100644 index 0000000000..63ef122b3b --- /dev/null +++ b/src/main/scala/org/ergoplatform/nodeView/wallet/UTXOSetScanner.scala @@ -0,0 +1,61 @@ +package org.ergoplatform.nodeView.wallet + +import akka.actor.{Actor, Props} +import org.ergoplatform.ErgoBox +import org.ergoplatform.nodeView.state.UtxoState +import org.ergoplatform.nodeView.wallet.UTXOSetScanner._ +import org.ergoplatform.wallet.boxes.ErgoBoxSerializer +import scorex.crypto.authds.avltree.batch.{InternalProverNode, ProverLeaf, ProverNodes} +import scorex.crypto.hash.Digest32 +import scorex.util.{ScorexLogging, bytesToId} + +import scala.annotation.tailrec +import scala.util.{Failure, Success} + +class UTXOSetScanner extends Actor with ScorexLogging { + + private def scanBox(box: ErgoBox): Unit = { + log.info(s"Scanning box ${bytesToId(box.id)}") + // perform scan + } + + private def run(state: UtxoState): Unit = { + + @tailrec + def walk(rNode: ProverNodes[Digest32], ir: Int): Int = { + rNode match { + case leaf: ProverLeaf[Digest32] => + leafFn(leaf, ir) + + case r: InternalProverNode[Digest32] => + val i = internalNodeFn(r, ir) + walk(i._1, i._2) + } + } + + def internalNodeFn(r: InternalProverNode[Digest32], depth: Int): (ProverNodes[Digest32],Int) = + (r.left, walk(r.right, depth + 1)) + + def leafFn(leaf: ProverLeaf[Digest32], depth: Int): Int = { + ErgoBoxSerializer.parseBytesTry(leaf.value) match { + case Success(box) => scanBox(box) + case Failure(e) => log.error(s"Failed to parse box from state, $e") + } + depth + 1 + } + + state.persistentProver.avlProver.treeWalk(internalNodeFn, leafFn, 0) + } + + override def receive: Receive = { + case StartScan(state: UtxoState) => run(state) + } + +} + +object UTXOSetScanner { + + case class StartScan(state: UtxoState) + + def props(): Props = Props(new UTXOSetScanner()) +} From db56a77a4fe3cb098573ee8143fef1ad714801dc Mon Sep 17 00:00:00 2001 From: jellymlg Date: Thu, 2 Nov 2023 01:43:05 +0100 Subject: [PATCH 02/12] [WIP] Reworked UTXOSetScanner, added messaging --- src/main/scala/org/ergoplatform/ErgoApp.scala | 4 +- .../nodeView/ErgoNodeViewHolder.scala | 3 +- .../nodeView/history/ErgoHistory.scala | 2 + .../nodeView/history/UTXOSetScanner.scala | 106 ++++++++++++++++++ .../UtxoSetSnapshotProcessor.scala | 9 +- .../nodeView/wallet/UTXOSetScanner.scala | 61 ---------- 6 files changed, 118 insertions(+), 67 deletions(-) create mode 100644 src/main/scala/org/ergoplatform/nodeView/history/UTXOSetScanner.scala delete mode 100644 src/main/scala/org/ergoplatform/nodeView/wallet/UTXOSetScanner.scala diff --git a/src/main/scala/org/ergoplatform/ErgoApp.scala b/src/main/scala/org/ergoplatform/ErgoApp.scala index 88021c75b2..b546d265e5 100644 --- a/src/main/scala/org/ergoplatform/ErgoApp.scala +++ b/src/main/scala/org/ergoplatform/ErgoApp.scala @@ -10,7 +10,7 @@ import org.ergoplatform.local._ import org.ergoplatform.mining.ErgoMiner import org.ergoplatform.mining.ErgoMiner.StartMining import org.ergoplatform.network.{ErgoNodeViewSynchronizer, ErgoSyncTracker} -import org.ergoplatform.nodeView.history.ErgoSyncInfoMessageSpec +import org.ergoplatform.nodeView.history.{ErgoSyncInfoMessageSpec, UTXOSetScanner} import org.ergoplatform.nodeView.history.extra.ExtraIndexer import org.ergoplatform.nodeView.{ErgoNodeViewRef, ErgoReadersHolderRef} import org.ergoplatform.settings.{Args, ErgoSettings, NetworkType} @@ -116,6 +116,8 @@ class ErgoApp(args: Args) extends ScorexLogging { // Create an instance of ExtraIndexer actor (will start if "extraIndex = true" in config) private val indexer: ActorRef = ExtraIndexer(ergoSettings.chainSettings, ergoSettings.cacheSettings) + private val UtxoSetScanner: ActorRef = UTXOSetScanner() + private val syncTracker = ErgoSyncTracker(scorexSettings.network) private val deliveryTracker: DeliveryTracker = DeliveryTracker.empty(ergoSettings) diff --git a/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala b/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala index c41eea03cd..d09d836384 100644 --- a/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala +++ b/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala @@ -28,6 +28,7 @@ import spire.syntax.all.cfor import java.io.File import org.ergoplatform.modifiers.history.extension.Extension +import org.ergoplatform.nodeView.history.UTXOSetScanner.{InitializeScan, Start, StartScan} import scala.annotation.tailrec import scala.collection.mutable @@ -287,7 +288,7 @@ abstract class ErgoNodeViewHolder[State <: ErgoState[State]](settings: ErgoSetti history().createPersistentProver(store, history(), height, blockId) match { case Success(pp) => log.info(s"Restoring state from prover with digest ${pp.digest} reconstructed for height $height") - // initiate scan + context.system.eventStream.publish(StartScan()) history().onUtxoSnapshotApplied(height) val newState = new UtxoState(pp, version = VersionTag @@@ blockId, store, settings) updateNodeView(updatedState = Some(newState.asInstanceOf[State])) diff --git a/src/main/scala/org/ergoplatform/nodeView/history/ErgoHistory.scala b/src/main/scala/org/ergoplatform/nodeView/history/ErgoHistory.scala index dc63c806d3..7a45caf548 100644 --- a/src/main/scala/org/ergoplatform/nodeView/history/ErgoHistory.scala +++ b/src/main/scala/org/ergoplatform/nodeView/history/ErgoHistory.scala @@ -12,6 +12,7 @@ import org.ergoplatform.nodeView.history.extra.ExtraIndexer.ReceivableMessages.S import org.ergoplatform.nodeView.history.extra.ExtraIndexer.{IndexedHeightKey, NewestVersion, NewestVersionBytes, SchemaVersionKey, getIndex} import org.ergoplatform.nodeView.history.storage.HistoryStorage import org.ergoplatform.nodeView.history.storage.modifierprocessors._ +import UTXOSetScanner.InitializeUTXOSetScanner import org.ergoplatform.settings._ import org.ergoplatform.utils.LoggingUtil import scorex.core.consensus.ProgressInfo @@ -337,6 +338,7 @@ object ErgoHistory extends ScorexLogging { log.info("History database read") if(ergoSettings.nodeSettings.extraIndex) // start extra indexer, if enabled context.system.eventStream.publish(StartExtraIndexer(history)) + context.system.eventStream.publish(InitializeUTXOSetScanner(history)) history } diff --git a/src/main/scala/org/ergoplatform/nodeView/history/UTXOSetScanner.scala b/src/main/scala/org/ergoplatform/nodeView/history/UTXOSetScanner.scala new file mode 100644 index 0000000000..da1ac59eae --- /dev/null +++ b/src/main/scala/org/ergoplatform/nodeView/history/UTXOSetScanner.scala @@ -0,0 +1,106 @@ +package org.ergoplatform.nodeView.history + +import akka.actor.{Actor, ActorRef, ActorSystem, Props} +import com.google.common.primitives.Ints +import org.ergoplatform.ErgoBox +import org.ergoplatform.modifiers.BlockSection +import org.ergoplatform.nodeView.history.UTXOSetScanner._ +import org.ergoplatform.nodeView.history.storage.HistoryStorage +import org.ergoplatform.wallet.boxes.ErgoBoxSerializer +import scorex.core.serialization.SubtreeSerializer +import scorex.crypto.authds.avltree.batch.Constants +import scorex.crypto.authds.avltree.batch.serialization.BatchAVLProverSubtree +import scorex.crypto.hash.Blake2b256 +import scorex.db.ByteArrayWrapper +import scorex.util.{ScorexLogging, bytesToId} + +import java.nio.ByteBuffer +import scala.util.{Failure, Success} + + +class UTXOSetScanner() extends Actor with ScorexLogging { + + private var history: ErgoHistory = _ + private def historyStorage: HistoryStorage = history.historyStorage + + private def readProgress(): (Int, Int) = + historyStorage.getIndex(utxoSetScanProgressKey).map(ByteBuffer.wrap).map { buffer => + val current = buffer.getInt + val total = buffer.getInt + (current, total) + }.getOrElse((0, 0)) + + private def saveProgress(current: Int, total: Int): Unit = { + val buffer: ByteBuffer = ByteBuffer.allocate(8) + buffer.putInt(current) + buffer.putInt(total) + historyStorage.insert(Array((utxoSetScanProgressKey,buffer.array)), Array.empty[BlockSection]) + } + + private def scanBox(box: ErgoBox, current: Int, total: Int): Unit = { + log.info(s"Scanning box ${bytesToId(box.id)} in chunk $current / $total") + // TODO perform scan + } + + private def run(): Unit = { + var (current, total) = readProgress() + downloadedChunksIterator(historyStorage, current, total).foreach { subtree => + subtree.leafValues.foreach { leaf => + ErgoBoxSerializer.parseBytesTry(leaf) match { + case Success(box) => scanBox(box, current, total) + case Failure(e) => log.error(s"Failed to parse box from prover leaf: $e") + } + } + current += 1 + saveProgress(current, total) + } + if(current == total) + history.removeUtxoSnapshotChunks() + } + + override def receive: Receive = { + case InitializeUTXOSetScanner(history: ErgoHistory) => + this.history = history + if(readProgress()._2 != 0) + run() + case StartScan() => + run() + } + + override def preStart(): Unit = { + context.system.eventStream.subscribe(self, classOf[InitializeUTXOSetScanner]) + } + +} + +object UTXOSetScanner { + + case class InitializeUTXOSetScanner(history: ErgoHistory) + + case class StartScan() + + private val downloadedChunksPrefix = Blake2b256.hash("downloaded chunk").drop(4) + + private def chunkIdFromIndex(index: Int): Array[Byte] = { + val idxBytes = Ints.toByteArray(index) + downloadedChunksPrefix ++ idxBytes + } + + private def downloadedChunkIdsIterator(from: Int, to: Int): Iterator[Array[Byte]] = { + Iterator.range(from, to).map(chunkIdFromIndex) + } + + def downloadedChunksIterator(historyStorage: HistoryStorage, from: Int, to: Int): Iterator[BatchAVLProverSubtree[Constants.DigestType]] = { + downloadedChunkIdsIterator(from, to).flatMap { chunkId => + historyStorage + .get(chunkId) + .flatMap(bs => SubtreeSerializer.parseBytesTry(bs).toOption) + } + } + + private val utxoSetScanProgressKey: ByteArrayWrapper = + ByteArrayWrapper(Blake2b256.hash("scanned chunk")) + + def apply()(implicit system: ActorSystem): ActorRef = + system.actorOf(Props.create(classOf[UTXOSetScanner])) +} diff --git a/src/main/scala/org/ergoplatform/nodeView/history/storage/modifierprocessors/UtxoSetSnapshotProcessor.scala b/src/main/scala/org/ergoplatform/nodeView/history/storage/modifierprocessors/UtxoSetSnapshotProcessor.scala index 66f614da64..52dc82b26b 100644 --- a/src/main/scala/org/ergoplatform/nodeView/history/storage/modifierprocessors/UtxoSetSnapshotProcessor.scala +++ b/src/main/scala/org/ergoplatform/nodeView/history/storage/modifierprocessors/UtxoSetSnapshotProcessor.scala @@ -58,7 +58,11 @@ trait UtxoSetSnapshotProcessor extends MinimalFullBlockHeightFunctions with Scor _cachedDownloadPlan.map(_.latestUpdateTime).getOrElse(0L) - _cachedDownloadPlan.map(_.createdTime).getOrElse(0L) } log.info(s"UTXO set downloading and application time: ${utxoPhaseTime / 1000.0} s.") - // remove downloaded utxo set snapshots chunks + // set height of first full block to be downloaded + writeMinimalFullBlockHeight(height + 1) + } + + def removeUtxoSnapshotChunks(): Unit = { val ts0 = System.currentTimeMillis() _cachedDownloadPlan.foreach { plan => val chunkIdsToRemove = downloadedChunkIdsIterator(plan.totalChunks) @@ -70,9 +74,6 @@ trait UtxoSetSnapshotProcessor extends MinimalFullBlockHeightFunctions with Scor _cachedDownloadPlan = None val ts = System.currentTimeMillis() log.info(s"Imported UTXO set snapshots chunks removed in ${ts - ts0} ms") - - // set height of first full block to be downloaded - writeMinimalFullBlockHeight(height + 1) } private def updateUtxoSetSnashotDownloadPlan(plan: UtxoSetSnapshotDownloadPlan): Unit = { diff --git a/src/main/scala/org/ergoplatform/nodeView/wallet/UTXOSetScanner.scala b/src/main/scala/org/ergoplatform/nodeView/wallet/UTXOSetScanner.scala deleted file mode 100644 index 63ef122b3b..0000000000 --- a/src/main/scala/org/ergoplatform/nodeView/wallet/UTXOSetScanner.scala +++ /dev/null @@ -1,61 +0,0 @@ -package org.ergoplatform.nodeView.wallet - -import akka.actor.{Actor, Props} -import org.ergoplatform.ErgoBox -import org.ergoplatform.nodeView.state.UtxoState -import org.ergoplatform.nodeView.wallet.UTXOSetScanner._ -import org.ergoplatform.wallet.boxes.ErgoBoxSerializer -import scorex.crypto.authds.avltree.batch.{InternalProverNode, ProverLeaf, ProverNodes} -import scorex.crypto.hash.Digest32 -import scorex.util.{ScorexLogging, bytesToId} - -import scala.annotation.tailrec -import scala.util.{Failure, Success} - -class UTXOSetScanner extends Actor with ScorexLogging { - - private def scanBox(box: ErgoBox): Unit = { - log.info(s"Scanning box ${bytesToId(box.id)}") - // perform scan - } - - private def run(state: UtxoState): Unit = { - - @tailrec - def walk(rNode: ProverNodes[Digest32], ir: Int): Int = { - rNode match { - case leaf: ProverLeaf[Digest32] => - leafFn(leaf, ir) - - case r: InternalProverNode[Digest32] => - val i = internalNodeFn(r, ir) - walk(i._1, i._2) - } - } - - def internalNodeFn(r: InternalProverNode[Digest32], depth: Int): (ProverNodes[Digest32],Int) = - (r.left, walk(r.right, depth + 1)) - - def leafFn(leaf: ProverLeaf[Digest32], depth: Int): Int = { - ErgoBoxSerializer.parseBytesTry(leaf.value) match { - case Success(box) => scanBox(box) - case Failure(e) => log.error(s"Failed to parse box from state, $e") - } - depth + 1 - } - - state.persistentProver.avlProver.treeWalk(internalNodeFn, leafFn, 0) - } - - override def receive: Receive = { - case StartScan(state: UtxoState) => run(state) - } - -} - -object UTXOSetScanner { - - case class StartScan(state: UtxoState) - - def props(): Props = Props(new UTXOSetScanner()) -} From baa3c77e0fb23652b08f5175c4ba99f3a22837f0 Mon Sep 17 00:00:00 2001 From: jellymlg Date: Thu, 2 Nov 2023 23:35:08 +0100 Subject: [PATCH 03/12] [WIP] Rename and smaller refactors --- src/main/scala/org/ergoplatform/ErgoApp.scala | 4 +- .../nodeView/ErgoNodeViewHolder.scala | 9 +- .../nodeView/history/ErgoHistory.scala | 2 +- ...canner.scala => UTXOSnapshotScanner.scala} | 29 +++-- .../nodeView/wallet/WalletScanLogic.scala | 108 +++++++++--------- 5 files changed, 82 insertions(+), 70 deletions(-) rename src/main/scala/org/ergoplatform/nodeView/history/{UTXOSetScanner.scala => UTXOSnapshotScanner.scala} (78%) diff --git a/src/main/scala/org/ergoplatform/ErgoApp.scala b/src/main/scala/org/ergoplatform/ErgoApp.scala index b546d265e5..afa7707ad1 100644 --- a/src/main/scala/org/ergoplatform/ErgoApp.scala +++ b/src/main/scala/org/ergoplatform/ErgoApp.scala @@ -10,7 +10,7 @@ import org.ergoplatform.local._ import org.ergoplatform.mining.ErgoMiner import org.ergoplatform.mining.ErgoMiner.StartMining import org.ergoplatform.network.{ErgoNodeViewSynchronizer, ErgoSyncTracker} -import org.ergoplatform.nodeView.history.{ErgoSyncInfoMessageSpec, UTXOSetScanner} +import org.ergoplatform.nodeView.history.{ErgoSyncInfoMessageSpec, UTXOSnapshotScanner} import org.ergoplatform.nodeView.history.extra.ExtraIndexer import org.ergoplatform.nodeView.{ErgoNodeViewRef, ErgoReadersHolderRef} import org.ergoplatform.settings.{Args, ErgoSettings, NetworkType} @@ -116,7 +116,7 @@ class ErgoApp(args: Args) extends ScorexLogging { // Create an instance of ExtraIndexer actor (will start if "extraIndex = true" in config) private val indexer: ActorRef = ExtraIndexer(ergoSettings.chainSettings, ergoSettings.cacheSettings) - private val UtxoSetScanner: ActorRef = UTXOSetScanner() + UTXOSnapshotScanner() private val syncTracker = ErgoSyncTracker(scorexSettings.network) diff --git a/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala b/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala index d09d836384..0044358946 100644 --- a/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala +++ b/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala @@ -28,7 +28,7 @@ import spire.syntax.all.cfor import java.io.File import org.ergoplatform.modifiers.history.extension.Extension -import org.ergoplatform.nodeView.history.UTXOSetScanner.{InitializeScan, Start, StartScan} +import org.ergoplatform.nodeView.history.UTXOSnapshotScanner.StartUtxoSetSnapshotScan import scala.annotation.tailrec import scala.collection.mutable @@ -288,7 +288,7 @@ abstract class ErgoNodeViewHolder[State <: ErgoState[State]](settings: ErgoSetti history().createPersistentProver(store, history(), height, blockId) match { case Success(pp) => log.info(s"Restoring state from prover with digest ${pp.digest} reconstructed for height $height") - context.system.eventStream.publish(StartScan()) + context.system.eventStream.publish(StartUtxoSetSnapshotScan()) history().onUtxoSnapshotApplied(height) val newState = new UtxoState(pp, version = VersionTag @@@ blockId, store, settings) updateNodeView(updatedState = Some(newState.asInstanceOf[State])) @@ -511,7 +511,10 @@ abstract class ErgoNodeViewHolder[State <: ErgoState[State]](settings: ErgoSetti v } - if (almostSynced) { + // dont update wallet if we are bootstrapping using UTXO set snapshot + // when UTXOSetScanner finishes scanning the snapshot it will start wallet scan + val usingSnapshot = settings.nodeSettings.utxoSettings.utxoBootstrap + if (almostSynced && !usingSnapshot) { blocksApplied.foreach(newVault.scanPersistent) } diff --git a/src/main/scala/org/ergoplatform/nodeView/history/ErgoHistory.scala b/src/main/scala/org/ergoplatform/nodeView/history/ErgoHistory.scala index 7a45caf548..258497b75e 100644 --- a/src/main/scala/org/ergoplatform/nodeView/history/ErgoHistory.scala +++ b/src/main/scala/org/ergoplatform/nodeView/history/ErgoHistory.scala @@ -12,7 +12,7 @@ import org.ergoplatform.nodeView.history.extra.ExtraIndexer.ReceivableMessages.S import org.ergoplatform.nodeView.history.extra.ExtraIndexer.{IndexedHeightKey, NewestVersion, NewestVersionBytes, SchemaVersionKey, getIndex} import org.ergoplatform.nodeView.history.storage.HistoryStorage import org.ergoplatform.nodeView.history.storage.modifierprocessors._ -import UTXOSetScanner.InitializeUTXOSetScanner +import UTXOSnapshotScanner.InitializeUTXOSetScanner import org.ergoplatform.settings._ import org.ergoplatform.utils.LoggingUtil import scorex.core.consensus.ProgressInfo diff --git a/src/main/scala/org/ergoplatform/nodeView/history/UTXOSetScanner.scala b/src/main/scala/org/ergoplatform/nodeView/history/UTXOSnapshotScanner.scala similarity index 78% rename from src/main/scala/org/ergoplatform/nodeView/history/UTXOSetScanner.scala rename to src/main/scala/org/ergoplatform/nodeView/history/UTXOSnapshotScanner.scala index da1ac59eae..719d036e62 100644 --- a/src/main/scala/org/ergoplatform/nodeView/history/UTXOSetScanner.scala +++ b/src/main/scala/org/ergoplatform/nodeView/history/UTXOSnapshotScanner.scala @@ -4,7 +4,7 @@ import akka.actor.{Actor, ActorRef, ActorSystem, Props} import com.google.common.primitives.Ints import org.ergoplatform.ErgoBox import org.ergoplatform.modifiers.BlockSection -import org.ergoplatform.nodeView.history.UTXOSetScanner._ +import org.ergoplatform.nodeView.history.UTXOSnapshotScanner._ import org.ergoplatform.nodeView.history.storage.HistoryStorage import org.ergoplatform.wallet.boxes.ErgoBoxSerializer import scorex.core.serialization.SubtreeSerializer @@ -18,7 +18,7 @@ import java.nio.ByteBuffer import scala.util.{Failure, Success} -class UTXOSetScanner() extends Actor with ScorexLogging { +class UTXOSnapshotScanner() extends Actor with ScorexLogging { private var history: ErgoHistory = _ private def historyStorage: HistoryStorage = history.historyStorage @@ -38,46 +38,51 @@ class UTXOSetScanner() extends Actor with ScorexLogging { } private def scanBox(box: ErgoBox, current: Int, total: Int): Unit = { - log.info(s"Scanning box ${bytesToId(box.id)} in chunk $current / $total") - // TODO perform scan + //filterWalletOutput(box, Some(box.creationHeight), null, None) + log.info(s"Scanned box ${bytesToId(box.id)} at height ${box.creationHeight} in chunk $current / $total") } private def run(): Unit = { var (current, total) = readProgress() + if(total == 0) return + log.info(s"Starting UTXO set snapshot scan for $total chunks") downloadedChunksIterator(historyStorage, current, total).foreach { subtree => subtree.leafValues.foreach { leaf => ErgoBoxSerializer.parseBytesTry(leaf) match { case Success(box) => scanBox(box, current, total) - case Failure(e) => log.error(s"Failed to parse box from prover leaf: $e") + case Failure(e) => log.error(s"Failed to parse box from snapshot chunk $current / $total: $e") } } current += 1 saveProgress(current, total) } - if(current == total) + if(current == total) { history.removeUtxoSnapshotChunks() + saveProgress(0, 0) + log.info(s"Successfully scanned $total UTXO set snapshot chunks") + } } override def receive: Receive = { case InitializeUTXOSetScanner(history: ErgoHistory) => this.history = history - if(readProgress()._2 != 0) - run() - case StartScan() => + run() + case StartUtxoSetSnapshotScan() => run() } override def preStart(): Unit = { context.system.eventStream.subscribe(self, classOf[InitializeUTXOSetScanner]) + context.system.eventStream.subscribe(self, classOf[StartUtxoSetSnapshotScan]) } } -object UTXOSetScanner { +object UTXOSnapshotScanner { case class InitializeUTXOSetScanner(history: ErgoHistory) - case class StartScan() + case class StartUtxoSetSnapshotScan() private val downloadedChunksPrefix = Blake2b256.hash("downloaded chunk").drop(4) @@ -102,5 +107,5 @@ object UTXOSetScanner { ByteArrayWrapper(Blake2b256.hash("scanned chunk")) def apply()(implicit system: ActorSystem): ActorRef = - system.actorOf(Props.create(classOf[UTXOSetScanner])) + system.actorOf(Props.create(classOf[UTXOSnapshotScanner])) } diff --git a/src/main/scala/org/ergoplatform/nodeView/wallet/WalletScanLogic.scala b/src/main/scala/org/ergoplatform/nodeView/wallet/WalletScanLogic.scala index 68eb204d9c..6719650951 100644 --- a/src/main/scala/org/ergoplatform/nodeView/wallet/WalletScanLogic.scala +++ b/src/main/scala/org/ergoplatform/nodeView/wallet/WalletScanLogic.scala @@ -1,6 +1,7 @@ package org.ergoplatform.nodeView.wallet import com.google.common.hash.BloomFilter +import org.ergoplatform.ErgoBox import org.ergoplatform.modifiers.ErgoFullBlock import org.ergoplatform.modifiers.mempool.ErgoTransaction import org.ergoplatform.nodeView.wallet.IdUtils.{EncodedBoxId, encodedBoxId} @@ -183,79 +184,82 @@ object WalletScanLogic extends ScorexLogging { def extractWalletOutputs(tx: ErgoTransaction, inclusionHeight: Option[Int], walletVars: WalletVars, - dustLimit: Option[Long]): Seq[TrackedBox] = { + dustLimit: Option[Long]): Seq[TrackedBox] = + tx.outputs.flatMap(filterWalletOutput(_, inclusionHeight, walletVars, dustLimit)) + + def filterWalletOutput(box: ErgoBox, + inclusionHeight: Option[Int], + walletVars: WalletVars, + dustLimit: Option[Long]): Option[TrackedBox] = { val trackedBytes: Seq[Array[Byte]] = walletVars.trackedBytes val miningScriptsBytes: Seq[Array[Byte]] = walletVars.miningScriptsBytes val externalScans: Seq[Scan] = walletVars.externalScans - tx.outputs.flatMap { bx => - - // First, we check apps triggered by the tx output - val appsTriggered = - externalScans - .filter(_.trackingRule.filter(bx)) - .map(app => app.scanId -> app.walletInteraction) + // First, we check apps triggered by the tx output + val appsTriggered = + externalScans + .filter(_.trackingRule.filter(box)) + .map(app => app.scanId -> app.walletInteraction) - val boxScript = bx.propositionBytes + val boxScript = box.propositionBytes - // then check whether Bloom filter built on top of payment & mining scripts of the p2pk-wallet - val statuses: Set[ScanId] = if (walletVars.scriptsFilter.mightContain(boxScript)) { + // then check whether Bloom filter built on top of payment & mining scripts of the p2pk-wallet + val statuses: Set[ScanId] = if (walletVars.scriptsFilter.mightContain(boxScript)) { - // first, we are checking mining script - val miningIncomeTriggered = miningScriptsBytes.exists(ms => boxScript.sameElements(ms)) + // first, we are checking mining script + val miningIncomeTriggered = miningScriptsBytes.exists(ms => boxScript.sameElements(ms)) - val prePaymentStatuses = if (miningIncomeTriggered) { - val miningStatus: (ScanId, ScanWalletInteraction.Value) = if (walletVars.settings.miningRewardDelay > 0) { - MiningScanId -> ScanWalletInteraction.Off // scripts are different, so off is kinda overkill - } else { - //tweak for tests - PaymentsScanId -> ScanWalletInteraction.Off - } - appsTriggered :+ miningStatus + val prePaymentStatuses = if (miningIncomeTriggered) { + val miningStatus: (ScanId, ScanWalletInteraction.Value) = if (walletVars.settings.miningRewardDelay > 0) { + MiningScanId -> ScanWalletInteraction.Off // scripts are different, so off is kinda overkill } else { - appsTriggered + //tweak for tests + PaymentsScanId -> ScanWalletInteraction.Off } + appsTriggered :+ miningStatus + } else { + appsTriggered + } - if (prePaymentStatuses.nonEmpty && - !prePaymentStatuses.exists(t => ScanWalletInteraction.interactingWithWallet(t._2))) { - // if other scans intercept the box, and the scans are not sharing the box, - // then the box is not being tracked by the p2pk-wallet - prePaymentStatuses.map(_._1).toSet - } else { - //check whether payment is triggered (Bloom filter has false positives) - val paymentsTriggered = trackedBytes.exists(bs => boxScript.sameElements(bs)) - - val otherIds = prePaymentStatuses.map(_._1).toSet - if (paymentsTriggered) { - Set(PaymentsScanId) ++ otherIds - } else { - otherIds - } - } + if (prePaymentStatuses.nonEmpty && + !prePaymentStatuses.exists(t => ScanWalletInteraction.interactingWithWallet(t._2))) { + // if other scans intercept the box, and the scans are not sharing the box, + // then the box is not being tracked by the p2pk-wallet + prePaymentStatuses.map(_._1).toSet } else { - val appScans = appsTriggered.map(_._1).toSet + //check whether payment is triggered (Bloom filter has false positives) + val paymentsTriggered = trackedBytes.exists(bs => boxScript.sameElements(bs)) - // Add p2pk-wallet if there's a scan enforcing that - if (appsTriggered.exists(_._2 == ScanWalletInteraction.Forced)) { - appScans ++ Set(PaymentsScanId) + val otherIds = prePaymentStatuses.map(_._1).toSet + if (paymentsTriggered) { + Set(PaymentsScanId) ++ otherIds } else { - appScans + otherIds } } + } else { + val appScans = appsTriggered.map(_._1).toSet - if (statuses.nonEmpty) { - if (dustLimit.exists(bx.value <= _)){ - // filter out boxes with value that is considered dust - None - } else { - val tb = TrackedBox(tx.id, bx.index, inclusionHeight, None, None, bx, statuses) - log.debug("New tracked box: " + tb.boxId, " scans: " + tb.scans) - Some(tb) - } + // Add p2pk-wallet if there's a scan enforcing that + if (appsTriggered.exists(_._2 == ScanWalletInteraction.Forced)) { + appScans ++ Set(PaymentsScanId) } else { + appScans + } + } + + if (statuses.nonEmpty) { + if (dustLimit.exists(box.value <= _)) { + // filter out boxes with value that is considered dust None + } else { + val tb = TrackedBox(box.transactionId, box.index, inclusionHeight, None, None, box, statuses) + log.debug("New tracked box: " + tb.boxId, " scans: " + tb.scans) + Some(tb) } + } else { + None } } From ad67eafc2065c1ddf0d13a9e3c997fee96557d65 Mon Sep 17 00:00:00 2001 From: jellymlg Date: Mon, 6 Nov 2023 02:34:41 +0100 Subject: [PATCH 04/12] Utxo snapshot scan fully working --- src/main/scala/org/ergoplatform/ErgoApp.scala | 4 +- .../nodeView/ErgoNodeViewHolder.scala | 21 ++- .../nodeView/history/ErgoHistory.scala | 14 +- .../history/UTXOSnapshotScanner.scala | 111 ------------ .../history/UtxoSnapshotScanner.scala | 163 ++++++++++++++++++ .../nodeView/wallet/ErgoWallet.scala | 48 +++++- .../nodeView/wallet/ErgoWalletActor.scala | 23 ++- .../nodeView/wallet/ErgoWalletReader.scala | 2 +- .../nodeView/wallet/ErgoWalletService.scala | 26 +++ .../nodeView/wallet/ErgoWalletState.scala | 4 +- .../nodeView/wallet/WalletScanLogic.scala | 51 +++++- .../wallet/persistence/WalletRegistry.scala | 3 +- 12 files changed, 327 insertions(+), 143 deletions(-) delete mode 100644 src/main/scala/org/ergoplatform/nodeView/history/UTXOSnapshotScanner.scala create mode 100644 src/main/scala/org/ergoplatform/nodeView/history/UtxoSnapshotScanner.scala diff --git a/src/main/scala/org/ergoplatform/ErgoApp.scala b/src/main/scala/org/ergoplatform/ErgoApp.scala index afa7707ad1..4f156fc5fc 100644 --- a/src/main/scala/org/ergoplatform/ErgoApp.scala +++ b/src/main/scala/org/ergoplatform/ErgoApp.scala @@ -10,7 +10,7 @@ import org.ergoplatform.local._ import org.ergoplatform.mining.ErgoMiner import org.ergoplatform.mining.ErgoMiner.StartMining import org.ergoplatform.network.{ErgoNodeViewSynchronizer, ErgoSyncTracker} -import org.ergoplatform.nodeView.history.{ErgoSyncInfoMessageSpec, UTXOSnapshotScanner} +import org.ergoplatform.nodeView.history.{ErgoSyncInfoMessageSpec, UtxoSnapshotScanner} import org.ergoplatform.nodeView.history.extra.ExtraIndexer import org.ergoplatform.nodeView.{ErgoNodeViewRef, ErgoReadersHolderRef} import org.ergoplatform.settings.{Args, ErgoSettings, NetworkType} @@ -116,7 +116,7 @@ class ErgoApp(args: Args) extends ScorexLogging { // Create an instance of ExtraIndexer actor (will start if "extraIndex = true" in config) private val indexer: ActorRef = ExtraIndexer(ergoSettings.chainSettings, ergoSettings.cacheSettings) - UTXOSnapshotScanner() + UtxoSnapshotScanner(nodeViewHolderRef) private val syncTracker = ErgoSyncTracker(scorexSettings.network) diff --git a/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala b/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala index 0044358946..8079948ac2 100644 --- a/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala +++ b/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala @@ -2,7 +2,7 @@ package org.ergoplatform.nodeView import akka.actor.SupervisorStrategy.Escalate import akka.actor.{Actor, ActorRef, ActorSystem, OneForOneStrategy, Props} -import org.ergoplatform.ErgoApp +import org.ergoplatform.{ErgoApp, ErgoBox} import org.ergoplatform.ErgoApp.CriticalSystemException import org.ergoplatform.modifiers.history.header.Header import org.ergoplatform.modifiers.mempool.{ErgoTransaction, UnconfirmedTransaction} @@ -28,10 +28,12 @@ import spire.syntax.all.cfor import java.io.File import org.ergoplatform.modifiers.history.extension.Extension -import org.ergoplatform.nodeView.history.UTXOSnapshotScanner.StartUtxoSetSnapshotScan +import org.ergoplatform.nodeView.history.UtxoSnapshotScanner.InitializeUtxoSetScannerWithSnapshot +import org.ergoplatform.nodeView.wallet.ErgoWalletActor.ScanBoxesFromUtxoSnapshot import scala.annotation.tailrec import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer import scala.util.{Failure, Success, Try} /** @@ -288,7 +290,7 @@ abstract class ErgoNodeViewHolder[State <: ErgoState[State]](settings: ErgoSetti history().createPersistentProver(store, history(), height, blockId) match { case Success(pp) => log.info(s"Restoring state from prover with digest ${pp.digest} reconstructed for height $height") - context.system.eventStream.publish(StartUtxoSetSnapshotScan()) + context.system.eventStream.publish(InitializeUtxoSetScannerWithSnapshot()) history().onUtxoSnapshotApplied(height) val newState = new UtxoState(pp, version = VersionTag @@@ blockId, store, settings) updateNodeView(updatedState = Some(newState.asInstanceOf[State])) @@ -511,10 +513,7 @@ abstract class ErgoNodeViewHolder[State <: ErgoState[State]](settings: ErgoSetti v } - // dont update wallet if we are bootstrapping using UTXO set snapshot - // when UTXOSetScanner finishes scanning the snapshot it will start wallet scan - val usingSnapshot = settings.nodeSettings.utxoSettings.utxoBootstrap - if (almostSynced && !usingSnapshot) { + if (almostSynced) { blocksApplied.foreach(newVault.scanPersistent) } @@ -697,6 +696,11 @@ abstract class ErgoNodeViewHolder[State <: ErgoState[State]](settings: ErgoSetti sender() ! healthCheckReply } + private def proxyUtxoSetScan: Receive = { + case ScanBoxesFromUtxoSnapshot(chunks: ArrayBuffer[(ModifierId,Array[ErgoBox])], current: Int, total: Int) => + sender() ! vault().scanUtxoSnapshot(ScanBoxesFromUtxoSnapshot(chunks, current, total)) + } + override def receive: Receive = processRemoteModifiers orElse processLocallyGeneratedModifiers orElse @@ -704,7 +708,8 @@ abstract class ErgoNodeViewHolder[State <: ErgoState[State]](settings: ErgoSetti getCurrentInfo orElse getNodeViewChanges orElse processStateSnapshot orElse - handleHealthCheck orElse { + handleHealthCheck orElse + proxyUtxoSetScan orElse { case a: Any => log.error("Strange input: " + a) } diff --git a/src/main/scala/org/ergoplatform/nodeView/history/ErgoHistory.scala b/src/main/scala/org/ergoplatform/nodeView/history/ErgoHistory.scala index 258497b75e..47cc7050a2 100644 --- a/src/main/scala/org/ergoplatform/nodeView/history/ErgoHistory.scala +++ b/src/main/scala/org/ergoplatform/nodeView/history/ErgoHistory.scala @@ -8,11 +8,11 @@ import org.ergoplatform.mining.AutolykosPowScheme import org.ergoplatform.modifiers.history._ import org.ergoplatform.modifiers.history.header.{Header, PreGenesisHeader} import org.ergoplatform.modifiers.{BlockSection, ErgoFullBlock, NonHeaderBlockSection} +import org.ergoplatform.nodeView.history.UtxoSnapshotScanner.InitializeUtxoSetScannerWithHistory import org.ergoplatform.nodeView.history.extra.ExtraIndexer.ReceivableMessages.StartExtraIndexer import org.ergoplatform.nodeView.history.extra.ExtraIndexer.{IndexedHeightKey, NewestVersion, NewestVersionBytes, SchemaVersionKey, getIndex} import org.ergoplatform.nodeView.history.storage.HistoryStorage import org.ergoplatform.nodeView.history.storage.modifierprocessors._ -import UTXOSnapshotScanner.InitializeUTXOSetScanner import org.ergoplatform.settings._ import org.ergoplatform.utils.LoggingUtil import scorex.core.consensus.ProgressInfo @@ -336,9 +336,17 @@ object ErgoHistory extends ScorexLogging { } log.info("History database read") - if(ergoSettings.nodeSettings.extraIndex) // start extra indexer, if enabled + + // start extra indexer, if enabled + if(ergoSettings.nodeSettings.extraIndex) { context.system.eventStream.publish(StartExtraIndexer(history)) - context.system.eventStream.publish(InitializeUTXOSetScanner(history)) + } + + // set history for snapshot scanner, if bootstrapping by snapshot + if(ergoSettings.nodeSettings.utxoSettings.utxoBootstrap) { + context.system.eventStream.publish(InitializeUtxoSetScannerWithHistory(history)) + } + history } diff --git a/src/main/scala/org/ergoplatform/nodeView/history/UTXOSnapshotScanner.scala b/src/main/scala/org/ergoplatform/nodeView/history/UTXOSnapshotScanner.scala deleted file mode 100644 index 719d036e62..0000000000 --- a/src/main/scala/org/ergoplatform/nodeView/history/UTXOSnapshotScanner.scala +++ /dev/null @@ -1,111 +0,0 @@ -package org.ergoplatform.nodeView.history - -import akka.actor.{Actor, ActorRef, ActorSystem, Props} -import com.google.common.primitives.Ints -import org.ergoplatform.ErgoBox -import org.ergoplatform.modifiers.BlockSection -import org.ergoplatform.nodeView.history.UTXOSnapshotScanner._ -import org.ergoplatform.nodeView.history.storage.HistoryStorage -import org.ergoplatform.wallet.boxes.ErgoBoxSerializer -import scorex.core.serialization.SubtreeSerializer -import scorex.crypto.authds.avltree.batch.Constants -import scorex.crypto.authds.avltree.batch.serialization.BatchAVLProverSubtree -import scorex.crypto.hash.Blake2b256 -import scorex.db.ByteArrayWrapper -import scorex.util.{ScorexLogging, bytesToId} - -import java.nio.ByteBuffer -import scala.util.{Failure, Success} - - -class UTXOSnapshotScanner() extends Actor with ScorexLogging { - - private var history: ErgoHistory = _ - private def historyStorage: HistoryStorage = history.historyStorage - - private def readProgress(): (Int, Int) = - historyStorage.getIndex(utxoSetScanProgressKey).map(ByteBuffer.wrap).map { buffer => - val current = buffer.getInt - val total = buffer.getInt - (current, total) - }.getOrElse((0, 0)) - - private def saveProgress(current: Int, total: Int): Unit = { - val buffer: ByteBuffer = ByteBuffer.allocate(8) - buffer.putInt(current) - buffer.putInt(total) - historyStorage.insert(Array((utxoSetScanProgressKey,buffer.array)), Array.empty[BlockSection]) - } - - private def scanBox(box: ErgoBox, current: Int, total: Int): Unit = { - //filterWalletOutput(box, Some(box.creationHeight), null, None) - log.info(s"Scanned box ${bytesToId(box.id)} at height ${box.creationHeight} in chunk $current / $total") - } - - private def run(): Unit = { - var (current, total) = readProgress() - if(total == 0) return - log.info(s"Starting UTXO set snapshot scan for $total chunks") - downloadedChunksIterator(historyStorage, current, total).foreach { subtree => - subtree.leafValues.foreach { leaf => - ErgoBoxSerializer.parseBytesTry(leaf) match { - case Success(box) => scanBox(box, current, total) - case Failure(e) => log.error(s"Failed to parse box from snapshot chunk $current / $total: $e") - } - } - current += 1 - saveProgress(current, total) - } - if(current == total) { - history.removeUtxoSnapshotChunks() - saveProgress(0, 0) - log.info(s"Successfully scanned $total UTXO set snapshot chunks") - } - } - - override def receive: Receive = { - case InitializeUTXOSetScanner(history: ErgoHistory) => - this.history = history - run() - case StartUtxoSetSnapshotScan() => - run() - } - - override def preStart(): Unit = { - context.system.eventStream.subscribe(self, classOf[InitializeUTXOSetScanner]) - context.system.eventStream.subscribe(self, classOf[StartUtxoSetSnapshotScan]) - } - -} - -object UTXOSnapshotScanner { - - case class InitializeUTXOSetScanner(history: ErgoHistory) - - case class StartUtxoSetSnapshotScan() - - private val downloadedChunksPrefix = Blake2b256.hash("downloaded chunk").drop(4) - - private def chunkIdFromIndex(index: Int): Array[Byte] = { - val idxBytes = Ints.toByteArray(index) - downloadedChunksPrefix ++ idxBytes - } - - private def downloadedChunkIdsIterator(from: Int, to: Int): Iterator[Array[Byte]] = { - Iterator.range(from, to).map(chunkIdFromIndex) - } - - def downloadedChunksIterator(historyStorage: HistoryStorage, from: Int, to: Int): Iterator[BatchAVLProverSubtree[Constants.DigestType]] = { - downloadedChunkIdsIterator(from, to).flatMap { chunkId => - historyStorage - .get(chunkId) - .flatMap(bs => SubtreeSerializer.parseBytesTry(bs).toOption) - } - } - - private val utxoSetScanProgressKey: ByteArrayWrapper = - ByteArrayWrapper(Blake2b256.hash("scanned chunk")) - - def apply()(implicit system: ActorSystem): ActorRef = - system.actorOf(Props.create(classOf[UTXOSnapshotScanner])) -} diff --git a/src/main/scala/org/ergoplatform/nodeView/history/UtxoSnapshotScanner.scala b/src/main/scala/org/ergoplatform/nodeView/history/UtxoSnapshotScanner.scala new file mode 100644 index 0000000000..bf1987fcb2 --- /dev/null +++ b/src/main/scala/org/ergoplatform/nodeView/history/UtxoSnapshotScanner.scala @@ -0,0 +1,163 @@ +package org.ergoplatform.nodeView.history + +import akka.actor.{Actor, ActorRef, ActorSystem, Props} +import akka.pattern.ask +import akka.util.Timeout +import com.google.common.primitives.Ints +import org.ergoplatform.ErgoBox +import org.ergoplatform.modifiers.BlockSection +import org.ergoplatform.nodeView.ErgoNodeViewHolder.ReceivableMessages.GetDataFromCurrentView +import org.ergoplatform.nodeView.history.UtxoSnapshotScanner._ +import org.ergoplatform.nodeView.history.storage.HistoryStorage +import org.ergoplatform.nodeView.state.UtxoState +import org.ergoplatform.nodeView.wallet.ErgoWallet +import org.ergoplatform.nodeView.wallet.ErgoWalletActor.ScanBoxesFromUtxoSnapshot +import org.ergoplatform.wallet.boxes.ErgoBoxSerializer +import scorex.core.serialization.SubtreeSerializer +import scorex.crypto.authds.avltree.batch.Constants +import scorex.crypto.authds.avltree.batch.serialization.BatchAVLProverSubtree +import scorex.crypto.hash.Blake2b256 +import scorex.db.ByteArrayWrapper +import scorex.util.{ModifierId, ScorexLogging, bytesToId} + +import java.nio.ByteBuffer +import java.util.concurrent.TimeUnit +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.Await +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration.Duration + +class UtxoSnapshotScanner(nodeView: ActorRef) extends Actor with ScorexLogging { + + private var history: ErgoHistory = _ + private def historyStorage: HistoryStorage = history.historyStorage + + private implicit val timeout: Timeout = Timeout(10, TimeUnit.SECONDS) + private implicit val duration: Duration = Duration.create(10, TimeUnit.SECONDS) + + private val chunkBuffer: ArrayBuffer[(ModifierId,Array[ErgoBox])] = ArrayBuffer.empty[(ModifierId,Array[ErgoBox])] + + private def readProgress(): (Int, Int) = + historyStorage.getIndex(utxoSetScanProgressKey).map(ByteBuffer.wrap).map { buffer => + val current = buffer.getInt + val total = buffer.getInt + (current, total) + }.getOrElse((0, 0)) + + private def writeProgress(current: Int, total: Int): Unit = { + val buffer: ByteBuffer = ByteBuffer.allocate(8) + buffer.putInt(current) + buffer.putInt(total) + historyStorage.insert(Array((utxoSetScanProgressKey, buffer.array)), Array.empty[BlockSection]) + } + + private def sendBufferToWallet(current: Int, total: Int): Unit = { + Await.result(nodeView ? ScanBoxesFromUtxoSnapshot(chunkBuffer, current, total), duration) + writeProgress(current, total) + chunkBuffer.clear() + } + + private def run(): Unit = { + + var (current, total) = readProgress() + if(total == 0) return + + val initialized = Await.result( + (nodeView ? GetDataFromCurrentView[UtxoState, ErgoWallet](_.vault)) + .mapTo[ErgoWallet] + .flatMap(_.getReader.getWalletStatus) + .map(_.initialized), + duration + ) + if(!initialized) return + + writeProgress(current, total) + + log.info(s"Starting UTXO set snapshot scan for $total chunks") + + downloadedChunksIterator(historyStorage, current, total).foreach { subtree => + current += 1 + + chunkBuffer += (( + bytesToId(subtree.id), + subtree.leafValues.par.flatMap(ErgoBoxSerializer.parseBytesTry(_).toOption).toArray + )) + + if(chunkBuffer.size == 32) { + sendBufferToWallet(current, total) + } + } + + // flush remaining data, if any + if(chunkBuffer.nonEmpty) { + sendBufferToWallet(current, total) + } + + if(current == total) { + log.info(s"Successfully scanned $total UTXO set snapshot chunks") + history.removeUtxoSnapshotChunks() + writeProgress(0, 0) + // start wallet scan with first available block + val firstBlock = history.bestFullBlockAt(history.readMinimalFullBlockHeight()).get + Await.result( + (nodeView ? GetDataFromCurrentView[UtxoState, ErgoWallet](_.vault)) + .mapTo[ErgoWallet].map(_.scanPersistent(firstBlock)), + duration + ) + } + + } + + override def receive: Receive = { + case InitializeUtxoSetScannerWithHistory(history: ErgoHistory) => + this.history = history + case InitializeUtxoSetScannerWithSnapshot() => + val total = history.utxoSetSnapshotDownloadPlan().map(_.totalChunks).getOrElse(0) + writeProgress(0, total) + case StartUtxoSetSnapshotScan() => + run() + } + + override def preStart(): Unit = { + context.system.eventStream.subscribe(self, classOf[InitializeUtxoSetScannerWithHistory]) + context.system.eventStream.subscribe(self, classOf[InitializeUtxoSetScannerWithSnapshot]) + context.system.eventStream.subscribe(self, classOf[StartUtxoSetSnapshotScan]) + } + +} + +object UtxoSnapshotScanner { + + case class InitializeUtxoSetScannerWithHistory(history: ErgoHistory) + + case class InitializeUtxoSetScannerWithSnapshot() + + case class StartUtxoSetSnapshotScan() + + private val downloadedChunksPrefix = Blake2b256.hash("downloaded chunk").drop(4) + + private def chunkIdFromIndex(index: Int): Array[Byte] = { + val idxBytes = Ints.toByteArray(index) + downloadedChunksPrefix ++ idxBytes + } + + private def downloadedChunkIdsIterator(from: Int, to: Int): Iterator[Array[Byte]] = { + Iterator.range(from, to).map(chunkIdFromIndex) + } + + def downloadedChunksIterator(historyStorage: HistoryStorage, + from: Int, + to: Int): Iterator[BatchAVLProverSubtree[Constants.DigestType]] = { + downloadedChunkIdsIterator(from, to).flatMap { chunkId => + historyStorage + .get(chunkId) + .flatMap(bs => SubtreeSerializer.parseBytesTry(bs).toOption) + } + } + + private val utxoSetScanProgressKey: ByteArrayWrapper = + ByteArrayWrapper(Blake2b256.hash("scanned chunk")) + + def apply(nodeView: ActorRef)(implicit system: ActorSystem): ActorRef = + system.actorOf(Props.create(classOf[UtxoSnapshotScanner], nodeView)) +} diff --git a/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWallet.scala b/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWallet.scala index 5971ff24ff..65bd134aeb 100644 --- a/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWallet.scala +++ b/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWallet.scala @@ -1,9 +1,11 @@ package org.ergoplatform.nodeView.wallet import akka.actor.{ActorRef, ActorSystem} +import akka.pattern.ask import org.ergoplatform.modifiers.mempool.ErgoTransaction -import org.ergoplatform.modifiers.{ErgoFullBlock, BlockSection} +import org.ergoplatform.modifiers.{BlockSection, ErgoFullBlock} import org.ergoplatform.nodeView.history.ErgoHistoryReader +import org.ergoplatform.nodeView.history.UtxoSnapshotScanner.StartUtxoSetSnapshotScan import org.ergoplatform.nodeView.state.ErgoState import org.ergoplatform.nodeView.wallet.ErgoWalletActor._ import org.ergoplatform.settings.{ErgoSettings, Parameters} @@ -11,6 +13,9 @@ import org.ergoplatform.wallet.boxes.{ReemissionData, ReplaceCompactCollectBoxSe import scorex.core.VersionTag import scorex.util.ScorexLogging +import java.util.concurrent.TimeUnit +import scala.concurrent.Await +import scala.concurrent.duration.Duration import scala.util.{Failure, Success, Try} class ErgoWallet(historyReader: ErgoHistoryReader, settings: ErgoSettings, parameters: Parameters) @@ -36,6 +41,17 @@ class ErgoWallet(historyReader: ErgoHistoryReader, settings: ErgoSettings, param override val walletActor: ActorRef = ErgoWalletActor(settings, parameters, new ErgoWalletServiceImpl(settings), boxSelector, historyReader) + private val duration: Duration = Duration.create(10, TimeUnit.SECONDS) + + private var isUtxoSnapshotScannerRunning: Boolean = false + private var isUtxoSnapshotScannerStarted: Boolean = false + + def scanUtxoSnapshot(msg: ScanBoxesFromUtxoSnapshot): ErgoWallet = { + isUtxoSnapshotScannerRunning = msg.current < msg.total + Await.result(walletActor ? msg, duration) + this + } + def scanOffchain(tx: ErgoTransaction): ErgoWallet = { walletActor ! ScanOffChain(tx) this @@ -47,12 +63,30 @@ class ErgoWallet(historyReader: ErgoHistoryReader, settings: ErgoSettings, param } def scanPersistent(modifier: BlockSection): ErgoWallet = { - modifier match { - case fb: ErgoFullBlock => - walletActor ! ScanOnChain(fb) - case _ => - log.debug("Not full block in ErgoWallet.scanPersistent, which could be the case only if " + - "state = digest when bootstrapping") + + def isUtxoBootStrapping: Boolean = // height is kept at 0 while the scan is running + settings.nodeSettings.utxoSettings.utxoBootstrap && Await.result(getWalletStatus, duration).height == 0 + val shouldStart: Boolean = !isUtxoSnapshotScannerRunning && !isUtxoSnapshotScannerStarted + + + if(isUtxoSnapshotScannerRunning || // scan already running, dont process blocks + (shouldStart && isUtxoBootStrapping)) { // scan not running, start scanner + + if(!isUtxoSnapshotScannerStarted) { + actorSystem.eventStream.publish(StartUtxoSetSnapshotScan()) + isUtxoSnapshotScannerStarted = true + isUtxoSnapshotScannerRunning = true + } + + }else { + isUtxoSnapshotScannerStarted = true // this prevents getWalletStatus getting called every time + modifier match { + case fb: ErgoFullBlock => + walletActor ! ScanOnChain(fb) + case _ => + log.debug("Not full block in ErgoWallet.scanPersistent, which could be the case only if " + + "state = digest when bootstrapping") + } } this } diff --git a/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletActor.scala b/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletActor.scala index 3c1594e091..f98b4d1f16 100644 --- a/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletActor.scala +++ b/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletActor.scala @@ -27,6 +27,7 @@ import scorex.util.{ModifierId, ScorexLogging} import sigmastate.Values.SigmaBoolean import sigmastate.crypto.DLogProtocol.{DLogProverInput, ProveDlog} +import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ import scala.util.{Failure, Success, Try} @@ -241,9 +242,25 @@ class ErgoWalletActor(settings: ErgoSettings, ) context.become(loadedWallet(newState)) + case ScanBoxesFromUtxoSnapshot(chunks: ArrayBuffer[(ModifierId,Array[ErgoBox])], current: Int, total: Int) => + val newState = chunks.zipWithIndex.foldLeft(state) { case (accState, ((id, boxes), i)) => + val chunk = current - chunks.size + i + 1 + // last scanned chunk sets wallet height to the first available block + ergoWalletService.scanSnapshotChunk(accState, boxes, id, settings.walletSettings.dustLimit) match { + case Failure(ex) => + val errorMsg = s"Failed to scan ${boxes.length} boxes in chunk $chunk / $total: ${ex.getMessage}" + accState.copy(error = Some(errorMsg)) + case Success(updatedState) => + log.info(s"Successfully scanned ${boxes.length} boxes in chunk $chunk / $total") + updatedState + } + } + context.become(loadedWallet(newState)) + sender() ! "ok" + // rescan=true means we serve a user request for rescan from arbitrary height case ScanInThePast(blockHeight, rescan) => - val nextBlockHeight = state.expectedNextBlockHeight(blockHeight, settings.nodeSettings.isFullBlocksPruned) + val nextBlockHeight = state.expectedNextBlockHeight(historyReader.readMinimalFullBlockHeight(), settings.nodeSettings.isFullBlocksPruned) if (nextBlockHeight == blockHeight || rescan) { val newState = historyReader.bestFullBlockAt(blockHeight) match { @@ -273,7 +290,7 @@ class ErgoWalletActor(settings: ErgoSettings, //scan block transactions case ScanOnChain(newBlock) => if (state.secretIsSet(settings.walletSettings.testMnemonic)) { // scan blocks only if wallet is initialized - val nextBlockHeight = state.expectedNextBlockHeight(newBlock.height, settings.nodeSettings.isFullBlocksPruned) + val nextBlockHeight = state.expectedNextBlockHeight(historyReader.readMinimalFullBlockHeight(), settings.nodeSettings.isFullBlocksPruned) if (nextBlockHeight == newBlock.height) { log.info(s"Wallet is going to scan a block ${newBlock.id} on chain at height ${newBlock.height}") val newState = @@ -543,6 +560,8 @@ object ErgoWalletActor extends ScorexLogging { // Publicly available signals for the wallet actor + final case class ScanBoxesFromUtxoSnapshot(chunks: ArrayBuffer[(ModifierId,Array[ErgoBox])], current: Int, total: Int) + /** * Command to scan offchain transaction * diff --git a/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletReader.scala b/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletReader.scala index fd2d38ba76..02e2042a95 100644 --- a/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletReader.scala +++ b/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletReader.scala @@ -30,7 +30,7 @@ trait ErgoWalletReader extends NodeViewComponent { val walletActor: ActorRef - private implicit val timeout: Timeout = Timeout(60, TimeUnit.SECONDS) + protected implicit val timeout: Timeout = Timeout(60, TimeUnit.SECONDS) /** Returns the Future generated mnemonic phrase. * @param pass storage encription password diff --git a/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletService.scala b/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletService.scala index 7ff6ab2d72..dc7b869082 100644 --- a/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletService.scala +++ b/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletService.scala @@ -221,6 +221,16 @@ trait ErgoWalletService { */ def scanBlockUpdate(state: ErgoWalletState, block: ErgoFullBlock, dustLimit: Option[Long]): Try[ErgoWalletState] + /** + * + * @param state + * @param boxes + * @param subtreeId + * @param dustLimit + * @return + */ + def scanSnapshotChunk(state: ErgoWalletState, boxes: Array[ErgoBox], subtreeId: ModifierId, dustLimit: Option[Long]): Try[ErgoWalletState] + /** * Sign a transaction */ @@ -596,6 +606,22 @@ class ErgoWalletServiceImpl(override val ergoSettings: ErgoSettings) extends Erg state.copy(registry = reg, offChainRegistry = offReg, outputsFilter = Some(updatedOutputsFilter)) } + override def scanSnapshotChunk(state: ErgoWalletState, + boxes: Array[ErgoBox], + subtreeId: ModifierId, + dustLimit: Option[Long]): Try[ErgoWalletState] = + WalletScanLogic.scanSnapshotBoxes( + state.registry, + state.offChainRegistry, + state.walletVars, + boxes, + subtreeId, + state.outputsFilter, + dustLimit, + ergoSettings.walletSettings.walletProfile).map { case (reg, offReg, updatedOutputsFilter) => + state.copy(registry = reg, offChainRegistry = offReg, outputsFilter = Some(updatedOutputsFilter)) + } + override def updateUtxoState(state: ErgoWalletState): ErgoWalletState = { (state.mempoolReaderOpt, state.stateReaderOpt) match { case (Some(mr), Some(sr)) => diff --git a/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletState.scala b/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletState.scala index 049893830f..70b2455785 100644 --- a/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletState.scala +++ b/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletState.scala @@ -101,7 +101,7 @@ case class ErgoWalletState( } // expected height of a next block when the wallet is receiving a new block with the height blockHeight - def expectedNextBlockHeight(blockHeight: Height, isFullBlocksPruned: Boolean): Height = { + def expectedNextBlockHeight(minimalFullBlockHeight: Height, isFullBlocksPruned: Boolean): Height = { val walletHeight = getWalletHeight if (!isFullBlocksPruned) { // Node has all the full blocks and applies them sequentially @@ -109,7 +109,7 @@ case class ErgoWalletState( } else { // Node has pruned blockchain if (walletHeight == 0) { - blockHeight // todo: should be height of first non-pruned block + minimalFullBlockHeight // height of first non-pruned block } else { walletHeight + 1 } diff --git a/src/main/scala/org/ergoplatform/nodeView/wallet/WalletScanLogic.scala b/src/main/scala/org/ergoplatform/nodeView/wallet/WalletScanLogic.scala index 6719650951..359e7682fd 100644 --- a/src/main/scala/org/ergoplatform/nodeView/wallet/WalletScanLogic.scala +++ b/src/main/scala/org/ergoplatform/nodeView/wallet/WalletScanLogic.scala @@ -166,7 +166,48 @@ object WalletScanLogic extends ScorexLogging { } // function effects: updating registry and offchainRegistry datasets - registry.updateOnBlock(scanRes, blockId, height) + updateRegistryAndOffchain(registry, offChainRegistry, outputsFilter, scanRes, blockId, height) + } + + def scanSnapshotBoxes(registry: WalletRegistry, + offChainRegistry: OffChainRegistry, + walletVars: WalletVars, + boxes: Array[ErgoBox], + subtreeId: ModifierId, + cachedOutputsFilter: Option[BloomFilter[Array[Byte]]], + dustLimit: Option[Long], + walletProfile: WalletProfile): Try[(WalletRegistry, OffChainRegistry, BloomFilter[Array[Byte]])] = { + + // Take unspent wallet outputs Bloom Filter from cache + // or recreate it from unspent outputs stored in the database + val outputsFilter = cachedOutputsFilter.getOrElse { + val bf = WalletCache.emptyFilter(walletProfile.outputsFilterSize) + registry.allUnspentBoxes().foreach(tb => bf.put(tb.box.id)) + bf + } + + // extract wallet- (and external scans) related outputs + val myOutputs = boxes.flatMap { box => + filterWalletOutput(box, Some(box.creationHeight), walletVars, dustLimit) + } + + // add extracted outputs to the filter + myOutputs.foreach { out => + outputsFilter.put(out.box.id) + } + + val scanRes = ScanResults(myOutputs, Seq.empty, Seq.empty) + + updateRegistryAndOffchain(registry, offChainRegistry, outputsFilter, scanRes, subtreeId, 0) + } + + def updateRegistryAndOffchain(registry: WalletRegistry, + offChainRegistry: OffChainRegistry, + outputsFilter: BloomFilter[Array[Byte]], + scanRes: ScanResults, + versionId: ModifierId, + height: Int): Try[(WalletRegistry, OffChainRegistry, BloomFilter[Array[Byte]])] = + registry.updateOnBlock(scanRes, versionId, height) .map { _ => //data needed to update the offchain-registry val walletUnspent = registry.walletUnspentBoxes() @@ -175,8 +216,6 @@ object WalletScanLogic extends ScorexLogging { (registry, updatedOffchainRegistry, outputsFilter) } - } - /** * Extracts all outputs which contain tracked bytes from the given transaction. @@ -188,9 +227,9 @@ object WalletScanLogic extends ScorexLogging { tx.outputs.flatMap(filterWalletOutput(_, inclusionHeight, walletVars, dustLimit)) def filterWalletOutput(box: ErgoBox, - inclusionHeight: Option[Int], - walletVars: WalletVars, - dustLimit: Option[Long]): Option[TrackedBox] = { + inclusionHeight: Option[Int], + walletVars: WalletVars, + dustLimit: Option[Long]): Option[TrackedBox] = { val trackedBytes: Seq[Array[Byte]] = walletVars.trackedBytes val miningScriptsBytes: Seq[Array[Byte]] = walletVars.miningScriptsBytes diff --git a/src/main/scala/org/ergoplatform/nodeView/wallet/persistence/WalletRegistry.scala b/src/main/scala/org/ergoplatform/nodeView/wallet/persistence/WalletRegistry.scala index 1dc3753b5d..2d63248887 100644 --- a/src/main/scala/org/ergoplatform/nodeView/wallet/persistence/WalletRegistry.scala +++ b/src/main/scala/org/ergoplatform/nodeView/wallet/persistence/WalletRegistry.scala @@ -269,7 +269,8 @@ class WalletRegistry(private val store: LDBVersionedStore)(ws: WalletSettings) e // and update wallet digest updateDigest(bag3) { case WalletDigest(height, wBalance, wTokensSeq) => - if (height + 1 != blockHeight) { + val isUtxoSnapshotScan = height == 0 && blockHeight == 0 + if (height + 1 != blockHeight && !isUtxoSnapshotScan) { log.error(s"Blocks were skipped during wallet scanning, from $height until $blockHeight") } val spentWalletBoxes = spentBoxesWithTx.map(_._2).filter(_.scans.contains(PaymentsScanId)) From 5326239b33734d8069fcf6126526038de1702ef9 Mon Sep 17 00:00:00 2001 From: jellymlg Date: Tue, 7 Nov 2023 02:25:51 +0100 Subject: [PATCH 05/12] [WIP] Refactor to use utxo set --- .../batch/VersionedLDBAVLStorage.scala | 37 +++++++++++ .../nodeView/ErgoNodeViewHolder.scala | 18 ++---- .../history/UtxoSnapshotScanner.scala | 62 +++++-------------- .../UtxoSetSnapshotProcessor.scala | 7 +-- .../nodeView/wallet/ErgoWallet.scala | 19 +----- 5 files changed, 62 insertions(+), 81 deletions(-) diff --git a/avldb/src/main/scala/scorex/crypto/authds/avltree/batch/VersionedLDBAVLStorage.scala b/avldb/src/main/scala/scorex/crypto/authds/avltree/batch/VersionedLDBAVLStorage.scala index 2d541569e7..f3ad158fff 100644 --- a/avldb/src/main/scala/scorex/crypto/authds/avltree/batch/VersionedLDBAVLStorage.scala +++ b/avldb/src/main/scala/scorex/crypto/authds/avltree/batch/VersionedLDBAVLStorage.scala @@ -1,6 +1,7 @@ package scorex.crypto.authds.avltree.batch import com.google.common.primitives.Ints +import scorex.core.serialization.{ManifestSerializer, SubtreeSerializer} import scorex.crypto.authds.avltree.batch.Constants.{DigestType, HashFnType, hashFn} import scorex.crypto.authds.avltree.batch.VersionedLDBAVLStorage.{topNodeHashKey, topNodeHeightKey} import scorex.crypto.authds.avltree.batch.serialization.{BatchAVLProverManifest, BatchAVLProverSubtree, ProxyInternalNode} @@ -153,6 +154,42 @@ class VersionedLDBAVLStorage(store: LDBVersionedStore) rootNodeLabel } } + + def iterateAVLTree(handleSubtree: BatchAVLProverSubtree[DigestType] => Unit): Unit = + store.processSnapshot { dbReader => + + def subtreeLoop(label: Array[Byte], builder: mutable.ArrayBuilder[Byte]): Unit = { + val nodeBytes = dbReader.get(label) + builder ++= nodeBytes + VersionedLDBAVLStorage.noStoreSerializer.parseBytes(nodeBytes) match { + case in: ProxyInternalNode[DigestType] => + subtreeLoop(Digest32 @@@ in.leftLabel, builder) + subtreeLoop(Digest32 @@@ in.rightLabel, builder) + case _ => + } + } + + def processSubtree(label: Array[Byte]): BatchAVLProverSubtree[DigestType] = { + val builder = mutable.ArrayBuilder.make[Byte]() + builder.sizeHint(200000) + subtreeLoop(label, builder) + SubtreeSerializer.parseBytes(builder.result()) + } + + def proxyLoop(label: Array[Byte], level: Int): Unit = + VersionedLDBAVLStorage.noStoreSerializer.parseBytes(dbReader.get(label)) match { + case in: ProxyInternalNode[DigestType] if level == ManifestSerializer.MainnetManifestDepth => + handleSubtree(processSubtree(in.leftLabel)) + handleSubtree(processSubtree(in.rightLabel)) + case in: ProxyInternalNode[DigestType] => + proxyLoop(in.leftLabel, level + 1) + proxyLoop(in.rightLabel, level + 1) + case _ => + } + + proxyLoop(dbReader.get(topNodeHashKey), 1) + + } } diff --git a/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala b/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala index 8079948ac2..8d6995c2d8 100644 --- a/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala +++ b/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala @@ -2,7 +2,7 @@ package org.ergoplatform.nodeView import akka.actor.SupervisorStrategy.Escalate import akka.actor.{Actor, ActorRef, ActorSystem, OneForOneStrategy, Props} -import org.ergoplatform.{ErgoApp, ErgoBox} +import org.ergoplatform.ErgoApp import org.ergoplatform.ErgoApp.CriticalSystemException import org.ergoplatform.modifiers.history.header.Header import org.ergoplatform.modifiers.mempool.{ErgoTransaction, UnconfirmedTransaction} @@ -28,12 +28,10 @@ import spire.syntax.all.cfor import java.io.File import org.ergoplatform.modifiers.history.extension.Extension -import org.ergoplatform.nodeView.history.UtxoSnapshotScanner.InitializeUtxoSetScannerWithSnapshot -import org.ergoplatform.nodeView.wallet.ErgoWalletActor.ScanBoxesFromUtxoSnapshot +import org.ergoplatform.nodeView.history.UtxoSnapshotScanner.StartUtxoSetSnapshotScan import scala.annotation.tailrec import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer import scala.util.{Failure, Success, Try} /** @@ -290,7 +288,6 @@ abstract class ErgoNodeViewHolder[State <: ErgoState[State]](settings: ErgoSetti history().createPersistentProver(store, history(), height, blockId) match { case Success(pp) => log.info(s"Restoring state from prover with digest ${pp.digest} reconstructed for height $height") - context.system.eventStream.publish(InitializeUtxoSetScannerWithSnapshot()) history().onUtxoSnapshotApplied(height) val newState = new UtxoState(pp, version = VersionTag @@@ blockId, store, settings) updateNodeView(updatedState = Some(newState.asInstanceOf[State])) @@ -524,6 +521,9 @@ abstract class ErgoNodeViewHolder[State <: ErgoState[State]](settings: ErgoSetti case utxoStateReader: UtxoStateReader if headersHeight == fullBlockHeight => val recheckCommand = RecheckMempool(utxoStateReader, newMemPool) context.system.eventStream.publish(recheckCommand) + if(settings.nodeSettings.utxoSettings.utxoBootstrap) { + context.system.eventStream.publish(StartUtxoSetSnapshotScan()) + } case _ => } @@ -696,11 +696,6 @@ abstract class ErgoNodeViewHolder[State <: ErgoState[State]](settings: ErgoSetti sender() ! healthCheckReply } - private def proxyUtxoSetScan: Receive = { - case ScanBoxesFromUtxoSnapshot(chunks: ArrayBuffer[(ModifierId,Array[ErgoBox])], current: Int, total: Int) => - sender() ! vault().scanUtxoSnapshot(ScanBoxesFromUtxoSnapshot(chunks, current, total)) - } - override def receive: Receive = processRemoteModifiers orElse processLocallyGeneratedModifiers orElse @@ -708,8 +703,7 @@ abstract class ErgoNodeViewHolder[State <: ErgoState[State]](settings: ErgoSetti getCurrentInfo orElse getNodeViewChanges orElse processStateSnapshot orElse - handleHealthCheck orElse - proxyUtxoSetScan orElse { + handleHealthCheck orElse { case a: Any => log.error("Strange input: " + a) } diff --git a/src/main/scala/org/ergoplatform/nodeView/history/UtxoSnapshotScanner.scala b/src/main/scala/org/ergoplatform/nodeView/history/UtxoSnapshotScanner.scala index bf1987fcb2..a44549b365 100644 --- a/src/main/scala/org/ergoplatform/nodeView/history/UtxoSnapshotScanner.scala +++ b/src/main/scala/org/ergoplatform/nodeView/history/UtxoSnapshotScanner.scala @@ -3,7 +3,6 @@ package org.ergoplatform.nodeView.history import akka.actor.{Actor, ActorRef, ActorSystem, Props} import akka.pattern.ask import akka.util.Timeout -import com.google.common.primitives.Ints import org.ergoplatform.ErgoBox import org.ergoplatform.modifiers.BlockSection import org.ergoplatform.nodeView.ErgoNodeViewHolder.ReceivableMessages.GetDataFromCurrentView @@ -13,9 +12,8 @@ import org.ergoplatform.nodeView.state.UtxoState import org.ergoplatform.nodeView.wallet.ErgoWallet import org.ergoplatform.nodeView.wallet.ErgoWalletActor.ScanBoxesFromUtxoSnapshot import org.ergoplatform.wallet.boxes.ErgoBoxSerializer -import scorex.core.serialization.SubtreeSerializer -import scorex.crypto.authds.avltree.batch.Constants -import scorex.crypto.authds.avltree.batch.serialization.BatchAVLProverSubtree +import scorex.core.serialization.ManifestSerializer +import scorex.crypto.authds.avltree.batch.VersionedLDBAVLStorage import scorex.crypto.hash.Blake2b256 import scorex.db.ByteArrayWrapper import scorex.util.{ModifierId, ScorexLogging, bytesToId} @@ -51,8 +49,8 @@ class UtxoSnapshotScanner(nodeView: ActorRef) extends Actor with ScorexLogging { historyStorage.insert(Array((utxoSetScanProgressKey, buffer.array)), Array.empty[BlockSection]) } - private def sendBufferToWallet(current: Int, total: Int): Unit = { - Await.result(nodeView ? ScanBoxesFromUtxoSnapshot(chunkBuffer, current, total), duration) + private def sendBufferToWallet(wallet: ErgoWallet, current: Int, total: Int): Unit = { + wallet.scanUtxoSnapshot(ScanBoxesFromUtxoSnapshot(chunkBuffer, current, total)) writeProgress(current, total) chunkBuffer.clear() } @@ -60,22 +58,22 @@ class UtxoSnapshotScanner(nodeView: ActorRef) extends Actor with ScorexLogging { private def run(): Unit = { var (current, total) = readProgress() - if(total == 0) return + if(total == math.pow(2, ManifestSerializer.MainnetManifestDepth)) return - val initialized = Await.result( - (nodeView ? GetDataFromCurrentView[UtxoState, ErgoWallet](_.vault)) - .mapTo[ErgoWallet] - .flatMap(_.getReader.getWalletStatus) - .map(_.initialized), + val (state, wallet) = Await.result( + (nodeView ? GetDataFromCurrentView[UtxoState, (UtxoState, ErgoWallet)](x => (x.state, x.vault))) + .mapTo[(UtxoState, ErgoWallet)], duration ) + + val initialized: Boolean = Await.result(wallet.getWalletStatus.map(_.initialized), duration) if(!initialized) return writeProgress(current, total) log.info(s"Starting UTXO set snapshot scan for $total chunks") - downloadedChunksIterator(historyStorage, current, total).foreach { subtree => + state.persistentProver.storage.asInstanceOf[VersionedLDBAVLStorage].iterateAVLTree { subtree => current += 1 chunkBuffer += (( @@ -84,26 +82,21 @@ class UtxoSnapshotScanner(nodeView: ActorRef) extends Actor with ScorexLogging { )) if(chunkBuffer.size == 32) { - sendBufferToWallet(current, total) + sendBufferToWallet(wallet, current, total) } } // flush remaining data, if any if(chunkBuffer.nonEmpty) { - sendBufferToWallet(current, total) + sendBufferToWallet(wallet, current, total) } if(current == total) { log.info(s"Successfully scanned $total UTXO set snapshot chunks") - history.removeUtxoSnapshotChunks() writeProgress(0, 0) // start wallet scan with first available block val firstBlock = history.bestFullBlockAt(history.readMinimalFullBlockHeight()).get - Await.result( - (nodeView ? GetDataFromCurrentView[UtxoState, ErgoWallet](_.vault)) - .mapTo[ErgoWallet].map(_.scanPersistent(firstBlock)), - duration - ) + wallet.scanPersistent(firstBlock) } } @@ -111,16 +104,12 @@ class UtxoSnapshotScanner(nodeView: ActorRef) extends Actor with ScorexLogging { override def receive: Receive = { case InitializeUtxoSetScannerWithHistory(history: ErgoHistory) => this.history = history - case InitializeUtxoSetScannerWithSnapshot() => - val total = history.utxoSetSnapshotDownloadPlan().map(_.totalChunks).getOrElse(0) - writeProgress(0, total) case StartUtxoSetSnapshotScan() => run() } override def preStart(): Unit = { context.system.eventStream.subscribe(self, classOf[InitializeUtxoSetScannerWithHistory]) - context.system.eventStream.subscribe(self, classOf[InitializeUtxoSetScannerWithSnapshot]) context.system.eventStream.subscribe(self, classOf[StartUtxoSetSnapshotScan]) } @@ -130,31 +119,8 @@ object UtxoSnapshotScanner { case class InitializeUtxoSetScannerWithHistory(history: ErgoHistory) - case class InitializeUtxoSetScannerWithSnapshot() - case class StartUtxoSetSnapshotScan() - private val downloadedChunksPrefix = Blake2b256.hash("downloaded chunk").drop(4) - - private def chunkIdFromIndex(index: Int): Array[Byte] = { - val idxBytes = Ints.toByteArray(index) - downloadedChunksPrefix ++ idxBytes - } - - private def downloadedChunkIdsIterator(from: Int, to: Int): Iterator[Array[Byte]] = { - Iterator.range(from, to).map(chunkIdFromIndex) - } - - def downloadedChunksIterator(historyStorage: HistoryStorage, - from: Int, - to: Int): Iterator[BatchAVLProverSubtree[Constants.DigestType]] = { - downloadedChunkIdsIterator(from, to).flatMap { chunkId => - historyStorage - .get(chunkId) - .flatMap(bs => SubtreeSerializer.parseBytesTry(bs).toOption) - } - } - private val utxoSetScanProgressKey: ByteArrayWrapper = ByteArrayWrapper(Blake2b256.hash("scanned chunk")) diff --git a/src/main/scala/org/ergoplatform/nodeView/history/storage/modifierprocessors/UtxoSetSnapshotProcessor.scala b/src/main/scala/org/ergoplatform/nodeView/history/storage/modifierprocessors/UtxoSetSnapshotProcessor.scala index 52dc82b26b..a84e64c364 100644 --- a/src/main/scala/org/ergoplatform/nodeView/history/storage/modifierprocessors/UtxoSetSnapshotProcessor.scala +++ b/src/main/scala/org/ergoplatform/nodeView/history/storage/modifierprocessors/UtxoSetSnapshotProcessor.scala @@ -58,11 +58,6 @@ trait UtxoSetSnapshotProcessor extends MinimalFullBlockHeightFunctions with Scor _cachedDownloadPlan.map(_.latestUpdateTime).getOrElse(0L) - _cachedDownloadPlan.map(_.createdTime).getOrElse(0L) } log.info(s"UTXO set downloading and application time: ${utxoPhaseTime / 1000.0} s.") - // set height of first full block to be downloaded - writeMinimalFullBlockHeight(height + 1) - } - - def removeUtxoSnapshotChunks(): Unit = { val ts0 = System.currentTimeMillis() _cachedDownloadPlan.foreach { plan => val chunkIdsToRemove = downloadedChunkIdsIterator(plan.totalChunks) @@ -74,6 +69,8 @@ trait UtxoSetSnapshotProcessor extends MinimalFullBlockHeightFunctions with Scor _cachedDownloadPlan = None val ts = System.currentTimeMillis() log.info(s"Imported UTXO set snapshots chunks removed in ${ts - ts0} ms") + // set height of first full block to be downloaded + writeMinimalFullBlockHeight(height + 1) } private def updateUtxoSetSnashotDownloadPlan(plan: UtxoSetSnapshotDownloadPlan): Unit = { diff --git a/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWallet.scala b/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWallet.scala index 65bd134aeb..128696c96c 100644 --- a/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWallet.scala +++ b/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWallet.scala @@ -5,7 +5,6 @@ import akka.pattern.ask import org.ergoplatform.modifiers.mempool.ErgoTransaction import org.ergoplatform.modifiers.{BlockSection, ErgoFullBlock} import org.ergoplatform.nodeView.history.ErgoHistoryReader -import org.ergoplatform.nodeView.history.UtxoSnapshotScanner.StartUtxoSetSnapshotScan import org.ergoplatform.nodeView.state.ErgoState import org.ergoplatform.nodeView.wallet.ErgoWalletActor._ import org.ergoplatform.settings.{ErgoSettings, Parameters} @@ -44,7 +43,6 @@ class ErgoWallet(historyReader: ErgoHistoryReader, settings: ErgoSettings, param private val duration: Duration = Duration.create(10, TimeUnit.SECONDS) private var isUtxoSnapshotScannerRunning: Boolean = false - private var isUtxoSnapshotScannerStarted: Boolean = false def scanUtxoSnapshot(msg: ScanBoxesFromUtxoSnapshot): ErgoWallet = { isUtxoSnapshotScannerRunning = msg.current < msg.total @@ -64,22 +62,11 @@ class ErgoWallet(historyReader: ErgoHistoryReader, settings: ErgoSettings, param def scanPersistent(modifier: BlockSection): ErgoWallet = { - def isUtxoBootStrapping: Boolean = // height is kept at 0 while the scan is running + def willUtxoSnapshotScannerRun: Boolean = // height is kept at 0 while the scan is running settings.nodeSettings.utxoSettings.utxoBootstrap && Await.result(getWalletStatus, duration).height == 0 - val shouldStart: Boolean = !isUtxoSnapshotScannerRunning && !isUtxoSnapshotScannerStarted - - if(isUtxoSnapshotScannerRunning || // scan already running, dont process blocks - (shouldStart && isUtxoBootStrapping)) { // scan not running, start scanner - - if(!isUtxoSnapshotScannerStarted) { - actorSystem.eventStream.publish(StartUtxoSetSnapshotScan()) - isUtxoSnapshotScannerStarted = true - isUtxoSnapshotScannerRunning = true - } - - }else { - isUtxoSnapshotScannerStarted = true // this prevents getWalletStatus getting called every time + if(!(isUtxoSnapshotScannerRunning || // scan already running, dont process blocks + willUtxoSnapshotScannerRun)) { // scan not running, scanner will start modifier match { case fb: ErgoFullBlock => walletActor ! ScanOnChain(fb) From 791045b81012357508742cb0bd61c6cc67165d11 Mon Sep 17 00:00:00 2001 From: jellymlg Date: Tue, 7 Nov 2023 11:00:55 +0100 Subject: [PATCH 06/12] [WIP] Added rescan functionality, reworked AVL tree iteration --- .../batch/VersionedLDBAVLStorage.scala | 34 ++++++------- src/main/scala/org/ergoplatform/ErgoApp.scala | 4 +- .../nodeView/ErgoNodeViewHolder.scala | 25 ++++++++-- .../nodeView/history/ErgoHistory.scala | 6 +-- ...shotScanner.scala => UtxoSetScanner.scala} | 49 ++++++++++--------- .../nodeView/wallet/ErgoWallet.scala | 22 +++------ .../nodeView/wallet/ErgoWalletActor.scala | 11 +++-- 7 files changed, 82 insertions(+), 69 deletions(-) rename src/main/scala/org/ergoplatform/nodeView/history/{UtxoSnapshotScanner.scala => UtxoSetScanner.scala} (74%) diff --git a/avldb/src/main/scala/scorex/crypto/authds/avltree/batch/VersionedLDBAVLStorage.scala b/avldb/src/main/scala/scorex/crypto/authds/avltree/batch/VersionedLDBAVLStorage.scala index f3ad158fff..b0376487d6 100644 --- a/avldb/src/main/scala/scorex/crypto/authds/avltree/batch/VersionedLDBAVLStorage.scala +++ b/avldb/src/main/scala/scorex/crypto/authds/avltree/batch/VersionedLDBAVLStorage.scala @@ -1,7 +1,7 @@ package scorex.crypto.authds.avltree.batch import com.google.common.primitives.Ints -import scorex.core.serialization.{ManifestSerializer, SubtreeSerializer} +import scorex.core.serialization.ManifestSerializer import scorex.crypto.authds.avltree.batch.Constants.{DigestType, HashFnType, hashFn} import scorex.crypto.authds.avltree.batch.VersionedLDBAVLStorage.{topNodeHashKey, topNodeHeightKey} import scorex.crypto.authds.avltree.batch.serialization.{BatchAVLProverManifest, BatchAVLProverSubtree, ProxyInternalNode} @@ -155,32 +155,28 @@ class VersionedLDBAVLStorage(store: LDBVersionedStore) } } - def iterateAVLTree(handleSubtree: BatchAVLProverSubtree[DigestType] => Unit): Unit = + def iterateAVLTree(fromIndex: Int)(handleSubtree: BatchAVLProverSubtree[DigestType] => Unit): Unit = store.processSnapshot { dbReader => - def subtreeLoop(label: Array[Byte], builder: mutable.ArrayBuilder[Byte]): Unit = { - val nodeBytes = dbReader.get(label) - builder ++= nodeBytes - VersionedLDBAVLStorage.noStoreSerializer.parseBytes(nodeBytes) match { - case in: ProxyInternalNode[DigestType] => - subtreeLoop(Digest32 @@@ in.leftLabel, builder) - subtreeLoop(Digest32 @@@ in.rightLabel, builder) - case _ => - } - } + var current: Int = 0 - def processSubtree(label: Array[Byte]): BatchAVLProverSubtree[DigestType] = { - val builder = mutable.ArrayBuilder.make[Byte]() - builder.sizeHint(200000) - subtreeLoop(label, builder) - SubtreeSerializer.parseBytes(builder.result()) + def subtree(sid: Array[Byte]): BatchAVLProverSubtree[DigestType] = { + def loop(label: Array[Byte]): ProverNodes[DigestType] = + VersionedLDBAVLStorage.noStoreSerializer.parseBytes(dbReader.get(label)) match { + case leaf: ProverLeaf[DigestType] => leaf + case i: ProxyInternalNode[DigestType] => + i.getNew(loop(i.leftLabel), loop(i.rightLabel)) + } + new BatchAVLProverSubtree[DigestType](loop(sid)) } def proxyLoop(label: Array[Byte], level: Int): Unit = VersionedLDBAVLStorage.noStoreSerializer.parseBytes(dbReader.get(label)) match { case in: ProxyInternalNode[DigestType] if level == ManifestSerializer.MainnetManifestDepth => - handleSubtree(processSubtree(in.leftLabel)) - handleSubtree(processSubtree(in.rightLabel)) + if(current >= fromIndex) handleSubtree(subtree(in.leftLabel)) + current += 1 + if(current >= fromIndex) handleSubtree(subtree(in.rightLabel)) + current += 1 case in: ProxyInternalNode[DigestType] => proxyLoop(in.leftLabel, level + 1) proxyLoop(in.rightLabel, level + 1) diff --git a/src/main/scala/org/ergoplatform/ErgoApp.scala b/src/main/scala/org/ergoplatform/ErgoApp.scala index 4f156fc5fc..2a26e42a47 100644 --- a/src/main/scala/org/ergoplatform/ErgoApp.scala +++ b/src/main/scala/org/ergoplatform/ErgoApp.scala @@ -10,7 +10,7 @@ import org.ergoplatform.local._ import org.ergoplatform.mining.ErgoMiner import org.ergoplatform.mining.ErgoMiner.StartMining import org.ergoplatform.network.{ErgoNodeViewSynchronizer, ErgoSyncTracker} -import org.ergoplatform.nodeView.history.{ErgoSyncInfoMessageSpec, UtxoSnapshotScanner} +import org.ergoplatform.nodeView.history.{ErgoSyncInfoMessageSpec, UtxoSetScanner} import org.ergoplatform.nodeView.history.extra.ExtraIndexer import org.ergoplatform.nodeView.{ErgoNodeViewRef, ErgoReadersHolderRef} import org.ergoplatform.settings.{Args, ErgoSettings, NetworkType} @@ -116,7 +116,7 @@ class ErgoApp(args: Args) extends ScorexLogging { // Create an instance of ExtraIndexer actor (will start if "extraIndex = true" in config) private val indexer: ActorRef = ExtraIndexer(ergoSettings.chainSettings, ergoSettings.cacheSettings) - UtxoSnapshotScanner(nodeViewHolderRef) + UtxoSetScanner(nodeViewHolderRef) private val syncTracker = ErgoSyncTracker(scorexSettings.network) diff --git a/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala b/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala index 8d6995c2d8..eff666c19e 100644 --- a/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala +++ b/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala @@ -28,10 +28,13 @@ import spire.syntax.all.cfor import java.io.File import org.ergoplatform.modifiers.history.extension.Extension -import org.ergoplatform.nodeView.history.UtxoSnapshotScanner.StartUtxoSetSnapshotScan +import org.ergoplatform.nodeView.history.UtxoSetScanner.StartUtxoSetScan import scala.annotation.tailrec import scala.collection.mutable +import scala.concurrent.Await +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration.{Duration, SECONDS} import scala.util.{Failure, Success, Try} /** @@ -64,6 +67,11 @@ abstract class ErgoNodeViewHolder[State <: ErgoState[State]](settings: ErgoSetti private val modifiersCache = new ErgoModifiersCache(384) + /** + * Cache wallet height for utxo set snapshot bootstrap + */ + private var cachedWalletHeight: Int = 0 + /** * The main data structure a node software is taking care about, a node view consists * of four elements to be updated atomically: history (log of persistent modifiers), @@ -452,8 +460,17 @@ abstract class ErgoNodeViewHolder[State <: ErgoState[State]](settings: ErgoSetti } } - //todo: update state in async way? + def shouldScanBlocks: Boolean = + if(settings.nodeSettings.utxoSettings.utxoBootstrap) { + if(cachedWalletHeight == 0) { + cachedWalletHeight = Await.result(vault().getWalletStatus.map(_.height), Duration(3, SECONDS)) + } + cachedWalletHeight > 0 + }else { + true + } + //todo: update state in async way? /** * Remote and local persistent modifiers need to be appended to history, applied to state * which also needs to be git propagated to mempool and wallet @@ -510,7 +527,7 @@ abstract class ErgoNodeViewHolder[State <: ErgoState[State]](settings: ErgoSetti v } - if (almostSynced) { + if (almostSynced && shouldScanBlocks) { blocksApplied.foreach(newVault.scanPersistent) } @@ -522,7 +539,7 @@ abstract class ErgoNodeViewHolder[State <: ErgoState[State]](settings: ErgoSetti val recheckCommand = RecheckMempool(utxoStateReader, newMemPool) context.system.eventStream.publish(recheckCommand) if(settings.nodeSettings.utxoSettings.utxoBootstrap) { - context.system.eventStream.publish(StartUtxoSetSnapshotScan()) + context.system.eventStream.publish(StartUtxoSetScan()) } case _ => } diff --git a/src/main/scala/org/ergoplatform/nodeView/history/ErgoHistory.scala b/src/main/scala/org/ergoplatform/nodeView/history/ErgoHistory.scala index 47cc7050a2..b85f9661e2 100644 --- a/src/main/scala/org/ergoplatform/nodeView/history/ErgoHistory.scala +++ b/src/main/scala/org/ergoplatform/nodeView/history/ErgoHistory.scala @@ -8,7 +8,7 @@ import org.ergoplatform.mining.AutolykosPowScheme import org.ergoplatform.modifiers.history._ import org.ergoplatform.modifiers.history.header.{Header, PreGenesisHeader} import org.ergoplatform.modifiers.{BlockSection, ErgoFullBlock, NonHeaderBlockSection} -import org.ergoplatform.nodeView.history.UtxoSnapshotScanner.InitializeUtxoSetScannerWithHistory +import org.ergoplatform.nodeView.history.UtxoSetScanner.StartUtxoSetScanWithHistory import org.ergoplatform.nodeView.history.extra.ExtraIndexer.ReceivableMessages.StartExtraIndexer import org.ergoplatform.nodeView.history.extra.ExtraIndexer.{IndexedHeightKey, NewestVersion, NewestVersionBytes, SchemaVersionKey, getIndex} import org.ergoplatform.nodeView.history.storage.HistoryStorage @@ -342,9 +342,9 @@ object ErgoHistory extends ScorexLogging { context.system.eventStream.publish(StartExtraIndexer(history)) } - // set history for snapshot scanner, if bootstrapping by snapshot + // set history for utxo set scanner, if bootstrapping by snapshot if(ergoSettings.nodeSettings.utxoSettings.utxoBootstrap) { - context.system.eventStream.publish(InitializeUtxoSetScannerWithHistory(history)) + context.system.eventStream.publish(StartUtxoSetScanWithHistory(history)) } history diff --git a/src/main/scala/org/ergoplatform/nodeView/history/UtxoSnapshotScanner.scala b/src/main/scala/org/ergoplatform/nodeView/history/UtxoSetScanner.scala similarity index 74% rename from src/main/scala/org/ergoplatform/nodeView/history/UtxoSnapshotScanner.scala rename to src/main/scala/org/ergoplatform/nodeView/history/UtxoSetScanner.scala index a44549b365..82a35f5e45 100644 --- a/src/main/scala/org/ergoplatform/nodeView/history/UtxoSnapshotScanner.scala +++ b/src/main/scala/org/ergoplatform/nodeView/history/UtxoSetScanner.scala @@ -6,7 +6,7 @@ import akka.util.Timeout import org.ergoplatform.ErgoBox import org.ergoplatform.modifiers.BlockSection import org.ergoplatform.nodeView.ErgoNodeViewHolder.ReceivableMessages.GetDataFromCurrentView -import org.ergoplatform.nodeView.history.UtxoSnapshotScanner._ +import org.ergoplatform.nodeView.history.UtxoSetScanner._ import org.ergoplatform.nodeView.history.storage.HistoryStorage import org.ergoplatform.nodeView.state.UtxoState import org.ergoplatform.nodeView.wallet.ErgoWallet @@ -25,7 +25,7 @@ import scala.concurrent.Await import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration.Duration -class UtxoSnapshotScanner(nodeView: ActorRef) extends Actor with ScorexLogging { +class UtxoSetScanner(nodeView: ActorRef) extends Actor with ScorexLogging { private var history: ErgoHistory = _ private def historyStorage: HistoryStorage = history.historyStorage @@ -42,23 +42,25 @@ class UtxoSnapshotScanner(nodeView: ActorRef) extends Actor with ScorexLogging { (current, total) }.getOrElse((0, 0)) - private def writeProgress(current: Int, total: Int): Unit = { + private def writeProgress(current: Int, total: Int = MainnetTotal): Unit = { val buffer: ByteBuffer = ByteBuffer.allocate(8) buffer.putInt(current) buffer.putInt(total) historyStorage.insert(Array((utxoSetScanProgressKey, buffer.array)), Array.empty[BlockSection]) } - private def sendBufferToWallet(wallet: ErgoWallet, current: Int, total: Int): Unit = { - wallet.scanUtxoSnapshot(ScanBoxesFromUtxoSnapshot(chunkBuffer, current, total)) - writeProgress(current, total) + private def sendBufferToWallet(wallet: ErgoWallet, current: Int): Unit = { + wallet.scanUtxoSnapshot(ScanBoxesFromUtxoSnapshot(chunkBuffer, current, MainnetTotal)) + writeProgress(current) chunkBuffer.clear() } private def run(): Unit = { var (current, total) = readProgress() - if(total == math.pow(2, ManifestSerializer.MainnetManifestDepth)) return + if(total == 0 || // scan should not start yet, still syncing + current == MainnetTotal) // scan already done + return val (state, wallet) = Await.result( (nodeView ? GetDataFromCurrentView[UtxoState, (UtxoState, ErgoWallet)](x => (x.state, x.vault))) @@ -69,11 +71,9 @@ class UtxoSnapshotScanner(nodeView: ActorRef) extends Actor with ScorexLogging { val initialized: Boolean = Await.result(wallet.getWalletStatus.map(_.initialized), duration) if(!initialized) return - writeProgress(current, total) - log.info(s"Starting UTXO set snapshot scan for $total chunks") - state.persistentProver.storage.asInstanceOf[VersionedLDBAVLStorage].iterateAVLTree { subtree => + state.persistentProver.storage.asInstanceOf[VersionedLDBAVLStorage].iterateAVLTree(current) { subtree => current += 1 chunkBuffer += (( @@ -82,48 +82,53 @@ class UtxoSnapshotScanner(nodeView: ActorRef) extends Actor with ScorexLogging { )) if(chunkBuffer.size == 32) { - sendBufferToWallet(wallet, current, total) + sendBufferToWallet(wallet, current) } } // flush remaining data, if any if(chunkBuffer.nonEmpty) { - sendBufferToWallet(wallet, current, total) + sendBufferToWallet(wallet, current) } if(current == total) { - log.info(s"Successfully scanned $total UTXO set snapshot chunks") - writeProgress(0, 0) + log.info(s"Successfully scanned $total Utxo set subtrees") // start wallet scan with first available block val firstBlock = history.bestFullBlockAt(history.readMinimalFullBlockHeight()).get wallet.scanPersistent(firstBlock) + }else { + log.error(s"Inconsistent Utxo set scan state: $current scanned subtrees out of $total") } } override def receive: Receive = { - case InitializeUtxoSetScannerWithHistory(history: ErgoHistory) => + case StartUtxoSetScanWithHistory(history: ErgoHistory) => this.history = history - case StartUtxoSetSnapshotScan() => + run() + case StartUtxoSetScan() => + writeProgress(0) run() } override def preStart(): Unit = { - context.system.eventStream.subscribe(self, classOf[InitializeUtxoSetScannerWithHistory]) - context.system.eventStream.subscribe(self, classOf[StartUtxoSetSnapshotScan]) + context.system.eventStream.subscribe(self, classOf[StartUtxoSetScanWithHistory]) + context.system.eventStream.subscribe(self, classOf[StartUtxoSetScan]) } } -object UtxoSnapshotScanner { +object UtxoSetScanner { + + case class StartUtxoSetScanWithHistory(history: ErgoHistory) - case class InitializeUtxoSetScannerWithHistory(history: ErgoHistory) + case class StartUtxoSetScan() - case class StartUtxoSetSnapshotScan() + private val MainnetTotal: Int = math.pow(2, ManifestSerializer.MainnetManifestDepth).toInt private val utxoSetScanProgressKey: ByteArrayWrapper = ByteArrayWrapper(Blake2b256.hash("scanned chunk")) def apply(nodeView: ActorRef)(implicit system: ActorSystem): ActorRef = - system.actorOf(Props.create(classOf[UtxoSnapshotScanner], nodeView)) + system.actorOf(Props.create(classOf[UtxoSetScanner], nodeView)) } diff --git a/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWallet.scala b/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWallet.scala index 128696c96c..8cfde7754c 100644 --- a/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWallet.scala +++ b/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWallet.scala @@ -42,10 +42,7 @@ class ErgoWallet(historyReader: ErgoHistoryReader, settings: ErgoSettings, param private val duration: Duration = Duration.create(10, TimeUnit.SECONDS) - private var isUtxoSnapshotScannerRunning: Boolean = false - def scanUtxoSnapshot(msg: ScanBoxesFromUtxoSnapshot): ErgoWallet = { - isUtxoSnapshotScannerRunning = msg.current < msg.total Await.result(walletActor ? msg, duration) this } @@ -61,19 +58,12 @@ class ErgoWallet(historyReader: ErgoHistoryReader, settings: ErgoSettings, param } def scanPersistent(modifier: BlockSection): ErgoWallet = { - - def willUtxoSnapshotScannerRun: Boolean = // height is kept at 0 while the scan is running - settings.nodeSettings.utxoSettings.utxoBootstrap && Await.result(getWalletStatus, duration).height == 0 - - if(!(isUtxoSnapshotScannerRunning || // scan already running, dont process blocks - willUtxoSnapshotScannerRun)) { // scan not running, scanner will start - modifier match { - case fb: ErgoFullBlock => - walletActor ! ScanOnChain(fb) - case _ => - log.debug("Not full block in ErgoWallet.scanPersistent, which could be the case only if " + - "state = digest when bootstrapping") - } + modifier match { + case fb: ErgoFullBlock => + walletActor ! ScanOnChain(fb) + case _ => + log.debug("Not full block in ErgoWallet.scanPersistent, which could be the case only if " + + "state = digest when bootstrapping") } this } diff --git a/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletActor.scala b/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletActor.scala index f98b4d1f16..a0795dc5f3 100644 --- a/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletActor.scala +++ b/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletActor.scala @@ -21,6 +21,7 @@ import org.ergoplatform.wallet.boxes.{BoxSelector, ChainStatus} import org.ergoplatform.wallet.interface4j.SecretString import org.ergoplatform.wallet.interpreter.TransactionHintsBag import org.ergoplatform._ +import org.ergoplatform.nodeView.history.UtxoSetScanner.StartUtxoSetScan import scorex.core.VersionTag import scorex.core.utils.ScorexEncoding import scorex.util.{ModifierId, ScorexLogging} @@ -305,7 +306,7 @@ class ErgoWalletActor(settings: ErgoSettings, context.become(loadedWallet(newState)) } else if (nextBlockHeight < newBlock.height) { log.warn(s"Wallet: skipped blocks found starting from $nextBlockHeight, going back to scan them") - self ! ScanInThePast(nextBlockHeight, false) + self ! ScanInThePast(nextBlockHeight, state.rescanInProgress) } else { log.warn(s"Wallet: block in the past reported at ${newBlock.height}, blockId: ${newBlock.id}") } @@ -369,8 +370,12 @@ class ErgoWalletActor(settings: ErgoSettings, ergoWalletService.recreateRegistry(state, settings) match { case Success(newState) => context.become(loadedWallet(newState.copy(rescanInProgress = true))) - val heightToScanFrom = Math.min(newState.fullHeight, fromHeight) - self ! ScanInThePast(heightToScanFrom, rescan = true) + if(settings.nodeSettings.utxoSettings.utxoBootstrap) { + context.system.eventStream.publish(StartUtxoSetScan()) + }else { + val heightToScanFrom = Math.min(newState.fullHeight, fromHeight) + self ! ScanInThePast(heightToScanFrom, rescan = true) + } sender() ! Success(()) case f@Failure(t) => log.error("Error during rescan attempt: ", t) From 8a50056e76268e1f3ffffe63e2e457b46a6f7cf2 Mon Sep 17 00:00:00 2001 From: jellymlg Date: Tue, 7 Nov 2023 13:14:37 +0100 Subject: [PATCH 07/12] Fixed bug that restarted indexer --- .../avltree/batch/VersionedLDBAVLStorage.scala | 14 +++++++------- .../ergoplatform/nodeView/ErgoNodeViewHolder.scala | 4 ++-- .../nodeView/history/UtxoSetScanner.scala | 14 ++++++-------- .../nodeView/wallet/ErgoWalletActor.scala | 2 +- 4 files changed, 16 insertions(+), 18 deletions(-) diff --git a/avldb/src/main/scala/scorex/crypto/authds/avltree/batch/VersionedLDBAVLStorage.scala b/avldb/src/main/scala/scorex/crypto/authds/avltree/batch/VersionedLDBAVLStorage.scala index b0376487d6..6fe75e499c 100644 --- a/avldb/src/main/scala/scorex/crypto/authds/avltree/batch/VersionedLDBAVLStorage.scala +++ b/avldb/src/main/scala/scorex/crypto/authds/avltree/batch/VersionedLDBAVLStorage.scala @@ -1,9 +1,9 @@ package scorex.crypto.authds.avltree.batch import com.google.common.primitives.Ints -import scorex.core.serialization.ManifestSerializer +import scorex.core.serialization.ManifestSerializer.MainnetManifestDepth import scorex.crypto.authds.avltree.batch.Constants.{DigestType, HashFnType, hashFn} -import scorex.crypto.authds.avltree.batch.VersionedLDBAVLStorage.{topNodeHashKey, topNodeHeightKey} +import scorex.crypto.authds.avltree.batch.VersionedLDBAVLStorage.{noStoreSerializer, topNodeHashKey, topNodeHeightKey} import scorex.crypto.authds.avltree.batch.serialization.{BatchAVLProverManifest, BatchAVLProverSubtree, ProxyInternalNode} import scorex.crypto.authds.{ADDigest, ADKey} import scorex.util.encode.Base16 @@ -105,7 +105,7 @@ class VersionedLDBAVLStorage(store: LDBVersionedStore) def subtreeLoop(label: DigestType, builder: mutable.ArrayBuilder[Byte]): Unit = { val nodeBytes = dbReader.get(label) builder ++= nodeBytes - val node = VersionedLDBAVLStorage.noStoreSerializer.parseBytes(nodeBytes) + val node = noStoreSerializer.parseBytes(nodeBytes) node match { case in: ProxyInternalNode[DigestType] => subtreeLoop(Digest32 @@@ in.leftLabel, builder) @@ -124,7 +124,7 @@ class VersionedLDBAVLStorage(store: LDBVersionedStore) def manifestLoop(nodeDbKey: Array[Byte], level: Int, manifestBuilder: mutable.ArrayBuilder[Byte]): Unit = { val nodeBytes = dbReader.get(nodeDbKey) manifestBuilder ++= nodeBytes - val node = VersionedLDBAVLStorage.noStoreSerializer.parseBytes(nodeBytes) + val node = noStoreSerializer.parseBytes(nodeBytes) node match { case in: ProxyInternalNode[DigestType] if level == manifestDepth => dumpSubtree(Digest32 @@@ in.leftLabel) @@ -162,7 +162,7 @@ class VersionedLDBAVLStorage(store: LDBVersionedStore) def subtree(sid: Array[Byte]): BatchAVLProverSubtree[DigestType] = { def loop(label: Array[Byte]): ProverNodes[DigestType] = - VersionedLDBAVLStorage.noStoreSerializer.parseBytes(dbReader.get(label)) match { + (noStoreSerializer.parseBytes(dbReader.get(label)): @unchecked) match { case leaf: ProverLeaf[DigestType] => leaf case i: ProxyInternalNode[DigestType] => i.getNew(loop(i.leftLabel), loop(i.rightLabel)) @@ -171,8 +171,8 @@ class VersionedLDBAVLStorage(store: LDBVersionedStore) } def proxyLoop(label: Array[Byte], level: Int): Unit = - VersionedLDBAVLStorage.noStoreSerializer.parseBytes(dbReader.get(label)) match { - case in: ProxyInternalNode[DigestType] if level == ManifestSerializer.MainnetManifestDepth => + noStoreSerializer.parseBytes(dbReader.get(label)) match { + case in: ProxyInternalNode[DigestType] if level == MainnetManifestDepth => if(current >= fromIndex) handleSubtree(subtree(in.leftLabel)) current += 1 if(current >= fromIndex) handleSubtree(subtree(in.rightLabel)) diff --git a/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala b/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala index eff666c19e..661e9417e0 100644 --- a/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala +++ b/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala @@ -538,8 +538,8 @@ abstract class ErgoNodeViewHolder[State <: ErgoState[State]](settings: ErgoSetti case utxoStateReader: UtxoStateReader if headersHeight == fullBlockHeight => val recheckCommand = RecheckMempool(utxoStateReader, newMemPool) context.system.eventStream.publish(recheckCommand) - if(settings.nodeSettings.utxoSettings.utxoBootstrap) { - context.system.eventStream.publish(StartUtxoSetScan()) + if(!shouldScanBlocks) { + context.system.eventStream.publish(StartUtxoSetScan(false)) } case _ => } diff --git a/src/main/scala/org/ergoplatform/nodeView/history/UtxoSetScanner.scala b/src/main/scala/org/ergoplatform/nodeView/history/UtxoSetScanner.scala index 82a35f5e45..69da536591 100644 --- a/src/main/scala/org/ergoplatform/nodeView/history/UtxoSetScanner.scala +++ b/src/main/scala/org/ergoplatform/nodeView/history/UtxoSetScanner.scala @@ -42,7 +42,7 @@ class UtxoSetScanner(nodeView: ActorRef) extends Actor with ScorexLogging { (current, total) }.getOrElse((0, 0)) - private def writeProgress(current: Int, total: Int = MainnetTotal): Unit = { + private def writeProgress(current: Int, total: Int): Unit = { val buffer: ByteBuffer = ByteBuffer.allocate(8) buffer.putInt(current) buffer.putInt(total) @@ -51,7 +51,7 @@ class UtxoSetScanner(nodeView: ActorRef) extends Actor with ScorexLogging { private def sendBufferToWallet(wallet: ErgoWallet, current: Int): Unit = { wallet.scanUtxoSnapshot(ScanBoxesFromUtxoSnapshot(chunkBuffer, current, MainnetTotal)) - writeProgress(current) + writeProgress(current, MainnetTotal) chunkBuffer.clear() } @@ -93,9 +93,7 @@ class UtxoSetScanner(nodeView: ActorRef) extends Actor with ScorexLogging { if(current == total) { log.info(s"Successfully scanned $total Utxo set subtrees") - // start wallet scan with first available block - val firstBlock = history.bestFullBlockAt(history.readMinimalFullBlockHeight()).get - wallet.scanPersistent(firstBlock) + wallet.scanPersistent(history.bestFullBlockOpt.get) }else { log.error(s"Inconsistent Utxo set scan state: $current scanned subtrees out of $total") } @@ -106,8 +104,8 @@ class UtxoSetScanner(nodeView: ActorRef) extends Actor with ScorexLogging { case StartUtxoSetScanWithHistory(history: ErgoHistory) => this.history = history run() - case StartUtxoSetScan() => - writeProgress(0) + case StartUtxoSetScan(rescan: Boolean) => + if(readProgress()._1 == 0 || rescan) writeProgress(0, MainnetTotal) run() } @@ -122,7 +120,7 @@ object UtxoSetScanner { case class StartUtxoSetScanWithHistory(history: ErgoHistory) - case class StartUtxoSetScan() + case class StartUtxoSetScan(rescan: Boolean) private val MainnetTotal: Int = math.pow(2, ManifestSerializer.MainnetManifestDepth).toInt diff --git a/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletActor.scala b/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletActor.scala index a0795dc5f3..e4cd2d6b93 100644 --- a/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletActor.scala +++ b/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletActor.scala @@ -371,7 +371,7 @@ class ErgoWalletActor(settings: ErgoSettings, case Success(newState) => context.become(loadedWallet(newState.copy(rescanInProgress = true))) if(settings.nodeSettings.utxoSettings.utxoBootstrap) { - context.system.eventStream.publish(StartUtxoSetScan()) + context.system.eventStream.publish(StartUtxoSetScan(true)) }else { val heightToScanFrom = Math.min(newState.fullHeight, fromHeight) self ! ScanInThePast(heightToScanFrom, rescan = true) From 40574e6e2869fc039c69d53c9d82ff152ef63da4 Mon Sep 17 00:00:00 2001 From: jellymlg Date: Wed, 8 Nov 2023 01:24:13 +0100 Subject: [PATCH 08/12] Removed caching --- .../ergoplatform/nodeView/ErgoNodeViewHolder.scala | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala b/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala index 661e9417e0..b9f6840fcb 100644 --- a/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala +++ b/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala @@ -67,11 +67,6 @@ abstract class ErgoNodeViewHolder[State <: ErgoState[State]](settings: ErgoSetti private val modifiersCache = new ErgoModifiersCache(384) - /** - * Cache wallet height for utxo set snapshot bootstrap - */ - private var cachedWalletHeight: Int = 0 - /** * The main data structure a node software is taking care about, a node view consists * of four elements to be updated atomically: history (log of persistent modifiers), @@ -462,10 +457,11 @@ abstract class ErgoNodeViewHolder[State <: ErgoState[State]](settings: ErgoSetti def shouldScanBlocks: Boolean = if(settings.nodeSettings.utxoSettings.utxoBootstrap) { - if(cachedWalletHeight == 0) { - cachedWalletHeight = Await.result(vault().getWalletStatus.map(_.height), Duration(3, SECONDS)) + try { + Await.result(vault().getWalletStatus.map(_.height), Duration(3, SECONDS)) > 0 + }catch { + case _: Throwable => false } - cachedWalletHeight > 0 }else { true } From fb7b86a598737e53de99d0cb43c9fb4f3a1ebc04 Mon Sep 17 00:00:00 2001 From: jellymlg Date: Wed, 8 Nov 2023 10:55:52 +0100 Subject: [PATCH 09/12] Fixed rescan problems --- .../ergoplatform/nodeView/ErgoNodeViewHolder.scala | 6 +++++- .../nodeView/wallet/ErgoWalletActor.scala | 5 ++--- .../nodeView/wallet/ErgoWalletService.scala | 12 ++++++------ 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala b/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala index b9f6840fcb..71269bc8a7 100644 --- a/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala +++ b/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala @@ -455,7 +455,11 @@ abstract class ErgoNodeViewHolder[State <: ErgoState[State]](settings: ErgoSetti } } - def shouldScanBlocks: Boolean = + /** + * Whether to send blocks to wallet to scan. (Utxo scan keeps wallet height at 0) + * @return true if utxoBootstrap is not enabled; if it is enabled, then walletHeight > 0 + */ + private def shouldScanBlocks: Boolean = if(settings.nodeSettings.utxoSettings.utxoBootstrap) { try { Await.result(vault().getWalletStatus.map(_.height), Duration(3, SECONDS)) > 0 diff --git a/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletActor.scala b/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletActor.scala index e4cd2d6b93..bbbdbe41ed 100644 --- a/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletActor.scala +++ b/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletActor.scala @@ -246,7 +246,6 @@ class ErgoWalletActor(settings: ErgoSettings, case ScanBoxesFromUtxoSnapshot(chunks: ArrayBuffer[(ModifierId,Array[ErgoBox])], current: Int, total: Int) => val newState = chunks.zipWithIndex.foldLeft(state) { case (accState, ((id, boxes), i)) => val chunk = current - chunks.size + i + 1 - // last scanned chunk sets wallet height to the first available block ergoWalletService.scanSnapshotChunk(accState, boxes, id, settings.walletSettings.dustLimit) match { case Failure(ex) => val errorMsg = s"Failed to scan ${boxes.length} boxes in chunk $chunk / $total: ${ex.getMessage}" @@ -306,7 +305,7 @@ class ErgoWalletActor(settings: ErgoSettings, context.become(loadedWallet(newState)) } else if (nextBlockHeight < newBlock.height) { log.warn(s"Wallet: skipped blocks found starting from $nextBlockHeight, going back to scan them") - self ! ScanInThePast(nextBlockHeight, state.rescanInProgress) + self ! ScanInThePast(nextBlockHeight, rescan = false) } else { log.warn(s"Wallet: block in the past reported at ${newBlock.height}, blockId: ${newBlock.id}") } @@ -369,10 +368,10 @@ class ErgoWalletActor(settings: ErgoSettings, log.info(s"Rescanning the wallet from height: $fromHeight") ergoWalletService.recreateRegistry(state, settings) match { case Success(newState) => - context.become(loadedWallet(newState.copy(rescanInProgress = true))) if(settings.nodeSettings.utxoSettings.utxoBootstrap) { context.system.eventStream.publish(StartUtxoSetScan(true)) }else { + context.become(loadedWallet(newState.copy(rescanInProgress = true))) val heightToScanFrom = Math.min(newState.fullHeight, fromHeight) self ! ScanInThePast(heightToScanFrom, rescan = true) } diff --git a/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletService.scala b/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletService.scala index dc7b869082..edfad4679f 100644 --- a/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletService.scala +++ b/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletService.scala @@ -222,12 +222,12 @@ trait ErgoWalletService { def scanBlockUpdate(state: ErgoWalletState, block: ErgoFullBlock, dustLimit: Option[Long]): Try[ErgoWalletState] /** - * - * @param state - * @param boxes - * @param subtreeId - * @param dustLimit - * @return + * Scan boxes extracted from Utxo set subtree + * @param state - current wallet state + * @param boxes - box array to scan + * @param subtreeId - id of Utxo set subtree (used instead of blockId as version id) + * @param dustLimit - boxes with value smaller than dustLimit are disregarded in wallet scan logic + * @return new wallet state */ def scanSnapshotChunk(state: ErgoWalletState, boxes: Array[ErgoBox], subtreeId: ModifierId, dustLimit: Option[Long]): Try[ErgoWalletState] From 31e651a5308d5e112f18d866a0532c1fbde88960 Mon Sep 17 00:00:00 2001 From: jellymlg Date: Wed, 8 Nov 2023 18:19:12 +0100 Subject: [PATCH 10/12] Undo changes --- .../storage/modifierprocessors/UtxoSetSnapshotProcessor.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/scala/org/ergoplatform/nodeView/history/storage/modifierprocessors/UtxoSetSnapshotProcessor.scala b/src/main/scala/org/ergoplatform/nodeView/history/storage/modifierprocessors/UtxoSetSnapshotProcessor.scala index a84e64c364..66f614da64 100644 --- a/src/main/scala/org/ergoplatform/nodeView/history/storage/modifierprocessors/UtxoSetSnapshotProcessor.scala +++ b/src/main/scala/org/ergoplatform/nodeView/history/storage/modifierprocessors/UtxoSetSnapshotProcessor.scala @@ -58,6 +58,7 @@ trait UtxoSetSnapshotProcessor extends MinimalFullBlockHeightFunctions with Scor _cachedDownloadPlan.map(_.latestUpdateTime).getOrElse(0L) - _cachedDownloadPlan.map(_.createdTime).getOrElse(0L) } log.info(s"UTXO set downloading and application time: ${utxoPhaseTime / 1000.0} s.") + // remove downloaded utxo set snapshots chunks val ts0 = System.currentTimeMillis() _cachedDownloadPlan.foreach { plan => val chunkIdsToRemove = downloadedChunkIdsIterator(plan.totalChunks) @@ -69,6 +70,7 @@ trait UtxoSetSnapshotProcessor extends MinimalFullBlockHeightFunctions with Scor _cachedDownloadPlan = None val ts = System.currentTimeMillis() log.info(s"Imported UTXO set snapshots chunks removed in ${ts - ts0} ms") + // set height of first full block to be downloaded writeMinimalFullBlockHeight(height + 1) } From 4112fb7071f36b001dea51e1ea5c53e17a14e6c3 Mon Sep 17 00:00:00 2001 From: jellymlg Date: Wed, 6 Dec 2023 00:50:55 +0100 Subject: [PATCH 11/12] Fixed merge problems --- .../authds/avltree/batch/VersionedLDBAVLStorage.scala | 2 +- .../ergoplatform/nodeView/history/UtxoSetScanner.scala | 4 ++-- .../ergoplatform/nodeView/wallet/ErgoWalletActor.scala | 7 ++++--- .../nodeView/wallet/ErgoWalletActorMessages.scala | 9 +++++++++ 4 files changed, 16 insertions(+), 6 deletions(-) diff --git a/avldb/src/main/scala/scorex/crypto/authds/avltree/batch/VersionedLDBAVLStorage.scala b/avldb/src/main/scala/scorex/crypto/authds/avltree/batch/VersionedLDBAVLStorage.scala index 6fe75e499c..eec2e08d5b 100644 --- a/avldb/src/main/scala/scorex/crypto/authds/avltree/batch/VersionedLDBAVLStorage.scala +++ b/avldb/src/main/scala/scorex/crypto/authds/avltree/batch/VersionedLDBAVLStorage.scala @@ -1,7 +1,7 @@ package scorex.crypto.authds.avltree.batch import com.google.common.primitives.Ints -import scorex.core.serialization.ManifestSerializer.MainnetManifestDepth +import org.ergoplatform.serialization.ManifestSerializer.MainnetManifestDepth import scorex.crypto.authds.avltree.batch.Constants.{DigestType, HashFnType, hashFn} import scorex.crypto.authds.avltree.batch.VersionedLDBAVLStorage.{noStoreSerializer, topNodeHashKey, topNodeHeightKey} import scorex.crypto.authds.avltree.batch.serialization.{BatchAVLProverManifest, BatchAVLProverSubtree, ProxyInternalNode} diff --git a/src/main/scala/org/ergoplatform/nodeView/history/UtxoSetScanner.scala b/src/main/scala/org/ergoplatform/nodeView/history/UtxoSetScanner.scala index 69da536591..7c73056965 100644 --- a/src/main/scala/org/ergoplatform/nodeView/history/UtxoSetScanner.scala +++ b/src/main/scala/org/ergoplatform/nodeView/history/UtxoSetScanner.scala @@ -10,9 +10,9 @@ import org.ergoplatform.nodeView.history.UtxoSetScanner._ import org.ergoplatform.nodeView.history.storage.HistoryStorage import org.ergoplatform.nodeView.state.UtxoState import org.ergoplatform.nodeView.wallet.ErgoWallet -import org.ergoplatform.nodeView.wallet.ErgoWalletActor.ScanBoxesFromUtxoSnapshot +import org.ergoplatform.nodeView.wallet.ErgoWalletActorMessages.ScanBoxesFromUtxoSnapshot +import org.ergoplatform.serialization.ManifestSerializer import org.ergoplatform.wallet.boxes.ErgoBoxSerializer -import scorex.core.serialization.ManifestSerializer import scorex.crypto.authds.avltree.batch.VersionedLDBAVLStorage import scorex.crypto.hash.Blake2b256 import scorex.db.ByteArrayWrapper diff --git a/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletActor.scala b/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletActor.scala index 33c71ab0ff..fe356e10f4 100644 --- a/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletActor.scala +++ b/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletActor.scala @@ -17,6 +17,7 @@ import org.ergoplatform.wallet.interface4j.SecretString import org.ergoplatform.nodeView.wallet.ErgoWalletActorMessages._ import org.ergoplatform._ import org.ergoplatform.core.VersionTag +import org.ergoplatform.nodeView.history.UtxoSetScanner.StartUtxoSetScan import org.ergoplatform.utils.ScorexEncoding import scorex.util.ScorexLogging import scala.concurrent.duration._ @@ -230,9 +231,9 @@ class ErgoWalletActor(settings: ErgoSettings, ) context.become(loadedWallet(newState)) - case ScanBoxesFromUtxoSnapshot(chunks: ArrayBuffer[(ModifierId,Array[ErgoBox])], current: Int, total: Int) => - val newState = chunks.zipWithIndex.foldLeft(state) { case (accState, ((id, boxes), i)) => - val chunk = current - chunks.size + i + 1 + case ScanBoxesFromUtxoSnapshot(subtrees, current, total) => + val newState = subtrees.zipWithIndex.foldLeft(state) { case (accState, ((id, boxes), i)) => + val chunk = current - subtrees.size + i + 1 ergoWalletService.scanSnapshotChunk(accState, boxes, id, settings.walletSettings.dustLimit) match { case Failure(ex) => val errorMsg = s"Failed to scan ${boxes.length} boxes in chunk $chunk / $total: ${ex.getMessage}" diff --git a/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletActorMessages.scala b/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletActorMessages.scala index e3292717c7..fb6ea7e1a4 100644 --- a/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletActorMessages.scala +++ b/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletActorMessages.scala @@ -17,6 +17,7 @@ import org.ergoplatform.core.VersionTag import scorex.util.ModifierId import sigmastate.Values.SigmaBoolean import sigmastate.crypto.DLogProtocol.{DLogProverInput, ProveDlog} +import scala.collection.mutable.ArrayBuffer import scala.util.Try object ErgoWalletActorMessages { @@ -33,6 +34,14 @@ object ErgoWalletActorMessages { // Publicly available signals for the wallet actor + /** + * Scan AVL subtrees containing UTXOs + * @param subtrees - AVL subtrees to scan + * @param current - current number of last subtree + * @param total - total number of subtrees + */ + final case class ScanBoxesFromUtxoSnapshot(subtrees: ArrayBuffer[(ModifierId,Array[ErgoBox])], current: Int, total: Int) + /** * Command to scan offchain transaction * From 0527dde8d60f959c73031c86785c0bc3d670bd2c Mon Sep 17 00:00:00 2001 From: jellymlg Date: Wed, 20 Dec 2023 03:35:50 +0100 Subject: [PATCH 12/12] Added ScalaDoc, added basic test for iterateAVLTree --- .../batch/VersionedLDBAVLStorage.scala | 11 ++-- .../VersionedLDBAVLStorageSpecification.scala | 21 +++++++- .../nodeView/history/ErgoHistory.scala | 4 +- .../nodeView/history/UtxoSetScanner.scala | 51 ++++++++++++++++--- .../nodeView/wallet/WalletScanLogic.scala | 3 ++ 5 files changed, 78 insertions(+), 12 deletions(-) diff --git a/avldb/src/main/scala/scorex/crypto/authds/avltree/batch/VersionedLDBAVLStorage.scala b/avldb/src/main/scala/scorex/crypto/authds/avltree/batch/VersionedLDBAVLStorage.scala index eec2e08d5b..5ca683c706 100644 --- a/avldb/src/main/scala/scorex/crypto/authds/avltree/batch/VersionedLDBAVLStorage.scala +++ b/avldb/src/main/scala/scorex/crypto/authds/avltree/batch/VersionedLDBAVLStorage.scala @@ -1,7 +1,6 @@ package scorex.crypto.authds.avltree.batch import com.google.common.primitives.Ints -import org.ergoplatform.serialization.ManifestSerializer.MainnetManifestDepth import scorex.crypto.authds.avltree.batch.Constants.{DigestType, HashFnType, hashFn} import scorex.crypto.authds.avltree.batch.VersionedLDBAVLStorage.{noStoreSerializer, topNodeHashKey, topNodeHeightKey} import scorex.crypto.authds.avltree.batch.serialization.{BatchAVLProverManifest, BatchAVLProverSubtree, ProxyInternalNode} @@ -155,7 +154,13 @@ class VersionedLDBAVLStorage(store: LDBVersionedStore) } } - def iterateAVLTree(fromIndex: Int)(handleSubtree: BatchAVLProverSubtree[DigestType] => Unit): Unit = + /** + * Split the AVL+ tree to 2^depth number of subtrees and process them + * @param fromIndex - only start processing subtrees from this index + * @param depth - depth at whitch to split AVL+ tree to subtrees + * @param handleSubtree - function to process subtree + */ + def iterateAVLTree(fromIndex: Int, depth: Int)(handleSubtree: BatchAVLProverSubtree[DigestType] => Unit): Unit = store.processSnapshot { dbReader => var current: Int = 0 @@ -172,7 +177,7 @@ class VersionedLDBAVLStorage(store: LDBVersionedStore) def proxyLoop(label: Array[Byte], level: Int): Unit = noStoreSerializer.parseBytes(dbReader.get(label)) match { - case in: ProxyInternalNode[DigestType] if level == MainnetManifestDepth => + case in: ProxyInternalNode[DigestType] if level == depth => if(current >= fromIndex) handleSubtree(subtree(in.leftLabel)) current += 1 if(current >= fromIndex) handleSubtree(subtree(in.rightLabel)) diff --git a/avldb/src/test/scala/scorex/crypto/authds/avltree/batch/VersionedLDBAVLStorageSpecification.scala b/avldb/src/test/scala/scorex/crypto/authds/avltree/batch/VersionedLDBAVLStorageSpecification.scala index 810e986b00..57b3420612 100644 --- a/avldb/src/test/scala/scorex/crypto/authds/avltree/batch/VersionedLDBAVLStorageSpecification.scala +++ b/avldb/src/test/scala/scorex/crypto/authds/avltree/batch/VersionedLDBAVLStorageSpecification.scala @@ -12,10 +12,11 @@ import scorex.crypto.authds.{ADDigest, ADKey, ADValue, SerializedAdProof} import scorex.util.encode.Base16 import scorex.crypto.hash.{Blake2b256, Digest32} import scorex.db.{LDBFactory, LDBVersionedStore} -import scorex.util.ByteArrayBuilder +import scorex.util.{ByteArrayBuilder, ModifierId, bytesToId} import scorex.util.serialization.VLQByteBufferWriter import scorex.utils.{Random => RandomBytes} +import scala.collection.mutable.ArrayBuffer import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ import scala.concurrent.{Await, Future} @@ -370,4 +371,22 @@ class VersionedLDBAVLStorageSpecification } } + property("iterate AVL tree") { + val prover = createPersistentProver() + val current = 11 + val depth = 6 + blockchainWorkflowTest(prover) + + val chunkBuffer: ArrayBuffer[(ModifierId,Array[Array[Byte]])] = ArrayBuffer.empty[(ModifierId,Array[Array[Byte]])] + + prover.storage.asInstanceOf[VersionedLDBAVLStorage].iterateAVLTree(current, depth) { subtree => + chunkBuffer += (( + bytesToId(subtree.id), + subtree.leafValues.toArray + )) + } + + chunkBuffer.size shouldBe math.pow(2, depth) - current + } + } diff --git a/src/main/scala/org/ergoplatform/nodeView/history/ErgoHistory.scala b/src/main/scala/org/ergoplatform/nodeView/history/ErgoHistory.scala index b0d2d35db3..e0025f82ad 100644 --- a/src/main/scala/org/ergoplatform/nodeView/history/ErgoHistory.scala +++ b/src/main/scala/org/ergoplatform/nodeView/history/ErgoHistory.scala @@ -8,7 +8,7 @@ import org.ergoplatform.mining.AutolykosPowScheme import org.ergoplatform.modifiers.history._ import org.ergoplatform.modifiers.history.header.{Header, PreGenesisHeader} import org.ergoplatform.modifiers.{BlockSection, ErgoFullBlock, NonHeaderBlockSection} -import org.ergoplatform.nodeView.history.UtxoSetScanner.StartUtxoSetScanWithHistory +import org.ergoplatform.nodeView.history.UtxoSetScanner.InitializeUtxoSetScannerWithHistory import org.ergoplatform.nodeView.history.extra.ExtraIndexer.ReceivableMessages.StartExtraIndexer import org.ergoplatform.nodeView.history.extra.ExtraIndexer.{IndexedHeightKey, NewestVersion, NewestVersionBytes, SchemaVersionKey, getIndex} import org.ergoplatform.nodeView.history.storage.HistoryStorage @@ -304,7 +304,7 @@ object ErgoHistory extends ScorexLogging { // set history for utxo set scanner, if bootstrapping by snapshot if(ergoSettings.nodeSettings.utxoSettings.utxoBootstrap) { - context.system.eventStream.publish(StartUtxoSetScanWithHistory(history)) + context.system.eventStream.publish(InitializeUtxoSetScannerWithHistory(history)) } history diff --git a/src/main/scala/org/ergoplatform/nodeView/history/UtxoSetScanner.scala b/src/main/scala/org/ergoplatform/nodeView/history/UtxoSetScanner.scala index 7c73056965..e3d476c337 100644 --- a/src/main/scala/org/ergoplatform/nodeView/history/UtxoSetScanner.scala +++ b/src/main/scala/org/ergoplatform/nodeView/history/UtxoSetScanner.scala @@ -12,6 +12,7 @@ import org.ergoplatform.nodeView.state.UtxoState import org.ergoplatform.nodeView.wallet.ErgoWallet import org.ergoplatform.nodeView.wallet.ErgoWalletActorMessages.ScanBoxesFromUtxoSnapshot import org.ergoplatform.serialization.ManifestSerializer +import org.ergoplatform.serialization.ManifestSerializer.MainnetManifestDepth import org.ergoplatform.wallet.boxes.ErgoBoxSerializer import scorex.crypto.authds.avltree.batch.VersionedLDBAVLStorage import scorex.crypto.hash.Blake2b256 @@ -25,6 +26,12 @@ import scala.concurrent.Await import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration.Duration +/** + * This class is used to provide the current UTXO set for wallet scans when bootstrapping + * by UTXO set snapshot. This is done by creating a snapshot of the UTXO set, deserializing + * the raw bytes to ErgoBoxes and sending them to the wallet actor in chunks. + * @param nodeView - NodeView actor to get wallet and UTXOs from + */ class UtxoSetScanner(nodeView: ActorRef) extends Actor with ScorexLogging { private var history: ErgoHistory = _ @@ -33,8 +40,15 @@ class UtxoSetScanner(nodeView: ActorRef) extends Actor with ScorexLogging { private implicit val timeout: Timeout = Timeout(10, TimeUnit.SECONDS) private implicit val duration: Duration = Duration.create(10, TimeUnit.SECONDS) + /** + * Internal buffer that holds deserialized AVL subtrees until they are sent to wallet + */ private val chunkBuffer: ArrayBuffer[(ModifierId,Array[ErgoBox])] = ArrayBuffer.empty[(ModifierId,Array[ErgoBox])] + /** + * Reads the current progress of the scanner. + * @return (current segment, total segments) + */ private def readProgress(): (Int, Int) = historyStorage.getIndex(utxoSetScanProgressKey).map(ByteBuffer.wrap).map { buffer => val current = buffer.getInt @@ -42,6 +56,11 @@ class UtxoSetScanner(nodeView: ActorRef) extends Actor with ScorexLogging { (current, total) }.getOrElse((0, 0)) + /** + * Writes progress to db. + * @param current - current retrieved segment + * @param total - total segment count + */ private def writeProgress(current: Int, total: Int): Unit = { val buffer: ByteBuffer = ByteBuffer.allocate(8) buffer.putInt(current) @@ -49,6 +68,11 @@ class UtxoSetScanner(nodeView: ActorRef) extends Actor with ScorexLogging { historyStorage.insert(Array((utxoSetScanProgressKey, buffer.array)), Array.empty[BlockSection]) } + /** + * Send deserialized AVL subtrees to wallet for scanning. + * @param wallet - wallet to send to + * @param current - current retrieved segment + */ private def sendBufferToWallet(wallet: ErgoWallet, current: Int): Unit = { wallet.scanUtxoSnapshot(ScanBoxesFromUtxoSnapshot(chunkBuffer, current, MainnetTotal)) writeProgress(current, MainnetTotal) @@ -69,11 +93,12 @@ class UtxoSetScanner(nodeView: ActorRef) extends Actor with ScorexLogging { ) val initialized: Boolean = Await.result(wallet.getWalletStatus.map(_.initialized), duration) - if(!initialized) return + if(!initialized) // wallet is not initialized + return log.info(s"Starting UTXO set snapshot scan for $total chunks") - state.persistentProver.storage.asInstanceOf[VersionedLDBAVLStorage].iterateAVLTree(current) { subtree => + state.persistentProver.storage.asInstanceOf[VersionedLDBAVLStorage].iterateAVLTree(current, MainnetManifestDepth) { subtree => current += 1 chunkBuffer += (( @@ -93,6 +118,7 @@ class UtxoSetScanner(nodeView: ActorRef) extends Actor with ScorexLogging { if(current == total) { log.info(s"Successfully scanned $total Utxo set subtrees") + // send newest block to wallet, if blocks were applied since scan began it will go back to scan them wallet.scanPersistent(history.bestFullBlockOpt.get) }else { log.error(s"Inconsistent Utxo set scan state: $current scanned subtrees out of $total") @@ -101,16 +127,18 @@ class UtxoSetScanner(nodeView: ActorRef) extends Actor with ScorexLogging { } override def receive: Receive = { - case StartUtxoSetScanWithHistory(history: ErgoHistory) => + case InitializeUtxoSetScannerWithHistory(history: ErgoHistory) => this.history = history run() case StartUtxoSetScan(rescan: Boolean) => - if(readProgress()._1 == 0 || rescan) writeProgress(0, MainnetTotal) + if(readProgress()._1 == 0 || // + rescan) // start over UTXO set scan + writeProgress(0, MainnetTotal) run() } override def preStart(): Unit = { - context.system.eventStream.subscribe(self, classOf[StartUtxoSetScanWithHistory]) + context.system.eventStream.subscribe(self, classOf[InitializeUtxoSetScannerWithHistory]) context.system.eventStream.subscribe(self, classOf[StartUtxoSetScan]) } @@ -118,10 +146,21 @@ class UtxoSetScanner(nodeView: ActorRef) extends Actor with ScorexLogging { object UtxoSetScanner { - case class StartUtxoSetScanWithHistory(history: ErgoHistory) + /** + * Initialize UTXO set scanner with database and try continuing scan if possible + * @param history - database handle + */ + case class InitializeUtxoSetScannerWithHistory(history: ErgoHistory) + /** + * Start scanning UTXO set, or continue if the scan was interrupted, or start over if rescan = true + * @param rescan - whether to start over or just continue scan + */ case class StartUtxoSetScan(rescan: Boolean) + /** + * Number of subtrees to divide AVL tree to + */ private val MainnetTotal: Int = math.pow(2, ManifestSerializer.MainnetManifestDepth).toInt private val utxoSetScanProgressKey: ByteArrayWrapper = diff --git a/src/main/scala/org/ergoplatform/nodeView/wallet/WalletScanLogic.scala b/src/main/scala/org/ergoplatform/nodeView/wallet/WalletScanLogic.scala index 359e7682fd..1c609128bc 100644 --- a/src/main/scala/org/ergoplatform/nodeView/wallet/WalletScanLogic.scala +++ b/src/main/scala/org/ergoplatform/nodeView/wallet/WalletScanLogic.scala @@ -198,6 +198,9 @@ object WalletScanLogic extends ScorexLogging { val scanRes = ScanResults(myOutputs, Seq.empty, Seq.empty) + /** Pass subtreeId as blockId; set height to 0 so when UTXO set scan is finished normal wallet scan + * will start with the first non-pruned block (see [[ErgoWalletState.expectedNextBlockHeight]]) + */ updateRegistryAndOffchain(registry, offChainRegistry, outputsFilter, scanRes, subtreeId, 0) }