Skip to content

Commit

Permalink
Expose and wrap the ParallelSorter
Browse files Browse the repository at this point in the history
  • Loading branch information
Kerollmops committed Nov 1, 2023
1 parent eafb6ae commit 76b8a1f
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 11 deletions.
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,8 @@ pub use self::reader::{PrefixIter, RangeIter, Reader, ReaderCursor, RevPrefixIte
#[cfg(feature = "tempfile")]
pub use self::sorter::TempFileChunk;
pub use self::sorter::{
ChunkCreator, CursorVec, DefaultChunkCreator, SortAlgorithm, Sorter, SorterBuilder,
ChunkCreator, CursorVec, DefaultChunkCreator, ParallelSorter, SortAlgorithm, Sorter,
SorterBuilder,
};
pub use self::writer::{Writer, WriterBuilder};

Expand Down
29 changes: 19 additions & 10 deletions src/sorter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ impl<MF, CC: ChunkCreator> SorterBuilder<MF, CC> {
CC::Chunk: Send + 'static,
{
match number.get() {
1 => ParallelSorter::Single(self.build()),
1 | 2 => ParallelSorter(ParallelSorterInner::Single(self.build())),
number => {
let (senders, receivers): (Vec<Sender<(usize, Vec<u8>)>>, Vec<_>) =
repeat_with(unbounded).take(number).unzip();
Expand All @@ -236,7 +236,11 @@ impl<MF, CC: ChunkCreator> SorterBuilder<MF, CC> {
}));
}

ParallelSorter::Multi { senders, handles, merge_function: self.merge }
ParallelSorter(ParallelSorterInner::Multi {
senders,
handles,
merge_function: self.merge,
})
}
}
}
Expand Down Expand Up @@ -712,8 +716,13 @@ where
}
}

// TODO Make this private by wrapping it
pub enum ParallelSorter<MF, U, CC: ChunkCreator = DefaultChunkCreator>
pub struct ParallelSorter<MF, U, CC: ChunkCreator = DefaultChunkCreator>(
ParallelSorterInner<MF, U, CC>,
)
where
MF: for<'a> Fn(&[u8], &[Cow<'a, [u8]>]) -> Result<Cow<'a, [u8]>, U>;

enum ParallelSorterInner<MF, U, CC: ChunkCreator = DefaultChunkCreator>
where
MF: for<'a> Fn(&[u8], &[Cow<'a, [u8]>]) -> Result<Cow<'a, [u8]>, U>,
{
Expand All @@ -740,9 +749,9 @@ where
{
let key = key.as_ref();
let val = val.as_ref();
match self {
ParallelSorter::Single(sorter) => sorter.insert(key, val),
ParallelSorter::Multi { senders, .. } => {
match &mut self.0 {
ParallelSorterInner::Single(sorter) => sorter.insert(key, val),
ParallelSorterInner::Multi { senders, .. } => {
let key_length = key.len();
let key_hash = compute_hash(key);

Expand All @@ -766,9 +775,9 @@ where

/// Consumes this [`Sorter`] and outputs a stream of the merged entries in key-order.
pub fn into_stream_merger_iter(self) -> Result<MergerIter<CC::Chunk, MF>, Error<U>> {
match self {
ParallelSorter::Single(sorter) => sorter.into_stream_merger_iter(),
ParallelSorter::Multi { senders, handles, merge_function } => {
match self.0 {
ParallelSorterInner::Single(sorter) => sorter.into_stream_merger_iter(),
ParallelSorterInner::Multi { senders, handles, merge_function } => {
drop(senders);

let mut sources = Vec::new();
Expand Down

0 comments on commit 76b8a1f

Please sign in to comment.