Commit
Merge pull request #2098 from ergoplatform/rocksdb
Migrate to RocksDB
kushti authored Apr 9, 2024
2 parents 5f2ed85 + 9ef6f9d commit 49df825
Showing 53 changed files with 344 additions and 504 deletions.
@@ -4,9 +4,9 @@ import java.util.concurrent.TimeUnit

import org.openjdk.jmh.annotations._
import org.slf4j.LoggerFactory
import scorex.crypto.authds.avltree.batch.{Operation, PersistentBatchAVLProver, VersionedLDBAVLStorage}
import scorex.crypto.authds.avltree.batch.{Operation, PersistentBatchAVLProver, VersionedRocksDBAVLStorage}
import scorex.crypto.hash.{Blake2b256, Digest32}
import scorex.db.LDBVersionedStore
import scorex.db.RocksDBVersionedStore

object AVLTreeBatchPerformance extends {

@@ -20,8 +20,8 @@ object AVLTreeBatchPerformance extends {

val logger = LoggerFactory.getLogger("TEST")
var prover: Prover = _
var store: LDBVersionedStore = _
var storage: VersionedLDBAVLStorage = _
var store: RocksDBVersionedStore = _
var storage: VersionedRocksDBAVLStorage = _
var operations: Array[Operation] = _

@Setup(Level.Iteration)
@@ -4,7 +4,7 @@ import com.google.common.primitives.Longs
import scorex.crypto.authds.avltree.batch._
import scorex.crypto.authds.{ADKey, ADValue}
import scorex.crypto.hash.{Blake2b256, Digest32}
import scorex.db.LDBVersionedStore
import scorex.db.RocksDBVersionedStore
import scorex.utils.Random

object Helper {
@@ -34,11 +34,11 @@ object Helper {
}

def persistentProverWithVersionedStore(initialKeepVersions: Int,
baseOperationsCount: Int = 0): (Prover, LDBVersionedStore, VersionedLDBAVLStorage) = {
baseOperationsCount: Int = 0): (Prover, RocksDBVersionedStore, VersionedRocksDBAVLStorage) = {
val dir = java.nio.file.Files.createTempDirectory("bench_testing_" + scala.util.Random.alphanumeric.take(15)).toFile
dir.deleteOnExit()
val store = new LDBVersionedStore(dir, initialKeepVersions = initialKeepVersions)
val storage = new VersionedLDBAVLStorage(store)
val store = new RocksDBVersionedStore(dir, initialKeepVersions = initialKeepVersions)
val storage = new VersionedRocksDBAVLStorage(store)
require(storage.isEmpty)
val prover = new BatchAVLProver[Digest32, HF](kl, Some(vl))

2 changes: 1 addition & 1 deletion avldb/build.sbt
@@ -26,7 +26,7 @@ libraryDependencies ++= Seq(
"org.scalacheck" %% "scalacheck" % "1.14.3" % "test",
"org.scalatestplus" %% "scalatestplus-scalacheck" % "3.1.0.0-RC2" % Test,
"com.storm-enroute" %% "scalameter" % Versions.scalameter(scalaVersion.value) % "test",
"org.ethereum" % "leveldbjni-all" % "1.18.3",
"org.rocksdb" % "rocksdbjni" % "8.9.1",
"org.typelevel" %% "spire" % Versions.spire(scalaVersion.value)
)

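The dependency change above swaps the LevelDB JNI bindings for RocksDB's. As a rough sketch of what the new binding exposes (not code from this PR; the path and key are made up), a database is opened through org.rocksdb.RocksDB:

    import org.rocksdb.{Options, RocksDB}

    RocksDB.loadLibrary()                                    // load the native library bundled with rocksdbjni
    val options = new Options().setCreateIfMissing(true)     // create the database directory if missing
    val db = RocksDB.open(options, "/tmp/rocksdb-example")   // hypothetical path
    try {
      db.put("key".getBytes("UTF-8"), "value".getBytes("UTF-8"))
      val value: Array[Byte] = db.get("key".getBytes("UTF-8"))
    } finally {
      db.close()
      options.close()
    }
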
@@ -2,14 +2,14 @@ package org.ergoplatform.serialization

import scorex.crypto.authds.avltree.batch.Constants.DigestType
import scorex.crypto.authds.avltree.batch.serialization.{BatchAVLProverManifest, ProxyInternalNode}
import scorex.crypto.authds.avltree.batch.{InternalProverNode, ProverLeaf, ProverNodes, VersionedLDBAVLStorage}
import scorex.crypto.authds.avltree.batch.{InternalProverNode, ProverLeaf, ProverNodes, VersionedRocksDBAVLStorage}
import scorex.util.serialization.{Reader, Writer}

/**
* Serializer of manifest, a tree which is cut at some `manifestDepth` from root
*/
class ManifestSerializer(manifestDepth: Byte) extends ErgoSerializer[BatchAVLProverManifest[DigestType]] {
private val nodeSerializer = VersionedLDBAVLStorage.noStoreSerializer
private val nodeSerializer = VersionedRocksDBAVLStorage.noStoreSerializer

/**
* Serialize manifest provided as top subtree and height separately. Used in tests.
@@ -1,7 +1,7 @@
package org.ergoplatform.serialization

import scorex.crypto.authds.avltree.batch.Constants.DigestType
import scorex.crypto.authds.avltree.batch.{InternalProverNode, ProverLeaf, ProverNodes, VersionedLDBAVLStorage}
import scorex.crypto.authds.avltree.batch.{InternalProverNode, ProverLeaf, ProverNodes, VersionedRocksDBAVLStorage}
import scorex.crypto.authds.avltree.batch.serialization.{BatchAVLProverSubtree, ProxyInternalNode}
import scorex.util.ScorexLogging
import scorex.util.serialization.{Reader, Writer}
@@ -10,7 +10,7 @@ import scorex.util.serialization.{Reader, Writer}
* Serializer for subtree
*/
object SubtreeSerializer extends ErgoSerializer[BatchAVLProverSubtree[DigestType]] with ScorexLogging {
private val nodeSerializer = VersionedLDBAVLStorage.noStoreSerializer
private val nodeSerializer = VersionedRocksDBAVLStorage.noStoreSerializer

override def serialize(subtree: BatchAVLProverSubtree[DigestType], w: Writer): Unit = {
def loop(node: ProverNodes[DigestType]): Unit = {
@@ -6,7 +6,7 @@ import scorex.crypto.authds.{ADKey, ADValue, Balance}
import scorex.crypto.authds.avltree.batch.Constants.{DigestType, hashFn}
import scorex.crypto.authds.avltree.batch.serialization.ProxyInternalNode
import scorex.crypto.hash.Digest32
import scorex.db.LDBVersionedStore
import scorex.db.RocksDBVersionedStore
import scorex.util.serialization.{Reader, Writer}

/**
@@ -15,9 +15,9 @@ import scorex.util.serialization.{Reader, Writer}
*
* @param store - store which could be provided to fetch children of a node when a child is requested
*/
class ProverNodeSerializer(store: LDBVersionedStore) extends ErgoSerializer[ProverNodes[DigestType]] {
class ProverNodeSerializer(store: RocksDBVersionedStore) extends ErgoSerializer[ProverNodes[DigestType]] {

import VersionedLDBAVLStorage.{InternalNodePrefix,LeafPrefix}
import VersionedRocksDBAVLStorage.{InternalNodePrefix,LeafPrefix}

override def serialize(node: ProverNodes[DigestType], w: Writer): Unit = {
node match {
@@ -1,7 +1,7 @@
package scorex.crypto.authds.avltree.batch

import scorex.crypto.authds.{ADKey, Balance}
import scorex.db.LDBVersionedStore
import scorex.db.RocksDBVersionedStore
import InternalNode.InternalNodePrefix
import scorex.crypto.authds.avltree.batch.Constants.{DigestType, hashFn}

@@ -15,7 +15,7 @@ class ProxyInternalProverNode(protected var pk: ADKey,
val leftLabel: ADKey,
val rightLabel: ADKey,
protected var pb: Balance = Balance @@ 0.toByte)
(store: LDBVersionedStore)
(store: RocksDBVersionedStore)
extends InternalProverNode(k = pk, l = null, r = null, b = pb)(hashFn) {

override protected def computeLabel: DigestType = {
@@ -24,14 +24,14 @@ class ProxyInternalProverNode(protected var pk: ADKey,

override def left: ProverNodes[DigestType] = {
if (l == null) {
l = VersionedLDBAVLStorage.fetch(leftLabel)(store)
l = VersionedRocksDBAVLStorage.fetch(leftLabel)(store)
}
l
}

override def right: ProverNodes[DigestType] = {
if (r == null) {
r = VersionedLDBAVLStorage.fetch(rightLabel)(store)
r = VersionedRocksDBAVLStorage.fetch(rightLabel)(store)
}
r
}
@@ -2,13 +2,13 @@ package scorex.crypto.authds.avltree.batch

import com.google.common.primitives.Ints
import scorex.crypto.authds.avltree.batch.Constants.{DigestType, HashFnType, hashFn}
import scorex.crypto.authds.avltree.batch.VersionedLDBAVLStorage.{topNodeHashKey, topNodeHeightKey}
import scorex.crypto.authds.avltree.batch.VersionedRocksDBAVLStorage.{topNodeHashKey, topNodeHeightKey}
import scorex.crypto.authds.avltree.batch.serialization.{BatchAVLProverManifest, BatchAVLProverSubtree, ProxyInternalNode}
import scorex.crypto.authds.{ADDigest, ADKey}
import scorex.util.encode.Base16
import scorex.crypto.hash
import scorex.crypto.hash.Digest32
import scorex.db.{LDBKVStore, LDBVersionedStore}
import scorex.db.{RocksDBKVStore, RocksDBVersionedStore}
import scorex.util.ScorexLogging

import scala.collection.mutable
@@ -19,13 +19,13 @@ import scala.util.{Failure, Try}
*
* @param store - level db storage to save the tree in
*/
class VersionedLDBAVLStorage(store: LDBVersionedStore)
class VersionedRocksDBAVLStorage(store: RocksDBVersionedStore)
extends VersionedAVLStorage[DigestType] with ScorexLogging {

import VersionedLDBAVLStorage.nodeLabel
import VersionedRocksDBAVLStorage.nodeLabel

private def restorePrunedRootNode(): Try[(ProverNodes[DigestType], Int)] = Try {
val rootNode = VersionedLDBAVLStorage.fetch(ADKey @@ store.get(topNodeHashKey).get)(store)
val rootNode = VersionedRocksDBAVLStorage.fetch(ADKey @@ store.get(topNodeHashKey).get)(store)
val rootHeight = Ints.fromByteArray(store.get(topNodeHeightKey).get)

rootNode -> rootHeight
@@ -73,7 +73,7 @@ class VersionedLDBAVLStorage(store: LDBVersionedStore)
isTop: Boolean): Array[(Array[Byte], Array[Byte])] = {
// Should always serialize top node. It may not be new if it is the creation of the tree
if (node.isNew || isTop) {
val pair: (Array[Byte], Array[Byte]) = (nodeLabel(node), VersionedLDBAVLStorage.noStoreSerializer.toBytes(node))
val pair: (Array[Byte], Array[Byte]) = (nodeLabel(node), VersionedRocksDBAVLStorage.noStoreSerializer.toBytes(node))
node match {
case n: InternalProverNode[DigestType] =>
val leftSubtree = serializedVisitedNodes(n.left, isTop = false)
@@ -98,13 +98,13 @@ class VersionedLDBAVLStorage(store: LDBVersionedStore)
* @param expectedRootHash - expected UTXO set authenticating tree root hash
* @return - hash of root node of tree, or failure if an error (e.g. in database) happened
*/
def dumpSnapshot(dumpStorage: LDBKVStore, manifestDepth: Byte, expectedRootHash: Array[Byte]): Try[Array[Byte]] = {
def dumpSnapshot(dumpStorage: RocksDBKVStore, manifestDepth: Byte, expectedRootHash: Array[Byte]): Try[Array[Byte]] = {
store.processSnapshot { dbReader =>

def subtreeLoop(label: DigestType, builder: mutable.ArrayBuilder[Byte]): Unit = {
val nodeBytes = dbReader.get(label)
builder ++= nodeBytes
val node = VersionedLDBAVLStorage.noStoreSerializer.parseBytes(nodeBytes)
val node = VersionedRocksDBAVLStorage.noStoreSerializer.parseBytes(nodeBytes)
node match {
case in: ProxyInternalNode[DigestType] =>
subtreeLoop(Digest32 @@@ in.leftLabel, builder)
@@ -123,7 +123,7 @@ class VersionedLDBAVLStorage(store: LDBVersionedStore)
def manifestLoop(nodeDbKey: Array[Byte], level: Int, manifestBuilder: mutable.ArrayBuilder[Byte]): Unit = {
val nodeBytes = dbReader.get(nodeDbKey)
manifestBuilder ++= nodeBytes
val node = VersionedLDBAVLStorage.noStoreSerializer.parseBytes(nodeBytes)
val node = VersionedRocksDBAVLStorage.noStoreSerializer.parseBytes(nodeBytes)
node match {
case in: ProxyInternalNode[DigestType] if level == manifestDepth =>
dumpSubtree(Digest32 @@@ in.leftLabel)
@@ -156,7 +156,7 @@ class VersionedLDBAVLStorage(store: LDBVersionedStore)
}


object VersionedLDBAVLStorage {
object VersionedRocksDBAVLStorage {

private[batch] val topNodeHashKey: Array[Byte] = Array.fill(StateTreeParameters.labelSize)(123: Byte)
private[batch] val topNodeHeightKey: Array[Byte] = Array.fill(StateTreeParameters.labelSize)(124: Byte)
@@ -179,7 +179,7 @@ object VersionedLDBAVLStorage {
* @return node read from the database (or throws exception if there is no such node), in case of internal node it
* returns pruned internal node, so a node where left and right children not stored, only their hashes
*/
def fetch(dbKey: ADKey)(store: LDBVersionedStore): ProverNodes[DigestType] = {
def fetch(dbKey: ADKey)(store: RocksDBVersionedStore): ProverNodes[DigestType] = {
val bytes = store(dbKey)
val node = new ProverNodeSerializer(store).parseBytes(bytes)
node.isNew = false
@@ -202,7 +202,7 @@ object VersionedLDBAVLStorage {
def recreate(manifest: BatchAVLProverManifest[DigestType],
chunks: Iterator[BatchAVLProverSubtree[DigestType]],
additionalData: Iterator[(Array[Byte], Array[Byte])],
store: LDBVersionedStore): Try[VersionedLDBAVLStorage] = {
store: RocksDBVersionedStore): Try[VersionedRocksDBAVLStorage] = {
//todo: the function below copy-pasted from BatchAVLProver, eliminate boilerplate?

def idCollector(node: ProverNodes[DigestType]): Iterator[(Array[Byte], Array[Byte])] = {
@@ -219,12 +219,12 @@

val rootNode = manifest.root
val rootNodeHeight = manifest.rootHeight
val digestWrapper = VersionedLDBAVLStorage.digest(rootNode.label, rootNodeHeight)
val digestWrapper = VersionedRocksDBAVLStorage.digest(rootNode.label, rootNodeHeight)
val indices = Iterator(topNodeHashKey -> nodeLabel(rootNode), topNodeHeightKey -> Ints.toByteArray(rootNodeHeight))
val nodesIterator = idCollector(manifest.root) ++
chunks.flatMap(subtree => idCollector(subtree.subtreeTop))
store.update(digestWrapper, toRemove = Nil, toUpdate = indices ++ nodesIterator ++ additionalData).map { _ =>
new VersionedLDBAVLStorage(store)
new VersionedRocksDBAVLStorage(store)
}
}

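Construction of the renamed storage is unchanged apart from the class names; a minimal sketch, mirroring the test helper earlier in this diff (the directory name and keep-versions value are illustrative):

    import scorex.crypto.authds.avltree.batch.VersionedRocksDBAVLStorage
    import scorex.db.RocksDBVersionedStore

    val dir = java.nio.file.Files.createTempDirectory("avl_rocksdb_example").toFile  // illustrative location
    val store = new RocksDBVersionedStore(dir, initialKeepVersions = 10)   // versioned key-value store backed by RocksDB
    val storage = new VersionedRocksDBAVLStorage(store)                    // AVL+ tree storage on top of it
    require(storage.isEmpty)                                               // a freshly created storage holds no tree yet
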
60 changes: 33 additions & 27 deletions avldb/src/main/scala/scorex/db/KVStoreReader.scala
@@ -1,9 +1,9 @@
package scorex.db

import java.util.concurrent.locks.ReentrantReadWriteLock

import org.iq80.leveldb.{DB, ReadOptions}
import org.rocksdb.ReadOptions
import scorex.db.RocksDBFactory.RegisteredDB

import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.collection.mutable

/**
@@ -15,7 +15,7 @@ trait KVStoreReader extends AutoCloseable {
type K = Array[Byte]
type V = Array[Byte]

protected val db: DB
protected val db: RegisteredDB

protected val lock = new ReentrantReadWriteLock()

@@ -33,38 +33,43 @@
}
}

/**
* Query if database contains key
* @param key - key
* @return true if key exists, false otherwise
*/
def contains(key: K): Boolean = {
lock.readLock().lock()
try {
db.contains(key)
} finally {
lock.readLock().unlock()
}
}

/**
* Iterate through the database to read elements according to a filter function.
* @param cond - the filter function
* @return iterator over elements satisfying the filter function
* Read all the database elements.
* @return iterator over database contents
*/
def getWithFilter(cond: (K, V) => Boolean): Iterator[(K, V)] = {
def getAll: Iterator[(K, V)] = {
val ro = new ReadOptions()
ro.snapshot(db.getSnapshot)
ro.setSnapshot(db.getSnapshot)
val iter = db.iterator(ro)
try {
iter.seekToFirst()
val bf = mutable.ArrayBuffer.empty[(K, V)]
while (iter.hasNext) {
val next = iter.next()
val key = next.getKey
val value = next.getValue
if (cond(key, value)) bf += (key -> value)
while (iter.isValid) {
bf += (iter.key() -> iter.value())
iter.next()
}
bf.toIterator
} finally {
iter.close()
ro.snapshot().close()
db.releaseSnapshot(ro.snapshot())
ro.close()
}
}

/**
* Read all the database elements.
* @return iterator over database contents
*/
def getAll: Iterator[(K, V)] = getWithFilter((_, _) => true)

/** Returns value associated with the key, or default value from user
*/
def getOrElse(key: K, default: => V): V =
@@ -97,23 +102,24 @@
*/
def getRange(start: K, end: K, limit: Int = Int.MaxValue): Array[(K, V)] = {
val ro = new ReadOptions()
ro.snapshot(db.getSnapshot)
ro.setSnapshot(db.getSnapshot)
val iter = db.iterator(ro)
try {
iter.seek(start)
val bf = mutable.ArrayBuffer.empty[(K, V)]
var elemCounter = 0
while (iter.hasNext && elemCounter < limit) {
val next = iter.next()
if(ByteArrayUtils.compare(next.getKey, end) <= 0) {
while (iter.isValid && elemCounter < limit) {
if(ByteArrayUtils.compare(iter.key(), end) <= 0) {
elemCounter += 1
bf += (next.getKey -> next.getValue)
bf += (iter.key() -> iter.value())
} else elemCounter = limit // break
iter.next()
}
bf.toArray[(K,V)]
} finally {
iter.close()
ro.snapshot().close()
db.releaseSnapshot(ro.snapshot())
ro.close()
}
}

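The iteration code above also changes shape: LevelDB's Java iterator yields entries via hasNext/next, while RocksDB's is cursor-style (seek, isValid, key, value, next), and a snapshot is handed back to the database with releaseSnapshot rather than being closed directly. A condensed sketch of the scan pattern used by getAll and getRange, assuming db is the RegisteredDB handle these methods operate on:

    import org.rocksdb.ReadOptions

    val ro = new ReadOptions()
    ro.setSnapshot(db.getSnapshot)          // pin a consistent view of the database
    val iter = db.iterator(ro)
    try {
      iter.seekToFirst()
      while (iter.isValid) {                // cursor-style loop instead of hasNext/next
        val key: Array[Byte] = iter.key()
        val value: Array[Byte] = iter.value()
        // ... filter or collect (key, value) here ...
        iter.next()
      }
    } finally {
      iter.close()
      db.releaseSnapshot(ro.snapshot())     // snapshots are released back to the DB
      ro.close()
    }
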
