Skip to content

Commit

Permalink
Expose a new SorterBuilder::sort_in_parallel method
Browse files Browse the repository at this point in the history
  • Loading branch information
Kerollmops committed Jul 4, 2023
1 parent 4ff6904 commit ce29193
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 5 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ jobs:
command: test
args: --all-features

- uses: actions-rs/cargo@v1
with:
command: test

lint:
runs-on: ubuntu-latest
steps:
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ bytemuck = { version = "1.7.0", features = ["derive"] }
byteorder = "1.3.4"
flate2 = { version = "1.0", optional = true }
lz4_flex = { version = "0.9.2", optional = true }
rayon = { version = "1.7.0", optional = true }
snap = { version = "1.0.5", optional = true }
tempfile = { version = "3.2.0", optional = true }
zstd = { version = "0.10.0", optional = true }
Expand Down
20 changes: 17 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,21 @@
//! The entries in the grenad files are _immutable_ and the only way to modify them is by _creating
//! a new file_ with the changes.
//!
//! # Example: Use the `Writer` and `Reader` structs
//! # Features
//!
//! You can define which compression schemes to support, there are currently a few
//! available choices, these determine which types will be available inside the above modules:
//!
//! - _Snappy_ with the [`snap`](https://crates.io/crates/snap) crate.
//! - _Zlib_ with the [`flate2`](https://crates.io/crates/flate2) crate.
//! - _Lz4_ with the [`lz4_flex`](https://crates.io/crates/lz4_flex) crate.
//!
//! If you need more performances you can enable the `rayon` feature that will enable a bunch
//! of new settings like being able to make the `Sorter` sort in parallel.
//!
//! # Examples
//!
//! ## Use the `Writer` and `Reader` structs
//!
//! You can use the [`Writer`] struct to store key-value pairs into the specified
//! [`std::io::Write`] type. The [`Reader`] type can then be used to read the entries.
Expand Down Expand Up @@ -37,7 +51,7 @@
//! # Ok(()) }
//! ```
//!
//! # Example: Use the `Merger` struct
//! ## Use the `Merger` struct
//!
//! In this example we show how you can merge multiple [`Reader`]s
//! by using a _merge function_ when a conflict is encountered.
Expand Down Expand Up @@ -107,7 +121,7 @@
//! # Ok(()) }
//! ```
//!
//! # Example: Use the `Sorter` struct
//! ## Use the `Sorter` struct
//!
//! In this example we show how by defining a _merge function_, we can insert
//! multiple entries with the same key and output them in lexicographic order.
Expand Down
43 changes: 41 additions & 2 deletions src/sorter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub struct SorterBuilder<MF, CC> {
index_levels: Option<u8>,
chunk_creator: CC,
sort_algorithm: SortAlgorithm,
sort_in_parallel: bool,
merge: MF,
}

Expand All @@ -63,6 +64,7 @@ impl<MF> SorterBuilder<MF, DefaultChunkCreator> {
index_levels: None,
chunk_creator: DefaultChunkCreator::default(),
sort_algorithm: SortAlgorithm::Stable,
sort_in_parallel: false,
merge,
}
}
Expand Down Expand Up @@ -140,6 +142,15 @@ impl<MF, CC> SorterBuilder<MF, CC> {
self
}

/// Whether we use [rayon to sort](https://docs.rs/rayon/latest/rayon/slice/trait.ParallelSliceMut.html#method.par_sort_by_key) the entries.
///
/// By default we do not sort in parallel, the value is `false`.
#[cfg(feature = "rayon")]
pub fn sort_in_parallel(&mut self, value: bool) -> &mut Self {
self.sort_in_parallel = value;
self
}

/// The [`ChunkCreator`] struct used to generate the chunks used
/// by the [`Sorter`] to bufferize when required.
pub fn chunk_creator<CC2>(self, creation: CC2) -> SorterBuilder<MF, CC2> {
Expand All @@ -154,6 +165,7 @@ impl<MF, CC> SorterBuilder<MF, CC> {
index_levels: self.index_levels,
chunk_creator: creation,
sort_algorithm: self.sort_algorithm,
sort_in_parallel: self.sort_in_parallel,
merge: self.merge,
}
}
Expand All @@ -179,6 +191,7 @@ impl<MF, CC: ChunkCreator> SorterBuilder<MF, CC> {
index_levels: self.index_levels,
chunk_creator: self.chunk_creator,
sort_algorithm: self.sort_algorithm,
sort_in_parallel: self.sort_in_parallel,
merge: self.merge,
}
}
Expand Down Expand Up @@ -279,6 +292,27 @@ impl Entries {
sort(bounds, |b: &EntryBound| &tail[tail.len() - b.key_start..][..b.key_length as usize]);
}

/// Sorts in **parallel** the entry bounds by the entries keys,
/// after a sort the `iter` method will yield the entries sorted.
#[cfg(feature = "rayon")]
pub fn par_sort_by_key(&mut self, algorithm: SortAlgorithm) {
use rayon::slice::ParallelSliceMut;

let bounds_end = self.bounds_count * size_of::<EntryBound>();
let (bounds, tail) = self.buffer.split_at_mut(bounds_end);
let bounds = cast_slice_mut::<_, EntryBound>(bounds);
let sort = match algorithm {
SortAlgorithm::Stable => <[EntryBound]>::par_sort_by_key,
SortAlgorithm::Unstable => <[EntryBound]>::par_sort_unstable_by_key,
};
sort(bounds, |b: &EntryBound| &tail[tail.len() - b.key_start..][..b.key_length as usize]);
}

#[cfg(not(feature = "rayon"))]
pub fn par_sort_by_key(&mut self, algorithm: SortAlgorithm) {
self.sort_by_key(algorithm);
}

/// Returns an iterator over the keys and datas.
pub fn iter(&self) -> impl Iterator<Item = (&[u8], &[u8])> + '_ {
let bounds_end = self.bounds_count * size_of::<EntryBound>();
Expand Down Expand Up @@ -397,6 +431,7 @@ pub struct Sorter<MF, CC: ChunkCreator = DefaultChunkCreator> {
index_levels: Option<u8>,
chunk_creator: CC,
sort_algorithm: SortAlgorithm,
sort_in_parallel: bool,
merge: MF,
}

Expand Down Expand Up @@ -487,7 +522,11 @@ where
}
let mut writer = writer_builder.build(count_write_chunk);

self.entries.sort_by_key(self.sort_algorithm);
if self.sort_in_parallel {
self.entries.par_sort_by_key(self.sort_algorithm);
} else {
self.entries.sort_by_key(self.sort_algorithm);
}

let mut current = None;
for (key, value) in self.entries.iter() {
Expand All @@ -507,7 +546,7 @@ where

if let Some((key, vals)) = current.take() {
let merged_val = (self.merge)(key, &vals).map_err(Error::Merge)?;
writer.insert(&key, &merged_val)?;
writer.insert(key, &merged_val)?;
}

// We retrieve the wrapped CountWrite and extract
Expand Down

0 comments on commit ce29193

Please sign in to comment.