feat: allow lazily chunking unsorted iteration (#55)
* Move unsorted code to a new module, refactor, implement skipping.
* Improve reading tests by using two files to test set reading.
macklin-10x authored Sep 6, 2024
1 parent e75937d commit 54f2453
Showing 3 changed files with 404 additions and 166 deletions.
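In short: `UnsortedShardReader::open` and `open_set` now return the reader directly instead of a `Result`, a new `skip_lazy` method skips ahead and reports how many items were actually skipped, and the tests below also exercise a `UnsortedShardReader::len` helper that counts the items in a file set. A minimal usage sketch based on those call sites — the `Item` type and `read_tail` function are illustrative, and the chunk-aware behavior of `skip_lazy` is inferred from the commit title, not shown in this view:

```rust
use anyhow::Error;
use serde::{Deserialize, Serialize};
use shardio::UnsortedShardReader;

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
struct Item(u64);

fn read_tail(path: &std::path::Path, to_skip: usize) -> Result<Vec<Item>, Error> {
    // Opening no longer returns a Result.
    let mut reader = UnsortedShardReader::<Item>::open(path);
    // Lazily skip items; presumably whole compressed chunks are stepped over
    // via the shard index, so skipped data need not be decompressed.
    let skipped = reader.skip_lazy(to_skip)?;
    assert!(skipped <= to_skip);
    // The remaining items arrive in on-disk (unsorted) order.
    reader.collect()
}
```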
benches/my_bench.rs (2 changes: 1 addition & 1 deletion)
@@ -95,7 +95,7 @@ fn main() {

// Open finished file
let all_items = if unsorted_read {
-UnsortedShardReader::<T1>::open(tmp.path())?.collect::<Result<_, _>>()?
+UnsortedShardReader::<T1>::open(tmp.path()).collect::<Result<_, _>>()?
} else {
let reader = ShardReader::<T1>::open(tmp.path())?;

src/lib.rs (218 changes: 53 additions & 165 deletions)
@@ -73,7 +73,7 @@
//! assert_eq!(all_items, all_items_sorted);
//!
//! // If you want to iterate through the items in unsorted order.
-//! let unsorted_items: Vec<_> = UnsortedShardReader::<DataStruct>::open(filename)?.collect();
+//! let unsorted_items: Vec<_> = UnsortedShardReader::<DataStruct>::open(filename).collect();
//! // You will get the items in the order they are written to disk.
//! assert_eq!(unsorted_items.len(), all_items.len());
//!
@@ -84,6 +84,7 @@

#![deny(warnings)]
#![deny(missing_docs)]
+use std::any::type_name;
use std::borrow::Cow;
use std::collections::BTreeSet;
use std::fs::File;
@@ -94,7 +95,6 @@ use std::os::unix::fs::FileExt;
use std::path::Path;
use std::sync::{atomic::AtomicBool, Arc, Mutex};
use std::thread;
-use std::{any::type_name, path::PathBuf};

use anyhow::{format_err, Error};
use bincode::{deserialize_from, serialize_into};
@@ -112,6 +112,9 @@ pub mod helper;
pub use crate::range::Range;
use range::Rorder;

+mod unsorted;
+pub use unsorted::*;

/// The size (in bytes) of a ShardIter object (mostly buffers)
// ? sizeof(T)
// + 8 usize items_remaining
@@ -1392,157 +1395,6 @@ where
}
}

-#[derive(Clone, Copy, Serialize, Deserialize, Debug, PartialEq, Eq, PartialOrd, Ord)]
-/// A group of `len_items` items, stored at position `offset`, using `len_bytes` bytes on-disk.
-/// Similar to `ShardRecord`, except that we don't store the key. Used by `UnsortedShardReader`.
-struct KeylessShardRecord {
-offset: usize,
-len_bytes: usize,
-len_items: usize,
-}

-/// Read from a collection of shardio files in the order in which items are written without
-/// considering the sort order.
-///
-/// Useful if you just want to iterate over all the items irrespective of the ordering.
-///
-#[allow(dead_code)]
-pub struct UnsortedShardReader<T, S = DefaultSort>
-where
-S: SortKey<T>,
-{
-shard_files: Vec<PathBuf>,
-// Which file among the shard_files are we reading from
-active_file_num: usize,
-// The index of the shard file we are reading from
-active_file_index: Vec<KeylessShardRecord>,
-// Which KeylessShardRecord among the active_file_index are we reading now
-active_index_num: usize,
-// How many items within a compressed block have we read so far
-active_index_items_read: usize,
-decoder: Option<lz4::Decoder<BufReader<ReadAdapter<File, File>>>>,
-phantom: PhantomData<(T, S)>,
-}

-impl<T, S> UnsortedShardReader<T, S>
-where
-T: DeserializeOwned,
-<S as SortKey<T>>::Key: Clone + Ord + DeserializeOwned,
-S: SortKey<T>,
-{
-/// Open a single shard file
-pub fn open<P: AsRef<Path>>(shard_file: P) -> Result<Self, Error> {
-UnsortedShardReader::open_set(&[shard_file])
-}

-/// Open a set of shard files
-pub fn open_set<P: AsRef<Path>>(shard_files: &[P]) -> Result<Self, Error> {
-let shard_files: Vec<_> = shard_files.iter().map(|f| f.as_ref().into()).collect();

-Ok(UnsortedShardReader {
-shard_files,
-active_file_num: 0,
-active_file_index: Vec::new(),
-active_index_num: 0,
-active_index_items_read: 0,
-decoder: None,
-phantom: PhantomData,
-})
-}
-}

-impl<T, S> Iterator for UnsortedShardReader<T, S>
-where
-T: DeserializeOwned,
-<S as SortKey<T>>::Key: Clone + Ord + DeserializeOwned,
-S: SortKey<T>,
-{
-type Item = Result<T, Error>;
-fn next(&mut self) -> Option<Self::Item> {
-loop {
-if self.active_file_num >= self.shard_files.len() {
-// We are done going through all the files
-return None;
-}
-if self.decoder.is_none() {
-// Open the next file
-self.active_index_num = 0;
-self.active_index_items_read = 0;

-let reader = match ShardReaderSingle::<T, S>::open(
-&self.shard_files[self.active_file_num],
-) {
-Ok(r) => r,
-Err(e) => return Some(Err(e)),
-};
-self.active_file_index = reader
-.index
-.into_iter()
-.map(|r| KeylessShardRecord {
-offset: r.offset,
-len_bytes: r.len_bytes,
-len_items: r.len_items,
-})
-.collect();

-let decoder = match self.active_file_index.first() {
-Some(rec) => lz4::Decoder::new(BufReader::new(ReadAdapter::new(
-reader.file,
-rec.offset,
-rec.len_bytes,
-))),
-None => {
-// There are no chunks in this file
-self.active_file_num += 1;
-continue;
-}
-};
-self.decoder = match decoder {
-Ok(d) => Some(d),
-Err(e) => return Some(Err(e.into())),
-};
-}
-if self.active_index_items_read
->= self.active_file_index[self.active_index_num].len_items
-{
-// We are done with this chunk
-self.active_index_num += 1;
-self.active_index_items_read = 0;

-if self.active_index_num >= self.active_file_index.len() {
-// We are done with this file
-self.decoder = None;
-self.active_file_num += 1;
-self.active_index_num = 0;
-} else {
-// Load up the decoder for the next chunk
-let decoder = self.decoder.take().unwrap();
-let (buf, _) = decoder.finish();
-let file = buf.into_inner().file;
-let rec = self.active_file_index[self.active_index_num];
-let decoder = lz4::Decoder::new(BufReader::new(ReadAdapter::new(
-file,
-rec.offset,
-rec.len_bytes,
-)));
-self.decoder = match decoder {
-Ok(d) => Some(d),
-Err(e) => return Some(Err(e.into())),
-};
-}
-continue;
-} else {
-// Read the next item
-self.active_index_items_read += 1;
-match deserialize_from(self.decoder.as_mut().unwrap()) {
-Ok(item) => return Some(Ok(item)),
-Err(e) => return Some(Err(e.into())),
-}
-}
-}
-}
-}

#[cfg(test)]
mod shard_tests {
use super::*;
@@ -1553,6 +1405,7 @@ mod shard_tests {
use std::fmt::Debug;
use std::hash::Hash;
use std::iter::{repeat, FromIterator};
+use std::path::PathBuf;

#[derive(Copy, Clone, Eq, PartialEq, Serialize, Deserialize, Debug, PartialOrd, Ord, Hash)]
struct T1 {
@@ -1835,7 +1688,7 @@
}

let unsorted_items =
-UnsortedShardReader::<T, S>::open_set(&files)?.collect::<Result<Vec<_>, _>>()?;
+UnsortedShardReader::<T, S>::open_set(&files).collect::<Result<Vec<_>, _>>()?;
assert!(set_compare(&out_items, &unsorted_items));

Ok(out_items)
@@ -1938,10 +1791,12 @@
disk_chunk_size, producer_chunk_size, n_items
);

-let tmp = tempfile::NamedTempFile::new()?;
+// Write two files to check file set reading logic.

-// Write and close file
-let true_items = {
+let create = || -> Result<_, Error> {
+let tmp = tempfile::NamedTempFile::new()?;

+// Write and close file
let mut writer: ShardWriter<T1> = ShardWriter::new(
tmp.path(),
producer_chunk_size,
Expand All @@ -1955,12 +1810,20 @@ mod shard_tests {

writer.finish()?;
true_items.sort();
-true_items
+Ok((tmp, true_items))
};

+let (tmp0, true_items0) = create()?;
+let (tmp1, true_items1) = create()?;

+let mut true_items = Vec::from_iter(true_items0.into_iter().chain(true_items1));
+true_items.sort();

+let file_set = [tmp0.path(), tmp1.path()];

if do_read {
-// Open finished file
-let reader = ShardReader::<T1>::open(tmp.path())?;
+// Open finished files
+let reader = ShardReader::<T1>::open_set(&file_set)?;
let iter = reader.iter_range(&Range::all())?;

let all_items_res: Result<Vec<_>, Error> = iter.collect();
@@ -1974,8 +1837,8 @@
}

for rc in [1, 3, 8, 15, 27].iter() {
-// Open finished file & test chunked reads
-let set_reader = ShardReader::<T1>::open(tmp.path())?;
+// Open finished files & test chunked reads
+let set_reader = ShardReader::<T1>::open_set(&file_set)?;
let mut all_items_chunks = Vec::new();

// Read in chunks
@@ -2000,10 +1863,29 @@
}

// Check the unsorted read
-let unsorted_reader = UnsortedShardReader::<T1>::open(tmp.path())?;
+assert_eq!(2 * n_items, UnsortedShardReader::<T1>::len(&file_set)?);
+let unsorted_reader = UnsortedShardReader::<T1>::open_set(&file_set);
let all_items_res: Result<Vec<_>, Error> = unsorted_reader.collect();
let all_items = all_items_res?;
assert!(set_compare(&true_items, &all_items));

+let check_unsorted_skip = |to_skip: usize| -> Result<(), Error> {
+let mut unsorted_reader_skip = UnsortedShardReader::<T1>::open_set(&file_set);
+let skipped = unsorted_reader_skip.skip_lazy(to_skip)?;
+assert_eq!(to_skip, skipped);
+let all_items_res_skip: Result<Vec<_>, Error> = unsorted_reader_skip.collect();
+let all_items_skip = all_items_res_skip?;
+assert_eq!(&all_items[to_skip..], &all_items_skip);
+Ok(())
+};

+check_unsorted_skip(0)?;
+check_unsorted_skip(1)?;
+check_unsorted_skip(disk_chunk_size)?;
+check_unsorted_skip((disk_chunk_size * 3) + 1)?;
+check_unsorted_skip(n_items)?; // skip entire first file
+check_unsorted_skip(n_items + 1)?; // skip entire first file plus next item
+check_unsorted_skip(n_items * 2)?; // skip everything
}
Ok(())
}
@@ -2084,7 +1966,7 @@
assert!(set_compare(&true_items, &all_items_chunks));

// Check the unsorted read
-let unsorted_reader = UnsortedShardReader::<T1, FieldDSort>::open(tmp.path())?;
+let unsorted_reader = UnsortedShardReader::<T1, FieldDSort>::open(tmp.path());
let all_items_res: Result<Vec<_>, Error> = unsorted_reader.collect();
let all_items = all_items_res?;
assert!(set_compare(&true_items, &all_items));
@@ -2262,7 +2144,13 @@
#[test]
fn test_empty_open_set() {
let shard_files = Vec::<PathBuf>::new();
-let reader = UnsortedShardReader::<u8>::open_set(&shard_files).unwrap();
+let reader = UnsortedShardReader::<u8>::open_set(&shard_files);
assert_eq!(reader.count(), 0);

+// Test that skipping an empty set works correctly.
+let mut reader = UnsortedShardReader::<u8>::open_set(&shard_files);
+let skipped = reader.skip_lazy(10).unwrap();
+assert_eq!(0, skipped);
+assert_eq!(reader.count(), 0);
}
}
[Diff for the third changed file, src/unsorted.rs (the new module that receives the moved code), is not rendered here.]
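Although the new module's diff is not visible, the per-chunk metadata in the removed code above (`KeylessShardRecord::len_items`) suggests how lazy skipping can work: whole chunks are consumed by arithmetic over the index, and only a partially skipped chunk needs decoding. A hypothetical sketch of that bookkeeping — illustrative names, not the actual contents of src/unsorted.rs:

```rust
/// Per-chunk metadata, mirroring the `len_items` field of the removed
/// `KeylessShardRecord`. Illustrative only.
#[derive(Clone, Copy)]
struct ChunkMeta {
    len_items: usize,
}

/// Skip up to `to_skip` items across one file's chunk index without touching
/// compressed data. Returns the number of whole chunks consumed and how many
/// items still need skipping (inside the next chunk, or carried to the next
/// file if this file's index is exhausted).
fn skip_in_index(index: &[ChunkMeta], to_skip: usize) -> (usize, usize) {
    let mut chunks_consumed = 0;
    let mut remaining = to_skip;
    // Whole chunks are stepped over by arithmetic alone; nothing is decompressed.
    while chunks_consumed < index.len() && remaining >= index[chunks_consumed].len_items {
        remaining -= index[chunks_consumed].len_items;
        chunks_consumed += 1;
    }
    (chunks_consumed, remaining)
}

fn main() {
    let index = [ChunkMeta { len_items: 4 }, ChunkMeta { len_items: 4 }];
    // Skipping 5 items consumes one whole chunk and leaves 1 item to skip
    // inside the second chunk, which must then be decoded item by item.
    assert_eq!(skip_in_index(&index, 5), (1, 1));
    // Skipping 10 items exhausts the file, with 2 items carried to the next file.
    assert_eq!(skip_in_index(&index, 10), (2, 2));
}
```

A reader's `skip_lazy` would loop this over its file list, which is consistent with the boundary cases the tests above exercise (skipping exactly one file, one file plus one item, or everything).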
