Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Utxo set scanner #2072

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package scorex.crypto.authds.avltree.batch

import com.google.common.primitives.Ints
import scorex.crypto.authds.avltree.batch.Constants.{DigestType, HashFnType, hashFn}
import scorex.crypto.authds.avltree.batch.VersionedLDBAVLStorage.{topNodeHashKey, topNodeHeightKey}
import scorex.crypto.authds.avltree.batch.VersionedLDBAVLStorage.{noStoreSerializer, topNodeHashKey, topNodeHeightKey}
import scorex.crypto.authds.avltree.batch.serialization.{BatchAVLProverManifest, BatchAVLProverSubtree, ProxyInternalNode}
import scorex.crypto.authds.{ADDigest, ADKey}
import scorex.util.encode.Base16
Expand Down Expand Up @@ -104,7 +104,7 @@ class VersionedLDBAVLStorage(store: LDBVersionedStore)
def subtreeLoop(label: DigestType, builder: mutable.ArrayBuilder[Byte]): Unit = {
val nodeBytes = dbReader.get(label)
builder ++= nodeBytes
val node = VersionedLDBAVLStorage.noStoreSerializer.parseBytes(nodeBytes)
val node = noStoreSerializer.parseBytes(nodeBytes)
node match {
case in: ProxyInternalNode[DigestType] =>
subtreeLoop(Digest32 @@@ in.leftLabel, builder)
Expand All @@ -123,7 +123,7 @@ class VersionedLDBAVLStorage(store: LDBVersionedStore)
def manifestLoop(nodeDbKey: Array[Byte], level: Int, manifestBuilder: mutable.ArrayBuilder[Byte]): Unit = {
val nodeBytes = dbReader.get(nodeDbKey)
manifestBuilder ++= nodeBytes
val node = VersionedLDBAVLStorage.noStoreSerializer.parseBytes(nodeBytes)
val node = noStoreSerializer.parseBytes(nodeBytes)
node match {
case in: ProxyInternalNode[DigestType] if level == manifestDepth =>
dumpSubtree(Digest32 @@@ in.leftLabel)
Expand Down Expand Up @@ -153,6 +153,44 @@ class VersionedLDBAVLStorage(store: LDBVersionedStore)
rootNodeLabel
}
}

/**
* Split the AVL+ tree to 2^depth number of subtrees and process them
* @param fromIndex - only start processing subtrees from this index
* @param depth - depth at whitch to split AVL+ tree to subtrees
* @param handleSubtree - function to process subtree
*/
def iterateAVLTree(fromIndex: Int, depth: Int)(handleSubtree: BatchAVLProverSubtree[DigestType] => Unit): Unit =
store.processSnapshot { dbReader =>

var current: Int = 0

def subtree(sid: Array[Byte]): BatchAVLProverSubtree[DigestType] = {
def loop(label: Array[Byte]): ProverNodes[DigestType] =
(noStoreSerializer.parseBytes(dbReader.get(label)): @unchecked) match {
case leaf: ProverLeaf[DigestType] => leaf
case i: ProxyInternalNode[DigestType] =>
i.getNew(loop(i.leftLabel), loop(i.rightLabel))
}
new BatchAVLProverSubtree[DigestType](loop(sid))
}

def proxyLoop(label: Array[Byte], level: Int): Unit =
noStoreSerializer.parseBytes(dbReader.get(label)) match {
case in: ProxyInternalNode[DigestType] if level == depth =>
if(current >= fromIndex) handleSubtree(subtree(in.leftLabel))
current += 1
if(current >= fromIndex) handleSubtree(subtree(in.rightLabel))
current += 1
case in: ProxyInternalNode[DigestType] =>
proxyLoop(in.leftLabel, level + 1)
proxyLoop(in.rightLabel, level + 1)
case _ =>
}

proxyLoop(dbReader.get(topNodeHashKey), 1)

}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ import scorex.crypto.authds.{ADDigest, ADKey, ADValue, SerializedAdProof}
import scorex.util.encode.Base16
import scorex.crypto.hash.{Blake2b256, Digest32}
import scorex.db.{LDBFactory, LDBVersionedStore}
import scorex.util.ByteArrayBuilder
import scorex.util.{ByteArrayBuilder, ModifierId, bytesToId}
import scorex.util.serialization.VLQByteBufferWriter
import scorex.utils.{Random => RandomBytes}

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
Expand Down Expand Up @@ -370,4 +371,22 @@ class VersionedLDBAVLStorageSpecification
}
}

