Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate to RocksDB #2098

Merged
merged 14 commits into from
Apr 9, 2024
2 changes: 1 addition & 1 deletion avldb/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ libraryDependencies ++= Seq(
"org.scalacheck" %% "scalacheck" % "1.14.3" % "test",
"org.scalatestplus" %% "scalatestplus-scalacheck" % "3.1.0.0-RC2" % Test,
"com.storm-enroute" %% "scalameter" % Versions.scalameter(scalaVersion.value) % "test",
"org.ethereum" % "leveldbjni-all" % "1.18.3",
"org.rocksdb" % "rocksdbjni" % "8.9.1",
"org.typelevel" %% "spire" % Versions.spire(scalaVersion.value)
)

Expand Down
60 changes: 33 additions & 27 deletions avldb/src/main/scala/scorex/db/KVStoreReader.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package scorex.db

import java.util.concurrent.locks.ReentrantReadWriteLock

import org.iq80.leveldb.{DB, ReadOptions}
import org.rocksdb.ReadOptions
import scorex.db.LDBFactory.RegisteredDB

import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.collection.mutable

/**
Expand All @@ -15,7 +15,7 @@ trait KVStoreReader extends AutoCloseable {
type K = Array[Byte]
type V = Array[Byte]

protected val db: DB
protected val db: RegisteredDB

protected val lock = new ReentrantReadWriteLock()

Expand All @@ -33,38 +33,43 @@ trait KVStoreReader extends AutoCloseable {
}
}

/**
* Query if database contains key
* @param key - key
* @return true if key exists, false otherwise
*/
def contains(key: K): Boolean = {
lock.readLock().lock()
try {
db.contains(key)
} finally {
lock.readLock().unlock()
}
}

