Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use OptimisticTransactionDb #543

Merged
merged 1 commit into from
Mar 21, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 22 additions & 15 deletions common/db/src/rocks.rs
Original file line number Diff line number Diff line change
@@ -1,44 +1,51 @@
use std::sync::Arc;

use rocksdb::{
DBCompressionType, ThreadMode, SingleThreaded, LogLevel, WriteOptions, Transaction, Options,
TransactionDB,
DBCompressionType, ThreadMode, SingleThreaded, LogLevel, WriteOptions,
Transaction as RocksTransaction, Options, OptimisticTransactionDB,
};

use crate::*;

impl<T: ThreadMode> Get for Transaction<'_, TransactionDB<T>> {
pub struct Transaction<'a, T: ThreadMode>(
RocksTransaction<'a, OptimisticTransactionDB<T>>,
&'a OptimisticTransactionDB<T>,
);

impl<T: ThreadMode> Get for Transaction<'_, T> {
fn get(&self, key: impl AsRef<[u8]>) -> Option<Vec<u8>> {
self.get(key).expect("couldn't read from RocksDB via transaction")
self.0.get(key).expect("couldn't read from RocksDB via transaction")
}
}
impl<T: ThreadMode> DbTxn for Transaction<'_, TransactionDB<T>> {
impl<T: ThreadMode> DbTxn for Transaction<'_, T> {
fn put(&mut self, key: impl AsRef<[u8]>, value: impl AsRef<[u8]>) {
Transaction::put(self, key, value).expect("couldn't write to RocksDB via transaction")
self.0.put(key, value).expect("couldn't write to RocksDB via transaction")
}
fn del(&mut self, key: impl AsRef<[u8]>) {
self.delete(key).expect("couldn't delete from RocksDB via transaction")
self.0.delete(key).expect("couldn't delete from RocksDB via transaction")
}
fn commit(self) {
Transaction::commit(self).expect("couldn't commit to RocksDB via transaction")
self.0.commit().expect("couldn't commit to RocksDB via transaction");
self.1.flush_wal(true).expect("couldn't flush RocksDB WAL");
self.1.flush().expect("couldn't flush RocksDB");
}
}

impl<T: ThreadMode> Get for Arc<TransactionDB<T>> {
impl<T: ThreadMode> Get for Arc<OptimisticTransactionDB<T>> {
fn get(&self, key: impl AsRef<[u8]>) -> Option<Vec<u8>> {
TransactionDB::get(self, key).expect("couldn't read from RocksDB")
OptimisticTransactionDB::get(self, key).expect("couldn't read from RocksDB")
}
}
impl<T: ThreadMode + 'static> Db for Arc<TransactionDB<T>> {
type Transaction<'a> = Transaction<'a, TransactionDB<T>>;
impl<T: Send + ThreadMode + 'static> Db for Arc<OptimisticTransactionDB<T>> {
type Transaction<'a> = Transaction<'a, T>;
fn txn(&mut self) -> Self::Transaction<'_> {
let mut opts = WriteOptions::default();
opts.set_sync(true);
self.transaction_opt(&opts, &Default::default())
Transaction(self.transaction_opt(&opts, &Default::default()), &**self)
}
}

pub type RocksDB = Arc<TransactionDB<SingleThreaded>>;
pub type RocksDB = Arc<OptimisticTransactionDB<SingleThreaded>>;
pub fn new_rocksdb(path: &str) -> RocksDB {
let mut options = Options::default();
options.create_if_missing(true);
Expand All @@ -54,5 +61,5 @@ pub fn new_rocksdb(path: &str) -> RocksDB {
options.set_max_log_file_size(1024 * 1024);
options.set_recycle_log_file_num(1);

Arc::new(TransactionDB::open(&options, &Default::default(), path).unwrap())
Arc::new(OptimisticTransactionDB::open(&options, path).unwrap())
}
Loading