property("iterate AVL tree") {
val prover = createPersistentProver()
val current = 11
val depth = 6
blockchainWorkflowTest(prover)

val chunkBuffer: ArrayBuffer[(ModifierId,Array[Array[Byte]])] = ArrayBuffer.empty[(ModifierId,Array[Array[Byte]])]

prover.storage.asInstanceOf[VersionedLDBAVLStorage].iterateAVLTree(current, depth) { subtree =>
chunkBuffer += ((
bytesToId(subtree.id),
subtree.leafValues.toArray
))
}

chunkBuffer.size shouldBe math.pow(2, depth) - current
}

}
4 changes: 3 additions & 1 deletion src/main/scala/org/ergoplatform/ErgoApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import org.ergoplatform.local._
import org.ergoplatform.mining.ErgoMiner
import org.ergoplatform.mining.ErgoMiner.StartMining
import org.ergoplatform.network.{ErgoNodeViewSynchronizer, ErgoSyncTracker}
import org.ergoplatform.nodeView.history.ErgoSyncInfoMessageSpec
import org.ergoplatform.nodeView.history.{ErgoSyncInfoMessageSpec, UtxoSetScanner}
import org.ergoplatform.nodeView.history.extra.ExtraIndexer
import org.ergoplatform.nodeView.{ErgoNodeViewRef, ErgoReadersHolderRef}
import org.ergoplatform.settings.{Args, ErgoSettings, ErgoSettingsReader, NetworkType, ScorexSettings}
Expand Down Expand Up @@ -115,6 +115,8 @@ class ErgoApp(args: Args) extends ScorexLogging {
None
}

UtxoSetScanner(nodeViewHolderRef)

private val syncTracker = ErgoSyncTracker(scorexSettings.network)

private val deliveryTracker: DeliveryTracker = DeliveryTracker.empty(ergoSettings)
Expand Down
25 changes: 23 additions & 2 deletions src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,13 @@ import spire.syntax.all.cfor

import java.io.File
import org.ergoplatform.modifiers.history.extension.Extension
import org.ergoplatform.nodeView.history.UtxoSetScanner.StartUtxoSetScan

import scala.annotation.tailrec
import scala.collection.mutable
import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.{Duration, SECONDS}
import scala.util.{Failure, Success, Try}

/**
Expand Down Expand Up @@ -449,8 +453,22 @@ abstract class ErgoNodeViewHolder[State <: ErgoState[State]](settings: ErgoSetti
}
}

//todo: update state in async way?
/**
* Whether to send blocks to wallet to scan. (Utxo scan keeps wallet height at 0)
* @return true if utxoBootstrap is not enabled; if it is enabled, then walletHeight > 0
*/
private def shouldScanBlocks: Boolean =
if(settings.nodeSettings.utxoSettings.utxoBootstrap) {
try {
Await.result(vault().getWalletStatus.map(_.height), Duration(3, SECONDS)) > 0
}catch {
case _: Throwable => false
}
}else {
true
}

