From ab1b509922f60df6c66724391b6703dca77b0657 Mon Sep 17 00:00:00 2001 From: Jiacai Liu Date: Wed, 5 Jul 2023 16:49:54 +0800 Subject: [PATCH] fix: compaction support pick by max_seq (#1041) ## Rationale Part of #987. Current implementation will compact by file size, max_seq is not considered, this may cause data corruption in corner case, eg: - sst1, max_seq:10, PK1=10 - sst2, max_seq:11, PK1=9 - sst3, max_seq:12, no PK1 If compact pick sst1 and sst3, and output sst4, its max_seq will be 12, now PK1 exists in two files: - sst2, max_seq:11, PK1=9 - sst4, max_seq:12, PK1=10 That's to say, PK1's value is 10 now, which is wrong value(9 is right). ## Detailed Changes When do compaction, first sort sst by max_seq desc, then only pick adjacent ssts, the original issue is fixed in this way. At the same time picked ssts are ensured to meet other requirements such as `min_threshold`, `max_threshold`, `max_input_size`. ## Test Plan UT and manually. --- analytic_engine/src/compaction/picker.rs | 330 ++++++++++++++++++----- 1 file changed, 259 insertions(+), 71 deletions(-) diff --git a/analytic_engine/src/compaction/picker.rs b/analytic_engine/src/compaction/picker.rs index e104aca7d2..f2f38f8261 100644 --- a/analytic_engine/src/compaction/picker.rs +++ b/analytic_engine/src/compaction/picker.rs @@ -3,7 +3,7 @@ //! Compaction picker. use std::{ - collections::{BTreeSet, HashMap}, + collections::{BTreeMap, BTreeSet, HashMap}, sync::Arc, time::Duration, }; @@ -176,10 +176,33 @@ fn trim_to_threshold( .collect() } +// TODO: Remove this function when pick_by_seq is stable. +fn prefer_pick_by_seq() -> bool { + std::env::var("CERESDB_COMPACT_PICK_BY_SEQ").unwrap_or_else(|_| "true".to_string()) == "true" +} + /// Size tiered compaction strategy -/// See https://github.com/jeffjirsa/twcs/blob/master/src/main/java/com/jeffjirsa/cassandra/db/compaction/SizeTieredCompactionStrategy.java -#[derive(Default)] -pub struct SizeTieredPicker {} +/// +/// Origin solution[1] will only consider file size, but this will cause data +/// corrupt, see https://github.com/CeresDB/ceresdb/pull/1041 +/// +/// So we could only compact files with adjacent seq, or ssts without +/// overlapping key range among them. Currently solution is relative simple, +/// only pick adjacent sst. Maybe a better, but more complex solution could be +/// introduced later. +/// +/// [1]: https://github.com/jeffjirsa/twcs/blob/master/src/main/java/com/jeffjirsa/cassandra/db/compaction/SizeTieredCompactionStrategy.java +pub struct SizeTieredPicker { + pick_by_seq: bool, +} + +impl Default for SizeTieredPicker { + fn default() -> Self { + Self { + pick_by_seq: prefer_pick_by_seq(), + } + } +} /// Similar size files group #[derive(Debug, Clone)] @@ -241,44 +264,15 @@ impl LevelPicker for SizeTieredPicker { return None; } - let all_segments: BTreeSet<_> = files_by_segment.keys().collect(); let opts = ctx.size_tiered_opts(); - // Iterate the segment in reverse order, so newest segment is examined first. - for (idx, segment_key) in all_segments.iter().rev().enumerate() { - // segment_key should always exist. - if let Some(segment) = files_by_segment.get(segment_key) { - let buckets = Self::get_buckets( - segment.to_vec(), - opts.bucket_high, - opts.bucket_low, - opts.min_sstable_size.as_byte() as f32, - ); - - let files = Self::most_interesting_bucket( - buckets, - opts.min_threshold, - opts.max_threshold, - opts.max_input_sstable_size.as_byte(), - ); - - if files.is_some() { - info!( - "Compact segment, idx: {}, size:{}, segment_key:{:?}, files:{:?}", - idx, - segment.len(), - segment_key, - segment - ); - return files; - } - debug!( - "No compaction necessary for segment, size:{}, segment_key:{:?}, idx:{}", - segment.len(), - segment_key, - idx - ); + for (idx, (segment_key, segment)) in files_by_segment.iter().rev().enumerate() { + let files = self.pick_ssts(segment.to_vec(), &opts); + if files.is_some() { + info!("Compact segment, idx:{idx}, segment_key:{segment_key:?}, files:{segment:?}"); + return files; } + debug!("No compaction necessary for segment, idx:{idx}, segment_key:{segment_key:?}"); } None @@ -286,6 +280,71 @@ impl LevelPicker for SizeTieredPicker { } impl SizeTieredPicker { + fn pick_ssts( + &self, + files: Vec, + opts: &SizeTieredCompactionOptions, + ) -> Option> { + if self.pick_by_seq { + return Self::pick_by_seq( + files, + opts.min_threshold, + opts.max_threshold, + opts.max_input_sstable_size.as_byte(), + ); + } + + Self::pick_by_size(files, opts) + } + + fn pick_by_seq( + mut files: Vec, + min_threshold: usize, + max_threshold: usize, + max_input_sstable_size: u64, + ) -> Option> { + // Sort files by max_seq desc. + files.sort_unstable_by_key(|b| std::cmp::Reverse(b.max_sequence())); + + 'outer: for start in 0..files.len() { + // Try max_threshold first, since we hope to compact as many small files as we + // can. + for step in (min_threshold..=max_threshold).rev() { + let end = (start + step).min(files.len()); + if end - start < min_threshold { + // too little files, switch to next loop and find again. + continue 'outer; + } + + let curr_size: u64 = files[start..end].iter().map(|f| f.size()).sum(); + if curr_size <= max_input_sstable_size { + return Some(files[start..end].to_vec()); + } + } + } + + None + } + + fn pick_by_size( + files: Vec, + opts: &SizeTieredCompactionOptions, + ) -> Option> { + let buckets = Self::get_buckets( + files, + opts.bucket_high, + opts.bucket_low, + opts.min_sstable_size.as_byte() as f32, + ); + + Self::most_interesting_bucket( + buckets, + opts.min_threshold, + opts.max_threshold, + opts.max_input_sstable_size.as_byte(), + ) + } + /// Group files of similar size into buckets. fn get_buckets( mut files: Vec, @@ -378,8 +437,8 @@ impl SizeTieredPicker { level: Level, segment_duration: Duration, expire_time: Option, - ) -> HashMap> { - let mut files_by_segment = HashMap::new(); + ) -> BTreeMap> { + let mut files_by_segment = BTreeMap::new(); let uncompact_files = find_uncompact_files(levels_controller, level, expire_time); for file in uncompact_files { // We use the end time of the range to calculate segment. @@ -420,8 +479,17 @@ impl SizeTieredPicker { /// Time window compaction strategy /// See https://github.com/jeffjirsa/twcs/blob/master/src/main/java/com/jeffjirsa/cassandra/db/compaction/TimeWindowCompactionStrategy.java -#[derive(Default)] -pub struct TimeWindowPicker {} +pub struct TimeWindowPicker { + pick_by_seq: bool, +} + +impl Default for TimeWindowPicker { + fn default() -> Self { + Self { + pick_by_seq: prefer_pick_by_seq(), + } + } +} impl TimeWindowPicker { fn get_window_bounds_in_millis(window: &Duration, ts: i64) -> (i64, i64) { @@ -479,6 +547,7 @@ impl TimeWindowPicker { } fn newest_bucket( + &self, buckets: HashMap>, size_tiered_opts: SizeTieredCompactionOptions, now: i64, @@ -492,40 +561,21 @@ impl TimeWindowPicker { // First compact latest buckets for key in all_keys.into_iter().rev() { if let Some(bucket) = buckets.get(key) { - debug!("Key {}, now {}", key, now); + debug!("Newest bucket loop, key:{key}, now:{now}"); - let max_input_sstable_size = size_tiered_opts.max_input_sstable_size.as_byte(); if bucket.len() >= size_tiered_opts.min_threshold && *key >= now { // If we're in the newest bucket, we'll use STCS to prioritize sstables - let buckets = SizeTieredPicker::get_buckets( - bucket.to_vec(), - size_tiered_opts.bucket_high, - size_tiered_opts.bucket_low, - size_tiered_opts.min_sstable_size.as_byte() as f32, - ); - let files = SizeTieredPicker::most_interesting_bucket( - buckets, - size_tiered_opts.min_threshold, - size_tiered_opts.max_threshold, - max_input_sstable_size, - ); + let size_picker = SizeTieredPicker::default(); + let files = size_picker.pick_ssts(bucket.to_vec(), &size_tiered_opts); if files.is_some() { return files; } } else if bucket.len() >= 2 && *key < now { debug!("Bucket size {} >= 2 and not in current bucket, compacting what's here: {:?}", bucket.len(), bucket); - // Sort by sstable file size - let mut sorted_files = bucket.to_vec(); - sorted_files.sort_unstable_by_key(FileHandle::size); - let candidate_files = trim_to_threshold( - sorted_files, - size_tiered_opts.max_threshold, - max_input_sstable_size, - ); - // At least 2 sst for compaction - if candidate_files.len() > 1 { - return Some(candidate_files); + let files = self.pick_sst_for_old_bucket(bucket.to_vec(), &size_tiered_opts); + if files.is_some() { + return files; } } else { debug!( @@ -541,6 +591,34 @@ impl TimeWindowPicker { None } + fn pick_sst_for_old_bucket( + &self, + mut files: Vec, + size_tiered_opts: &SizeTieredCompactionOptions, + ) -> Option> { + let max_input_size = size_tiered_opts.max_input_sstable_size.as_byte(); + // For old bucket, sst is likely already compacted, so min_thresold is not very + // strict, and greedy as `size_tiered_opts`. + let min_threshold = 2; + if self.pick_by_seq { + return SizeTieredPicker::pick_by_seq( + files, + min_threshold, + size_tiered_opts.max_threshold, + max_input_size, + ); + } + + files.sort_unstable_by_key(FileHandle::size); + let candidate_files = + trim_to_threshold(files, size_tiered_opts.max_threshold, max_input_size); + if candidate_files.len() >= min_threshold { + return Some(candidate_files); + } + + None + } + /// Get current window timestamp, the caller MUST ensure the level has ssts, /// panic otherwise. fn get_current_window( @@ -597,7 +675,7 @@ impl LevelPicker for TimeWindowPicker { ); assert!(now >= max_bucket_ts); - Self::newest_bucket(buckets, opts.size_tiered, now) + self.newest_bucket(buckets, opts.size_tiered, now) } } @@ -789,6 +867,26 @@ mod tests { .collect() } + fn build_file_handles_seq(sizes: Vec<(u64, u64)>) -> Vec { + let (tx, _rx) = mpsc::unbounded_channel(); + + sizes + .into_iter() + .map(|(size, max_seq)| { + let file_meta = FileMeta { + size, + time_range: TimeRange::new_unchecked_for_test(0, 1), + id: 1, + row_num: 0, + max_seq, + storage_format: StorageFormat::default(), + }; + let queue = FilePurgeQueue::new(1, 1.into(), tx.clone()); + FileHandle::new(file_meta, queue) + }) + .collect() + } + #[test] fn test_size_tiered_picker() { let time_range = TimeRange::empty(); @@ -844,6 +942,7 @@ mod tests { #[test] fn test_time_window_newest_bucket() { let size_tiered_opts = SizeTieredCompactionOptions::default(); + let tw_picker = TimeWindowPicker { pick_by_seq: false }; // old bucket have enough sst for compaction { let old_bucket = build_file_handles(vec![ @@ -857,7 +956,9 @@ mod tests { ]); let buckets = hash_map! { 100 => old_bucket, 200 => new_bucket }; - let bucket = TimeWindowPicker::newest_bucket(buckets, size_tiered_opts, 200).unwrap(); + let bucket = tw_picker + .newest_bucket(buckets, size_tiered_opts, 200) + .unwrap(); assert_eq!( vec![100, 101, 102], bucket.into_iter().map(|f| f.size()).collect::>() @@ -874,8 +975,95 @@ mod tests { ]); let buckets = hash_map! { 100 => old_bucket, 200 => new_bucket }; - let bucket = TimeWindowPicker::newest_bucket(buckets, size_tiered_opts, 200); + let bucket = tw_picker.newest_bucket(buckets, size_tiered_opts, 200); + assert_eq!(None, bucket); + } + } + + #[test] + fn test_time_window_newest_bucket_for_seq() { + let size_tiered_opts = SizeTieredCompactionOptions::default(); + let tw_picker = TimeWindowPicker { pick_by_seq: true }; + // old bucket have enough sst for compaction + { + let old_bucket = build_file_handles(vec![ + (102, TimeRange::new_unchecked_for_test(100, 200)), + (100, TimeRange::new_unchecked_for_test(100, 200)), + (101, TimeRange::new_unchecked_for_test(100, 200)), + ]); + let new_bucket = build_file_handles(vec![ + (200, TimeRange::new_unchecked_for_test(200, 300)), + (201, TimeRange::new_unchecked_for_test(200, 300)), + ]); + + let buckets = hash_map! { 100 => old_bucket, 200 => new_bucket }; + let bucket = tw_picker + .newest_bucket(buckets, size_tiered_opts, 200) + .unwrap(); + assert_eq!( + vec![102, 100, 101], + bucket.into_iter().map(|f| f.size()).collect::>() + ); + } + + // old bucket have only 1 sst, which is not enough for compaction + { + let old_bucket = + build_file_handles(vec![(100, TimeRange::new_unchecked_for_test(100, 200))]); + let new_bucket = build_file_handles(vec![ + (200, TimeRange::new_unchecked_for_test(200, 300)), + (201, TimeRange::new_unchecked_for_test(200, 300)), + ]); + + let buckets = hash_map! { 100 => old_bucket, 200 => new_bucket }; + let bucket = tw_picker.newest_bucket(buckets, size_tiered_opts, 200); assert_eq!(None, bucket); } } + + #[test] + fn test_size_pick_by_max_seq() { + let input_files = build_file_handles_seq(vec![ + // size, seq + (20, 10), + (10, 20), + (201, 25), + (100, 30), + (100, 40), + (100, 50), + ]); + + assert_eq!( + vec![50, 40, 30], + SizeTieredPicker::pick_by_seq(input_files.clone(), 2, 5, 300) + .unwrap() + .iter() + .map(|f| f.max_sequence()) + .collect::>() + ); + assert_eq!( + vec![50, 40, 30], + SizeTieredPicker::pick_by_seq(input_files.clone(), 2, 5, 500) + .unwrap() + .iter() + .map(|f| f.max_sequence()) + .collect::>() + ); + assert_eq!( + vec![50, 40, 30, 25], + SizeTieredPicker::pick_by_seq(input_files.clone(), 2, 5, 501) + .unwrap() + .iter() + .map(|f| f.max_sequence()) + .collect::>() + ); + assert_eq!( + vec![20, 10], + SizeTieredPicker::pick_by_seq(input_files, 2, 5, 30) + .unwrap() + .iter() + .map(|f| f.max_sequence()) + .collect::>() + ); + } }