From 6324394ce7b463241191b013931bb13b1a64642e Mon Sep 17 00:00:00 2001 From: Qingsong Chen Date: Thu, 27 Jun 2024 02:07:19 +0000 Subject: [PATCH] Provide a default compaction policy for journal --- core/src/layers/2-edit/journal.rs | 203 ++++++++++++++++++++++++------ core/src/layers/2-edit/mod.rs | 4 +- core/src/layers/3-log/tx_log.rs | 26 ++-- 3 files changed, 177 insertions(+), 56 deletions(-) diff --git a/core/src/layers/2-edit/journal.rs b/core/src/layers/2-edit/journal.rs index b14bd1e..94aaf25 100644 --- a/core/src/layers/2-edit/journal.rs +++ b/core/src/layers/2-edit/journal.rs @@ -93,7 +93,7 @@ where disk: D, init_state: S, state_max_nbytes: usize, - compaction_policy: P, + mut compaction_policy: P, ) -> Result> { // Create `SnapshotManager` to persist the init state. let snapshots = SnapshotManager::create(&disk, &init_state, state_max_nbytes)?; @@ -111,6 +111,7 @@ where let mut write_buf = WriteBuf::new(CryptoChain::>::AVAIL_BLOCK_SIZE); write_buf.write(&Record::Version(mac))?; journal_chain.append(write_buf.as_slice())?; + compaction_policy.on_append_journal(1); write_buf.clear(); journal_chain.flush()?; @@ -233,37 +234,37 @@ where // TODO: sync disk first to ensure data are persisted before // journal records. + self.append_write_buf_to_journal(); + + let is_second_try_success = self.write_buf.write(record).is_ok(); + if is_second_try_success == false { + panic!("the write buffer must have enough free space"); + } + } + + fn append_write_buf_to_journal(&mut self) { let write_data = self.write_buf.as_slice(); + if write_data.len() == 0 { + return; + } + self.journal_chain .append(write_data) // TODO: how to handle I/O error in journaling? .expect("we cannot handle I/O error in journaling gracefully"); + self.compaction_policy.on_append_journal(1); self.write_buf.clear(); if self.compaction_policy.should_compact() { - if self.compact().is_err() { - // TODO: how to handle a compaction failure? - panic!("the journal chain compact failed"); - } - self.compaction_policy.done_compact(); - } - - let is_second_try_success = self.write_buf.write(record).is_ok(); - if is_second_try_success == false { - panic!("the write buffer must have enough free space"); + // TODO: how to handle a compaction failure? + let compacted_blocks = self.compact().expect("journal chain compaction failed"); + self.compaction_policy.done_compact(compacted_blocks); } } /// Ensure that all committed edits are persisted to disk. pub fn flush(&mut self) -> Result<()> { - let buf = self.write_buf.as_slice(); - if buf.len() != 0 { - self.journal_chain - .append(buf) - // TODO: how to handle I/O error when append the journal_chain. - .expect("journal_chain append failed"); - self.write_buf.clear(); - } + self.append_write_buf_to_journal(); self.journal_chain.flush() } @@ -272,9 +273,9 @@ where self.curr_edit_group.as_mut().map(|edits| edits.clear()); } - fn compact(&mut self) -> Result<()> { + fn compact(&mut self) -> Result { if self.journal_chain.block_range().is_empty() { - return Ok(()); + return Ok(0); } // Persist current state to latest snapshot. @@ -285,15 +286,19 @@ where // Persist the MAC of latest_snapshot. let mac = self.snapshots.latest_mac(); self.write_buf.write(&Record::Version(mac))?; - self.flush()?; + self.journal_chain.append(self.write_buf.as_slice())?; + self.compaction_policy.on_append_journal(1); + self.write_buf.clear(); // The latest_snapshot has been persisted, now trim the journal_chain. // And ensure that there is at least one valid block after trimming. - if self.journal_chain.block_range().len() > 1 { + let old_chain_len = self.journal_chain.block_range().len(); + if old_chain_len > 1 { self.journal_chain .trim(self.journal_chain.block_range().end - 1); } - Ok(()) + let new_chain_len = self.journal_chain.block_range().len(); + Ok(old_chain_len - new_chain_len) } } @@ -708,11 +713,20 @@ pub trait CompactPolicy, S> { /// decide that now is the time to compact. fn on_commit_edits(&mut self, edits: &EditGroup); + /// Called when some edits are appended to `CryptoChain`. + /// + /// The `appended_blocks` indicates how many blocks of journal area are + /// occupied by those edits. + fn on_append_journal(&mut self, appended_blocks: usize); + /// Returns whether now is a good timing for compaction. fn should_compact(&self) -> bool; /// Reset the state, as if no edits have ever been added. - fn done_compact(&mut self); + /// + /// The `compacted_blocks` indicates how many blocks are reclaimed during + /// this compaction. + fn done_compact(&mut self, compacted_blocks: usize); } /// A never-do-compaction policy. Mostly useful for testing. @@ -721,19 +735,75 @@ pub struct NeverCompactPolicy; impl, S> CompactPolicy for NeverCompactPolicy { fn on_commit_edits(&mut self, _edits: &EditGroup) {} + fn on_append_journal(&mut self, _appended_nblocks: usize) {} + fn should_compact(&self) -> bool { false } - fn done_compact(&mut self) {} + fn done_compact(&mut self, _compacted_blocks: usize) {} +} + +/// A compaction policy, triggered when there's no-space left for new edits. +pub struct DefaultCompactPolicy { + used_blocks: usize, + total_blocks: usize, +} + +impl DefaultCompactPolicy { + /// Constructs a `DefaultCompactPolicy`. + /// + /// It is initialized via the total number of blocks of `EditJournal` and state. + pub fn new(disk_nblocks: usize, state_max_nbytes: usize) -> Self { + // Calculate the blocks used by `Snapshot`s. + let snapshot_bytes = + CryptoBlob::::HEADER_NBYTES + state_max_nbytes + Snapshot::::meta_len(); + let snapshot_blocks = (snapshot_bytes + BLOCK_SIZE - 1) / BLOCK_SIZE; + debug_assert!( + snapshot_blocks * 2 < disk_nblocks, + "the number of blocks of journal area are too small" + ); + + Self { + used_blocks: 0, + total_blocks: disk_nblocks - snapshot_blocks * 2, + } + } + + /// Constructs a `DefaultCompactPolicy` from `EditJournalMeta`. + pub fn from_meta(meta: &EditJournalMeta) -> Self { + Self { + used_blocks: 0, + total_blocks: meta.journal_area_nblocks, + } + } +} + +impl, S> CompactPolicy for DefaultCompactPolicy { + fn on_commit_edits(&mut self, _edits: &EditGroup) {} + + fn on_append_journal(&mut self, nblocks: usize) { + self.used_blocks += nblocks; + } + + fn should_compact(&self) -> bool { + self.used_blocks >= self.total_blocks + } + + fn done_compact(&mut self, compacted_blocks: usize) { + debug_assert!(self.used_blocks >= compacted_blocks); + self.used_blocks -= compacted_blocks; + } } #[cfg(test)] mod tests { - use super::{CompactPolicy, Edit, EditGroup, EditJournal, Record, RecordSlice, WriteBuf}; + use super::{ + CompactPolicy, DefaultCompactPolicy, Edit, EditGroup, EditJournal, Record, RecordSlice, + WriteBuf, + }; use crate::layers::bio::{BlockSet, MemDisk, BLOCK_SIZE}; use crate::layers::crypto::Mac; - use crate::os::Mutex; use crate::prelude::*; use pod::Pod; use serde::{Deserialize, Serialize}; @@ -762,14 +832,14 @@ mod tests { /// The `EditJournal` must have enough space to persist the threshold /// of appended blocks, to avoid overlapping. struct ThresholdPolicy { - appended: Mutex, + appended: usize, threshold: usize, } impl ThresholdPolicy { pub fn new(threshold: usize) -> Self { Self { - appended: Mutex::new(0), + appended: 0, threshold, } } @@ -778,14 +848,16 @@ mod tests { impl CompactPolicy for ThresholdPolicy { fn on_commit_edits(&mut self, _edits: &EditGroup) {} + fn on_append_journal(&mut self, nblocks: usize) { + self.appended += nblocks; + } + fn should_compact(&self) -> bool { - let mut appended = self.appended.lock(); - *appended += 1; - *appended >= self.threshold + self.appended >= self.threshold } - fn done_compact(&mut self) { - *self.appended.lock() = 0; + fn done_compact(&mut self, _compacted_blocks: usize) { + self.appended = 0; } } @@ -901,6 +973,8 @@ mod tests { } }; + journal.flush().unwrap(); + let journal_disk = disk.subset(0..32).unwrap(); let threshold_policy = ThresholdPolicy::new(1_000); let recover = EditJournal::recover(journal_disk, &meta, threshold_policy).unwrap(); @@ -913,12 +987,10 @@ mod tests { println!("append times: {}", append_times); assert_eq!( recover.state().sum as usize, - (0 + 999) * 1000 / 2 * (append_times * 2) + (0 + 999) * 1000 / 2 * commit_times ); let compact_times = append_times / threshold; println!("compact times: {}", compact_times); - let block_range_len = append_times % threshold + 1; - assert_eq!(recover.journal_chain.block_range().len(), block_range_len); } #[test] @@ -937,4 +1009,59 @@ mod tests { // Compact many times. append_and_recover(5, 1000); } + + /// A test case for `DefaultCompactPolicy`. + /// + /// The `commit_times` is used to control the number of `EditGroup` committed. + fn default_compact_policy_when_commit(commit_times: usize) { + let disk = MemDisk::create(16).unwrap(); + + let journal_disk = disk.subset(0..12).unwrap(); + let state_max_nbytes = core::mem::size_of::() * 2; + let compact_policy = + DefaultCompactPolicy::new::(journal_disk.nblocks(), state_max_nbytes); + let mut journal: EditJournal = + EditJournal::format( + journal_disk, + XState { sum: 0 }, + state_max_nbytes, + compact_policy, + ) + .unwrap(); + let meta = journal.meta(); + assert_eq!(meta.snapshot_area_nblocks, 1); + assert_eq!(meta.journal_area_nblocks, 10); + { + println!("journaling started"); + // The `WriteBuf` could hold two `EditGroup` in this test. + for _ in 0..commit_times { + for x in 0..1000 { + let edit = XEdit { x }; + journal.add(edit); + } + journal.commit(); + println!("state: {}", journal.state().sum); + } + }; + + journal.flush().unwrap(); + + let journal_disk = disk.subset(0..12).unwrap(); + let compact_policy = DefaultCompactPolicy::from_meta(&meta); + let recover: EditJournal = + EditJournal::recover(journal_disk, &meta, compact_policy).unwrap(); + println!("recover state: {}", recover.state().sum); + assert_eq!( + recover.state().sum as usize, + (0 + 999) * 1000 / 2 * commit_times + ); + } + + #[test] + fn default_compact_policy() { + default_compact_policy_when_commit(0); + default_compact_policy_when_commit(10); + default_compact_policy_when_commit(100); + default_compact_policy_when_commit(1000); + } } diff --git a/core/src/layers/2-edit/mod.rs b/core/src/layers/2-edit/mod.rs index 23b12a0..8e127df 100644 --- a/core/src/layers/2-edit/mod.rs +++ b/core/src/layers/2-edit/mod.rs @@ -4,4 +4,6 @@ mod edit; mod journal; pub use self::edit::{Edit, EditGroup}; -pub use self::journal::{CompactPolicy, EditJournal, EditJournalMeta, NeverCompactPolicy}; +pub use self::journal::{ + CompactPolicy, DefaultCompactPolicy, EditJournal, EditJournalMeta, NeverCompactPolicy, +}; diff --git a/core/src/layers/3-log/tx_log.rs b/core/src/layers/3-log/tx_log.rs index ae3ce97..17b4e6d 100644 --- a/core/src/layers/3-log/tx_log.rs +++ b/core/src/layers/3-log/tx_log.rs @@ -131,11 +131,14 @@ impl TxLogStore { raw_log_store: RawLogStoreState::new(), tx_log_store: TxLogStoreState::new(), }; + let state_max_nbytes = 1048576; // TBD + let compaction_policy = + JournalCompactPolicy::new::(journal_area.nblocks(), state_max_nbytes); Arc::new(Mutex::new(Journal::format( journal_area, all_state, - 1048576, // TBD - JournalCompactPolicy {}, + state_max_nbytes, + compaction_policy, )?)) }; Self::register_commit_handler_for_journal(&journal, &tx_provider); @@ -219,10 +222,11 @@ impl TxLogStore { 1 + superblock.chunk_area_nblocks ..1 + superblock.chunk_area_nblocks + journal_area_meta.total_nblocks(), )?; + let compaction_policy = JournalCompactPolicy::from_meta(journal_area_meta); Arc::new(Mutex::new(Journal::recover( journal_area, &journal_area_meta, - JournalCompactPolicy {}, + compaction_policy, )?)) }; Self::register_commit_handler_for_journal(&journal, &tx_provider); @@ -1295,10 +1299,10 @@ impl TxData for OpenLogCache {} mod journaling { use super::*; - use crate::layers::edit::EditGroup; + use crate::layers::edit::DefaultCompactPolicy; pub type Journal = EditJournal; - pub type JournalCompactPolicy = NeverCompactPolicy; + pub type JournalCompactPolicy = DefaultCompactPolicy; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct AllState { @@ -1353,18 +1357,6 @@ mod journaling { } } } - - pub struct NeverCompactPolicy; - - impl CompactPolicy for JournalCompactPolicy { - fn on_commit_edits(&mut self, _edits: &EditGroup) {} - - fn should_compact(&self) -> bool { - false - } - - fn done_compact(&mut self) {} - } } #[cfg(test)]