From ce29193197e20d37256f1ca3310af80a65062195 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Tue, 4 Jul 2023 15:18:08 +0200 Subject: [PATCH] Expose a new SorterBuilder::sort_in_parallel method --- .github/workflows/rust.yml | 4 ++++ Cargo.toml | 1 + src/lib.rs | 20 +++++++++++++++--- src/sorter.rs | 43 ++++++++++++++++++++++++++++++++++++-- 4 files changed, 63 insertions(+), 5 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index db13e9e..e1340c0 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -34,6 +34,10 @@ jobs: command: test args: --all-features + - uses: actions-rs/cargo@v1 + with: + command: test + lint: runs-on: ubuntu-latest steps: diff --git a/Cargo.toml b/Cargo.toml index 70a7678..3287d61 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ bytemuck = { version = "1.7.0", features = ["derive"] } byteorder = "1.3.4" flate2 = { version = "1.0", optional = true } lz4_flex = { version = "0.9.2", optional = true } +rayon = { version = "1.7.0", optional = true } snap = { version = "1.0.5", optional = true } tempfile = { version = "3.2.0", optional = true } zstd = { version = "0.10.0", optional = true } diff --git a/src/lib.rs b/src/lib.rs index e38dd9b..56bf543 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,7 +2,21 @@ //! The entries in the grenad files are _immutable_ and the only way to modify them is by _creating //! a new file_ with the changes. //! -//! # Example: Use the `Writer` and `Reader` structs +//! # Features +//! +//! You can define which compression schemes to support, there are currently a few +//! available choices, these determine which types will be available inside the above modules: +//! +//! - _Snappy_ with the [`snap`](https://crates.io/crates/snap) crate. +//! - _Zlib_ with the [`flate2`](https://crates.io/crates/flate2) crate. +//! - _Lz4_ with the [`lz4_flex`](https://crates.io/crates/lz4_flex) crate. +//! +//! 
If you need more performance you can enable the `rayon` feature that will unlock a bunch +//! of new settings like being able to make the `Sorter` sort in parallel. +//! +//! # Examples +//! +//! ## Use the `Writer` and `Reader` structs //! //! You can use the [`Writer`] struct to store key-value pairs into the specified //! [`std::io::Write`] type. The [`Reader`] type can then be used to read the entries. @@ -37,7 +51,7 @@ //! # Ok(()) } //! ``` //! -//! # Example: Use the `Merger` struct +//! ## Use the `Merger` struct //! //! In this example we show how you can merge multiple [`Reader`]s //! by using a _merge function_ when a conflict is encountered. @@ -107,7 +121,7 @@ //! # Ok(()) } //! ``` //! -//! # Example: Use the `Sorter` struct +//! ## Use the `Sorter` struct //! //! In this example we show how by defining a _merge function_, we can insert //! multiple entries with the same key and output them in lexicographic order. diff --git a/src/sorter.rs b/src/sorter.rs index 0c7a52c..7753d62 100644 --- a/src/sorter.rs +++ b/src/sorter.rs @@ -45,6 +45,7 @@ pub struct SorterBuilder { index_levels: Option, chunk_creator: CC, sort_algorithm: SortAlgorithm, + sort_in_parallel: bool, merge: MF, } @@ -63,6 +64,7 @@ impl SorterBuilder { index_levels: None, chunk_creator: DefaultChunkCreator::default(), sort_algorithm: SortAlgorithm::Stable, + sort_in_parallel: false, merge, } } @@ -140,6 +142,15 @@ impl SorterBuilder { self } + /// Whether we use [rayon to sort](https://docs.rs/rayon/latest/rayon/slice/trait.ParallelSliceMut.html#method.par_sort_by_key) the entries. + /// + /// By default we do not sort in parallel; the value is `false`. + #[cfg(feature = "rayon")] + pub fn sort_in_parallel(&mut self, value: bool) -> &mut Self { + self.sort_in_parallel = value; + self + } + /// The [`ChunkCreator`] struct used to generate the chunks used /// by the [`Sorter`] to bufferize when required. 
pub fn chunk_creator(self, creation: CC2) -> SorterBuilder { @@ -154,6 +165,7 @@ impl SorterBuilder { index_levels: self.index_levels, chunk_creator: creation, sort_algorithm: self.sort_algorithm, + sort_in_parallel: self.sort_in_parallel, merge: self.merge, } } @@ -179,6 +191,7 @@ impl SorterBuilder { index_levels: self.index_levels, chunk_creator: self.chunk_creator, sort_algorithm: self.sort_algorithm, + sort_in_parallel: self.sort_in_parallel, merge: self.merge, } } @@ -279,6 +292,27 @@ impl Entries { sort(bounds, |b: &EntryBound| &tail[tail.len() - b.key_start..][..b.key_length as usize]); } + /// Sorts in **parallel** the entry bounds by the entries keys, + /// after a sort the `iter` method will yield the entries sorted. + #[cfg(feature = "rayon")] + pub fn par_sort_by_key(&mut self, algorithm: SortAlgorithm) { + use rayon::slice::ParallelSliceMut; + + let bounds_end = self.bounds_count * size_of::(); + let (bounds, tail) = self.buffer.split_at_mut(bounds_end); + let bounds = cast_slice_mut::<_, EntryBound>(bounds); + let sort = match algorithm { + SortAlgorithm::Stable => <[EntryBound]>::par_sort_by_key, + SortAlgorithm::Unstable => <[EntryBound]>::par_sort_unstable_by_key, + }; + sort(bounds, |b: &EntryBound| &tail[tail.len() - b.key_start..][..b.key_length as usize]); + } + + #[cfg(not(feature = "rayon"))] + pub fn par_sort_by_key(&mut self, algorithm: SortAlgorithm) { + self.sort_by_key(algorithm); + } + /// Returns an iterator over the keys and datas. 
pub fn iter(&self) -> impl Iterator + '_ { let bounds_end = self.bounds_count * size_of::(); @@ -397,6 +431,7 @@ pub struct Sorter { index_levels: Option, chunk_creator: CC, sort_algorithm: SortAlgorithm, + sort_in_parallel: bool, merge: MF, } @@ -487,7 +522,11 @@ where } let mut writer = writer_builder.build(count_write_chunk); - self.entries.sort_by_key(self.sort_algorithm); + if self.sort_in_parallel { + self.entries.par_sort_by_key(self.sort_algorithm); + } else { + self.entries.sort_by_key(self.sort_algorithm); + } let mut current = None; for (key, value) in self.entries.iter() { @@ -507,7 +546,7 @@ where if let Some((key, vals)) = current.take() { let merged_val = (self.merge)(key, &vals).map_err(Error::Merge)?; - writer.insert(&key, &merged_val)?; + writer.insert(key, &merged_val)?; } // We retrieve the wrapped CountWrite and extract