Skip to content

Commit

Permalink
Optimize handling BVT/BALs
Browse files Browse the repository at this point in the history
  • Loading branch information
lucassong-mh authored and tatetian committed Jun 19, 2024
1 parent 594f427 commit 5b7babe
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 12 deletions.
10 changes: 7 additions & 3 deletions core/src/layers/4-lsm/tx_lsm_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,8 +407,6 @@ impl<K: RecordKey<K>, V: RecordValue, D: BlockSet + 'static> TreeInner<K, V, D>

self.memtable_manager.sync(master_sync_id);

self.tx_log_store.sync().unwrap();

// TODO: Error handling: try twice or ignore
self.master_sync_id.increment()?;
Ok(())
Expand Down Expand Up @@ -767,6 +765,12 @@ impl<K: RecordKey<K>, V: RecordValue, D: BlockSet + 'static> Debug for TreeInner
}
}

impl<K: RecordKey<K>, V: RecordValue, D: BlockSet + 'static> Debug for TxLsmTree<K, V, D> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self.0)
}
}

impl LsmLevel {
const LEVEL0_RATIO: u16 = 4;
const LEVELI_RATIO: u16 = 10;
Expand Down Expand Up @@ -963,7 +967,7 @@ mod tests {

#[test]
fn tx_lsm_tree_fns() -> Result<()> {
let nblocks = 64 * 1024;
let nblocks = 102400;
let mem_disk = MemDisk::create(nblocks)?;
let tx_log_store = Arc::new(TxLogStore::format(mem_disk, Key::random())?);
let tx_lsm_tree: TxLsmTree<BlockId, Value, MemDisk> =
Expand Down
18 changes: 16 additions & 2 deletions core/src/layers/5-disk/block_alloc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::util::BitMap;

use core::mem::size_of;
use core::num::NonZeroUsize;
use core::sync::atomic::{AtomicUsize, Ordering};
use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use pod::Pod;
use serde::{Deserialize, Serialize};

Expand All @@ -29,6 +29,7 @@ pub(super) struct AllocTable {
bitmap: Mutex<BitMap>,
next_avail: AtomicUsize,
nblocks: NonZeroUsize,
is_dirty: AtomicBool,
cvar: Condvar,
num_free: CvarMutex<usize>,
}
Expand Down Expand Up @@ -60,6 +61,7 @@ impl AllocTable {
bitmap: Mutex::new(BitMap::repeat(true, nblocks.get())),
next_avail: AtomicUsize::new(0),
nblocks,
is_dirty: AtomicBool::new(false),
cvar: Condvar::new(),
num_free: CvarMutex::new(nblocks.get()),
}
Expand Down Expand Up @@ -97,6 +99,9 @@ impl AllocTable {
debug_assert_eq!(hbas.len(), cnt);

*num_free -= cnt;
let _ = self
.is_dirty
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed);
Ok(hbas)
}

Expand Down Expand Up @@ -160,6 +165,7 @@ impl AllocTable {
bitmap: Mutex::new(bitmap),
next_avail: AtomicUsize::new(next_avail),
nblocks,
is_dirty: AtomicBool::new(false),
cvar: Condvar::new(),
num_free: CvarMutex::new(num_free),
});
Expand Down Expand Up @@ -203,6 +209,7 @@ impl AllocTable {
bitmap: Mutex::new(bitmap),
next_avail: AtomicUsize::new(next_avail),
nblocks,
is_dirty: AtomicBool::new(false),
cvar: Condvar::new(),
num_free: CvarMutex::new(num_free),
})
Expand All @@ -218,6 +225,10 @@ impl AllocTable {

/// Persist the block validity table to `BVT` log. GC all existed `BAL` logs.
pub fn do_compaction<D: BlockSet + 'static>(&self, store: &Arc<TxLogStore<D>>) -> Result<()> {
if !self.is_dirty.load(Ordering::Relaxed) {
return Ok(());
}

// Serialize the block validity table
let bitmap = self.bitmap.lock();
const BITMAP_MAX_SIZE: usize = 1792 * BLOCK_SIZE; // TBD
Expand Down Expand Up @@ -252,7 +263,10 @@ impl AllocTable {
tx.abort();
return_errno_with_msg!(TxAborted, "persist block validity table TX aborted");
}
tx.commit()
tx.commit()?;

self.is_dirty.store(false, Ordering::Relaxed);
Ok(())
}

/// Mark a specific slot deallocated.
Expand Down
24 changes: 17 additions & 7 deletions core/src/layers/5-disk/sworndisk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,11 @@ impl<D: BlockSet + 'static> SwornDisk<D> {
/// Sync all cached data in the device to the storage medium for durability.
pub fn sync(&self) -> Result<()> {
let _wguard = self.inner.write_sync_region.write();
self.inner.sync()?;
// TODO: Error handling the sync operation
self.inner.sync().unwrap();

#[cfg(not(feature = "linux"))]
debug!("[SwornDisk] Sync completed");
trace!("[SwornDisk] Sync completed. {self:?}");
Ok(())
}

Expand Down Expand Up @@ -150,7 +151,7 @@ impl<D: BlockSet + 'static> SwornDisk<D> {
};

#[cfg(not(feature = "linux"))]
debug!("[SwornDisk] Created successfully!");
info!("[SwornDisk] Created successfully! {:?}", &new_self);
// XXX: Would `disk::drop()` bring unexpected behavior?
Ok(new_self)
}
Expand Down Expand Up @@ -203,7 +204,7 @@ impl<D: BlockSet + 'static> SwornDisk<D> {
};

#[cfg(not(feature = "linux"))]
debug!("[SwornDisk] Opened successfully!");
info!("[SwornDisk] Opened successfully! {:?}", &opened_self);
Ok(opened_self)
}

Expand Down Expand Up @@ -450,13 +451,13 @@ impl<D: BlockSet + 'static> DiskInner<D> {

self.logical_block_table.sync()?;

self.user_data_disk.flush()?;

// XXX: May impact performance when there comes frequent syncs
self.block_validity_table
.do_compaction(&self.tx_log_store)?;

Ok(())
self.tx_log_store.sync()?;

self.user_data_disk.flush()
}

/// Handle one block I/O request. Mark the request completed when finished,
Expand Down Expand Up @@ -524,6 +525,15 @@ impl<D: BlockSet> Drop for SwornDisk<D> {
}
}

impl<D: BlockSet + 'static> Debug for SwornDisk<D> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SwornDisk")
.field("user_data_nblocks", &self.inner.user_data_disk.nblocks())
.field("logical_block_table", &self.inner.logical_block_table)
.finish()
}
}

/// A wrapper for `[BufMut]` used in `readv()`.
struct BufMutVec<'a> {
bufs: &'a mut [BufMut<'a>],
Expand Down

0 comments on commit 5b7babe

Please sign in to comment.