//todo: update state in async way?
/**
* Remote and local persistent modifiers need to be appended to history, applied to state
* which also needs to be git propagated to mempool and wallet
Expand Down Expand Up @@ -507,7 +525,7 @@ abstract class ErgoNodeViewHolder[State <: ErgoState[State]](settings: ErgoSetti
v
}

if (almostSynced) {
if (almostSynced && shouldScanBlocks) {
blocksApplied.foreach(newVault.scanPersistent)
}

Expand All @@ -518,6 +536,9 @@ abstract class ErgoNodeViewHolder[State <: ErgoState[State]](settings: ErgoSetti
case utxoStateReader: UtxoStateReader if headersHeight == fullBlockHeight =>
val recheckCommand = RecheckMempool(utxoStateReader, newMemPool)
context.system.eventStream.publish(recheckCommand)
if(!shouldScanBlocks) {
context.system.eventStream.publish(StartUtxoSetScan(false))
}
case _ =>
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import org.ergoplatform.mining.AutolykosPowScheme
import org.ergoplatform.modifiers.history._
import org.ergoplatform.modifiers.history.header.{Header, PreGenesisHeader}
import org.ergoplatform.modifiers.{BlockSection, ErgoFullBlock, NonHeaderBlockSection}
import org.ergoplatform.nodeView.history.UtxoSetScanner.InitializeUtxoSetScannerWithHistory
import org.ergoplatform.nodeView.history.extra.ExtraIndexer.ReceivableMessages.StartExtraIndexer
import org.ergoplatform.nodeView.history.extra.ExtraIndexer.{IndexedHeightKey, NewestVersion, NewestVersionBytes, SchemaVersionKey, getIndex}
import org.ergoplatform.nodeView.history.storage.HistoryStorage
Expand Down Expand Up @@ -295,8 +296,17 @@ object ErgoHistory extends ScorexLogging {
repairIfNeeded(history)

log.info("History database read")
if(ergoSettings.nodeSettings.extraIndex) // start extra indexer, if enabled

// start extra indexer, if enabled
if(ergoSettings.nodeSettings.extraIndex) {
context.system.eventStream.publish(StartExtraIndexer(history))
}

// set history for utxo set scanner, if bootstrapping by snapshot
if(ergoSettings.nodeSettings.utxoSettings.utxoBootstrap) {
context.system.eventStream.publish(InitializeUtxoSetScannerWithHistory(history))
}

history
}

Expand Down
171 changes: 171 additions & 0 deletions src/main/scala/org/ergoplatform/nodeView/history/UtxoSetScanner.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package org.ergoplatform.nodeView.history

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import org.ergoplatform.ErgoBox
import org.ergoplatform.modifiers.BlockSection
import org.ergoplatform.nodeView.ErgoNodeViewHolder.ReceivableMessages.GetDataFromCurrentView
import org.ergoplatform.nodeView.history.UtxoSetScanner._
import org.ergoplatform.nodeView.history.storage.HistoryStorage
import org.ergoplatform.nodeView.state.UtxoState
import org.ergoplatform.nodeView.wallet.ErgoWallet
import org.ergoplatform.nodeView.wallet.ErgoWalletActorMessages.ScanBoxesFromUtxoSnapshot
import org.ergoplatform.serialization.ManifestSerializer
import org.ergoplatform.serialization.ManifestSerializer.MainnetManifestDepth
import org.ergoplatform.wallet.boxes.ErgoBoxSerializer
import scorex.crypto.authds.avltree.batch.VersionedLDBAVLStorage
import scorex.crypto.hash.Blake2b256
import scorex.db.ByteArrayWrapper
import scorex.util.{ModifierId, ScorexLogging, bytesToId}

import java.nio.ByteBuffer
import java.util.concurrent.TimeUnit
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

/**
* This class is used to provide the current UTXO set for wallet scans when bootstrapping
* by UTXO set snapshot. This is done by creating a snapshot of the UTXO set, deserializing
* the raw bytes to ErgoBoxes and sending them to the wallet actor in chunks.
* @param nodeView - NodeView actor to get wallet and UTXOs from
*/
class UtxoSetScanner(nodeView: ActorRef) extends Actor with ScorexLogging {

private var history: ErgoHistory = _
private def historyStorage: HistoryStorage = history.historyStorage

private implicit val timeout: Timeout = Timeout(10, TimeUnit.SECONDS)
private implicit val duration: Duration = Duration.create(10, TimeUnit.SECONDS)

/**
* Internal buffer that holds deserialized AVL subtrees until they are sent to wallet
*/
private val chunkBuffer: ArrayBuffer[(ModifierId,Array[ErgoBox])] = ArrayBuffer.empty[(ModifierId,Array[ErgoBox])]

/**
* Reads the current progress of the scanner.
* @return (current segment, total segments)
*/
private def readProgress(): (Int, Int) =
historyStorage.getIndex(utxoSetScanProgressKey).map(ByteBuffer.wrap).map { buffer =>
val current = buffer.getInt
val total = buffer.getInt
(current, total)
}.getOrElse((0, 0))

/**
* Writes progress to db.
* @param current - current retrieved segment
* @param total - total segment count
*/
private def writeProgress(current: Int, total: Int): Unit = {
val buffer: ByteBuffer = ByteBuffer.allocate(8)
buffer.putInt(current)
buffer.putInt(total)
historyStorage.insert(Array((utxoSetScanProgressKey, buffer.array)), Array.empty[BlockSection])
}

/**
* Send deserialized AVL subtrees to wallet for scanning.
* @param wallet - wallet to send to
* @param current - current retrieved segment
*/
private def sendBufferToWallet(wallet: ErgoWallet, current: Int): Unit = {
wallet.scanUtxoSnapshot(ScanBoxesFromUtxoSnapshot(chunkBuffer, current, MainnetTotal))
writeProgress(current, MainnetTotal)
chunkBuffer.clear()
}

private def run(): Unit = {

var (current, total) = readProgress()
if(total == 0 || // scan should not start yet, still syncing
current == MainnetTotal) // scan already done
return

val (state, wallet) = Await.result(
(nodeView ? GetDataFromCurrentView[UtxoState, (UtxoState, ErgoWallet)](x => (x.state, x.vault)))
.mapTo[(UtxoState, ErgoWallet)],
duration
)

val initialized: Boolean = Await.result(wallet.getWalletStatus.map(_.initialized), duration)
if(!initialized) // wallet is not initialized
return

log.info(s"Starting UTXO set snapshot scan for $total chunks")

state.persistentProver.storage.asInstanceOf[VersionedLDBAVLStorage].iterateAVLTree(current, MainnetManifestDepth) { subtree =>
current += 1

chunkBuffer += ((
bytesToId(subtree.id),
subtree.leafValues.par.flatMap(ErgoBoxSerializer.parseBytesTry(_).toOption).toArray
))

if(chunkBuffer.size == 32) {
sendBufferToWallet(wallet, current)
}
}

// flush remaining data, if any
if(chunkBuffer.nonEmpty) {
sendBufferToWallet(wallet, current)
}

if(current == total) {
log.info(s"Successfully scanned $total Utxo set subtrees")
// send newest block to wallet, if blocks were applied since scan began it will go back to scan them
wallet.scanPersistent(history.bestFullBlockOpt.get)
}else {
log.error(s"Inconsistent Utxo set scan state: $current scanned subtrees out of $total")
}

}

override def receive: Receive = {
case InitializeUtxoSetScannerWithHistory(history: ErgoHistory) =>
this.history = history
run()
case StartUtxoSetScan(rescan: Boolean) =>
if(readProgress()._1 == 0 || //
rescan) // start over UTXO set scan
writeProgress(0, MainnetTotal)
run()
}

override def preStart(): Unit = {
context.system.eventStream.subscribe(self, classOf[InitializeUtxoSetScannerWithHistory])
context.system.eventStream.subscribe(self, classOf[StartUtxoSetScan])
}

}

object UtxoSetScanner {

/**
* Initialize UTXO set scanner with database and try continuing scan if possible
* @param history - database handle
*/
case class InitializeUtxoSetScannerWithHistory(history: ErgoHistory)

/**
* Start scanning UTXO set, or continue if the scan was interrupted, or start over if rescan = true
* @param rescan - whether to start over or just continue scan
*/
case class StartUtxoSetScan(rescan: Boolean)

/**
* Number of subtrees to divide AVL tree to
*/
private val MainnetTotal: Int = math.pow(2, ManifestSerializer.MainnetManifestDepth).toInt

private val utxoSetScanProgressKey: ByteArrayWrapper =
ByteArrayWrapper(Blake2b256.hash("scanned chunk"))

def apply(nodeView: ActorRef)(implicit system: ActorSystem): ActorRef =
system.actorOf(Props.create(classOf[UtxoSetScanner], nodeView))
}
Loading
Loading