/**
* Iterate through the database to read elements according to a filter function.
* @param cond - the filter function
* @return iterator over elements satisfying the filter function
* Read all the database elements.
* @return iterator over database contents
*/
def getWithFilter(cond: (K, V) => Boolean): Iterator[(K, V)] = {
def getAll: Iterator[(K, V)] = {
val ro = new ReadOptions()
ro.snapshot(db.getSnapshot)
ro.setSnapshot(db.getSnapshot)
val iter = db.iterator(ro)
try {
iter.seekToFirst()
val bf = mutable.ArrayBuffer.empty[(K, V)]
while (iter.hasNext) {
val next = iter.next()
val key = next.getKey
val value = next.getValue
if (cond(key, value)) bf += (key -> value)
while (iter.isValid) {
bf += (iter.key() -> iter.value())
iter.next()
}
bf.toIterator
} finally {
iter.close()
ro.snapshot().close()
db.releaseSnapshot(ro.snapshot())
ro.close()
}
}

/**
* Read all the database elements.
* @return iterator over database contents
*/
def getAll: Iterator[(K, V)] = getWithFilter((_, _) => true)

/** Returns value associated with the key, or default value from user
*/
def getOrElse(key: K, default: => V): V =
Expand Down Expand Up @@ -97,23 +102,24 @@ trait KVStoreReader extends AutoCloseable {
*/
def getRange(start: K, end: K, limit: Int = Int.MaxValue): Array[(K, V)] = {
val ro = new ReadOptions()
ro.snapshot(db.getSnapshot)
ro.setSnapshot(db.getSnapshot)
val iter = db.iterator(ro)
try {
iter.seek(start)
val bf = mutable.ArrayBuffer.empty[(K, V)]
var elemCounter = 0
while (iter.hasNext && elemCounter < limit) {
val next = iter.next()
if(ByteArrayUtils.compare(next.getKey, end) <= 0) {
while (iter.isValid && elemCounter < limit) {
if(ByteArrayUtils.compare(iter.key(), end) <= 0) {
elemCounter += 1
bf += (next.getKey -> next.getValue)
bf += (iter.key() -> iter.value())
} else elemCounter = limit // break
iter.next()
}
bf.toArray[(K,V)]
} finally {
iter.close()
ro.snapshot().close()
db.releaseSnapshot(ro.snapshot())
ro.close()
}
}

Expand Down
203 changes: 58 additions & 145 deletions avldb/src/main/scala/scorex/db/LDBFactory.scala
Original file line number Diff line number Diff line change
@@ -1,92 +1,81 @@
package scorex.db

import java.io.File
import java.util.concurrent.locks.ReentrantReadWriteLock
import org.iq80.leveldb.{DB, DBFactory, DBIterator, Options, Range, ReadOptions, Snapshot, WriteBatch, WriteOptions}
import org.rocksdb._
import org.rocksdb.util.SizeUnit
import scorex.util.ScorexLogging

import java.io.File
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.collection.mutable

/**
* Registry of opened LevelDB instances.
* LevelDB prohibit access to the same storage file from more than one DB instance.
* Registry of opened RocksDB instances.
* RocksDB prohibit access to the same storage file from more than one DB instance.
* And ergo application (mostly tests) quite frequently doesn't not explicitly close
* database and tries to reopen it.
*/
case class StoreRegistry(factory: DBFactory) extends DBFactory with ScorexLogging {

val lock = new ReentrantReadWriteLock()
val map = new mutable.HashMap[File, RegisteredDB]

/**
* Decorator of LevelDB DB class which overrides close() methods and unlinks database from registry on close.
* So if database was not explicitly closed, then next attempt to open database with the same path will
* return existed instance instead of creating new one.
*/
case class RegisteredDB(impl: DB, path: File) extends DB {

def get(key: Array[Byte]): Array[Byte] = impl.get(key)

def get(key: Array[Byte], options: ReadOptions): Array[Byte] = impl.get(key, options)
object LDBFactory extends ScorexLogging {
jellymlg marked this conversation as resolved.
Show resolved Hide resolved

def iterator: DBIterator = impl.iterator
RocksDB.loadLibrary()

def iterator(options: ReadOptions): DBIterator = impl.iterator(options)
private val lock = new ReentrantReadWriteLock()
private val map = new mutable.HashMap[File, RegisteredDB]

/**
* Decorator of RocksDB class which overrides close() methods and unlinks database from registry on close.
* So if database was not explicitly closed, then next attempt to open database with the same path will
* return existed instance instead of creating new one.
*/
case class RegisteredDB(impl: RocksDB, path: File) {
val open: AtomicBoolean = new AtomicBoolean(true)
def get(key: Array[Byte]): Array[Byte] = {
if(open.get())
impl.get(key)
else
null
}
def get(options: ReadOptions, key: Array[Byte]): Array[Byte] = impl.get(options, key)
def contains(key: Array[Byte]): Boolean = impl.keyExists(key)
def iterator: RocksIterator = impl.newIterator()
def iterator(options: ReadOptions): RocksIterator = impl.newIterator(options)
def put(key: Array[Byte], value: Array[Byte]): Unit = impl.put(key, value)

def delete(key: Array[Byte]): Unit = impl.delete(key)

def write(batch: WriteBatch): Unit = impl.write(batch)

def write(batch: WriteBatch, options: WriteOptions): Snapshot = impl.write(batch, options)

def createWriteBatch: WriteBatch = impl.createWriteBatch()

def put(key: Array[Byte], value: Array[Byte], options: WriteOptions): Snapshot = impl.put(key, value, options)

def delete(key: Array[Byte], options: WriteOptions): Snapshot = impl.delete(key, options)

def write(options: WriteOptions, batch: WriteBatch): Unit = impl.write(options, batch)
def getSnapshot: Snapshot = impl.getSnapshot

def getApproximateSizes(ranges: Range*): Array[Long] = impl.getApproximateSizes(ranges: _*)

def getProperty(name: String): String = impl.getProperty(name)

def suspendCompactions(): Unit = impl.suspendCompactions()

def resumeCompactions(): Unit = impl.resumeCompactions()

def compactRange(begin: Array[Byte], end: Array[Byte]): Unit = impl.compactRange(begin, end)

override def close(): Unit = {
remove(path)
impl.close()
}
}

private def add(file: File, create: => DB): DB = {
lock.writeLock().lock()
try {
map.getOrElseUpdate(file, RegisteredDB(create, file))
} finally {
lock.writeLock().unlock()
}
}

private def remove(path: File): Option[RegisteredDB] = {
lock.writeLock().lock()
try {
map.remove(path)
} finally {
lock.writeLock().unlock()
def releaseSnapshot(snapshot: Snapshot): Unit = impl.releaseSnapshot(snapshot)
def close(): Unit = {
lock.writeLock().lock()
try {
map.remove(path)
impl.close()
open.set(false)
} finally {
lock.writeLock().unlock()
}
}
}

def open(path: File, options: Options): DB = {
private val normalOptions: Options = new Options()
.setCreateIfMissing(true)
.setWriteBufferSize(32 * SizeUnit.MB)
.setAllowMmapReads(true)
.setIncreaseParallelism(4)
.setCompressionType(CompressionType.LZ4_COMPRESSION)
.setCompactionStyle(CompactionStyle.LEVEL)

private val testOptions: Options = new Options()
.setCreateIfMissing(true)
.setWriteBufferSize(64 * SizeUnit.KB)
.setManifestPreallocationSize(32 * SizeUnit.KB)
.setCompressionType(CompressionType.LZ4_COMPRESSION)
.setCompactionStyle(CompactionStyle.LEVEL)

def open(path: File): RegisteredDB = {
lock.writeLock().lock()
try {
add(path, factory.open(path, options))
path.mkdirs()
val options = if(System.getProperty("env") == "test") testOptions else normalOptions
map.getOrElseUpdate(path, RegisteredDB(RocksDB.open(options, path.toString), path))
} catch {
case x: Throwable =>
log.error(s"Failed to initialize storage: $x. Please check that directory $path exists and is not used by some other active node")
Expand All @@ -97,80 +86,4 @@ case class StoreRegistry(factory: DBFactory) extends DBFactory with ScorexLoggin
}
}

def destroy(path: File, options: Options): Unit = {
factory.destroy(path, options)
}

def repair(path: File, options: Options): Unit = {
factory.repair(path, options)
}
}

object LDBFactory extends ScorexLogging {

private val nativeFactory = "org.fusesource.leveldbjni.JniDBFactory"
private val javaFactory = "org.iq80.leveldb.impl.Iq80DBFactory"
private val memoryPoolSize = 512 * 1024

def setLevelDBParams(factory: DBFactory): AnyRef = {
val pushMemoryPool = factory.getClass.getDeclaredMethod("pushMemoryPool", classOf[Int])
pushMemoryPool.invoke(null, Integer.valueOf(memoryPoolSize))
}

def createKvDb(path: String): LDBKVStore = {
val dir = new File(path)
dir.mkdirs()
val options = new Options()
options.createIfMissing(true)
try {
val db = factory.open(dir, options)
new LDBKVStore(db)
} catch {
case x: Throwable =>
log.error(s"Failed to initialize storage: $x. Please check that directory $path could be accessed " +
s"and is not used by some other active node")
java.lang.System.exit(2)
null
}
}


lazy val factory: DBFactory = {
val loaders = List(ClassLoader.getSystemClassLoader, this.getClass.getClassLoader)

// As LevelDB-JNI has problems on Mac (see https://github.com/ergoplatform/ergo/issues/1067),
// we are using only pure-Java LevelDB on Mac
val isMac = System.getProperty("os.name").toLowerCase().indexOf("mac") >= 0
val factories = if(isMac) {
List(javaFactory)
} else {
List(nativeFactory, javaFactory)
}

val pairs = loaders.view
.zip(factories)
.flatMap { case (loader, factoryName) =>
loadFactory(loader, factoryName).map(factoryName -> _)
}

val (name, factory) = pairs.headOption.getOrElse {
throw new RuntimeException(s"Could not load any of the factory classes: $factories")
}

if (name == javaFactory) {
log.warn("Using the pure java LevelDB implementation which is still experimental")
} else {
log.info(s"Loaded $name with $factory")
setLevelDBParams(factory)
}
StoreRegistry(factory)
}

private def loadFactory(loader: ClassLoader, factoryName: String): Option[DBFactory] =
try Some(loader.loadClass(factoryName).getConstructor().newInstance().asInstanceOf[DBFactory])
catch {
case e: Throwable =>
log.warn(s"Failed to load database factory $factoryName due to: $e")
None
}
}
Loading
Loading