ff #49

Merged · 68 commits · Sep 24, 2023

Commits
bd538d4  fix bloom_filter_comparison (chris-ha458, Aug 19, 2023)
d3ece54  shard.rs clippy (chris-ha458, Aug 19, 2023)
7c58c46  singlechar deduper fix (chris-ha458, Aug 19, 2023)
9774453  collapse if (chris-ha458, Aug 19, 2023)
d8bc812  needless_returns (chris-ha458, Aug 19, 2023)
64a7b7d  char, enumerator (chris-ha458, Aug 19, 2023)
81b74f9  remove unnecessary comments (chris-ha458, Aug 19, 2023)
dc128f9  Update shard.rs (chris-ha458, Aug 19, 2023)
4446dbd  further simplify resp (chris-ha458, Aug 19, 2023)
54b46ac  better vec init (chris-ha458, Aug 19, 2023)
0285532  simpler string formatting (chris-ha458, Aug 19, 2023)
e960516  unneeded {unwrap,Vec[]} (chris-ha458, Aug 19, 2023)
58c21a7  formatting (chris-ha458, Aug 19, 2023)
35d2470  single match fix (chris-ha458, Aug 19, 2023)
5595fde  needless borrows (chris-ha458, Aug 19, 2023)
c4173e5  expect is often unidiomatic (chris-ha458, Aug 19, 2023)
9617b62  Update deduper.rs (chris-ha458, Aug 19, 2023)
8350e39  Merge branch 'main' into rust_fixes (soldni, Aug 22, 2023)
c0ae05c  fix bloom_filter_comparison (chris-ha458, Aug 19, 2023)
75e6731  shard.rs clippy (chris-ha458, Aug 19, 2023)
107ccea  singlechar deduper fix (chris-ha458, Aug 19, 2023)
16e2605  collapse if (chris-ha458, Aug 19, 2023)
431093f  needless_returns (chris-ha458, Aug 19, 2023)
f47cb2d  char, enumerator (chris-ha458, Aug 19, 2023)
34ace67  remove unnecessary comments (chris-ha458, Aug 19, 2023)
a294eee  Update shard.rs (chris-ha458, Aug 19, 2023)
c52ed18  further simplify resp (chris-ha458, Aug 19, 2023)
0ebd516  better vec init (chris-ha458, Aug 19, 2023)
ced2969  simpler string formatting (chris-ha458, Aug 19, 2023)
a770be3  unneeded {unwrap,Vec[]} (chris-ha458, Aug 19, 2023)
55331b9  formatting (chris-ha458, Aug 19, 2023)
39af44a  single match fix (chris-ha458, Aug 19, 2023)
92670ef  needless borrows (chris-ha458, Aug 19, 2023)
867d5b9  expect is often unidiomatic (chris-ha458, Aug 19, 2023)
ae0ab28  Update deduper.rs (chris-ha458, Aug 19, 2023)
18978b7  Merge branch 'rust_fixes' of https://github.com/chris-ha458/dolma int… (chris-ha458, Aug 22, 2023)
9bbe3fc  Merge branch 'main' into rust_fixes (soldni, Aug 22, 2023)
a7fd4ec  retrieve index from enumerate() (chris-ha458, Aug 22, 2023)
92fe888  fix last of clippy (chris-ha458, Aug 22, 2023)
7ea0d2e  Update mixer.rs (chris-ha458, Aug 23, 2023)
23a2657  simplify match (chris-ha458, Aug 23, 2023)
3ffa2e9  Update lib.rs (chris-ha458, Aug 23, 2023)
cc2ad29  simplify mixer (chris-ha458, Aug 23, 2023)
14da71d  idiomatic Vec construction (chris-ha458, Aug 24, 2023)
2bfc4a8  most atomic initialization (chris-ha458, Aug 24, 2023)
b3ba6c9  change initialization for read (chris-ha458, Aug 24, 2023)
debc75f  better init for dedupe_key (chris-ha458, Aug 24, 2023)
5bf89cc  we don't need fetch_add just for load (chris-ha458, Aug 24, 2023)
12cbd1e  replace match (chris-ha458, Aug 24, 2023)
3809ee1  improve error handling (chris-ha458, Aug 24, 2023)
41b349a  improve error handling (chris-ha458, Aug 24, 2023)
b08ceaa  simplify match (chris-ha458, Aug 24, 2023)
2281bc0  Merge branch 'main' into rust_fixes (chris-ha458, Aug 27, 2023)
97b4374  follow same patter as new_client() (chris-ha458, Aug 27, 2023)
a44b8ca  simplify (chris-ha458, Aug 27, 2023)
1cf934f  refactor s3 skipping (chris-ha458, Aug 27, 2023)
6976fd1  ? operator to replace match (chris-ha458, Aug 27, 2023)
42e886a  remove unwrap (chris-ha458, Aug 27, 2023)
4753e06  remove inner match as well (chris-ha458, Aug 27, 2023)
d944c1d  more concise (chris-ha458, Aug 27, 2023)
428d261  change from_str to to_string (chris-ha458, Aug 27, 2023)
f8211c7  add bloom tests (chris-ha458, Aug 27, 2023)
8264ec5  Update bloom_test.rs (chris-ha458, Aug 27, 2023)
f84210c  Merge branch 'main' into rust_fixes (chris-ha458, Aug 28, 2023)
13d5f17  Merge branch 'main' into add_tests (soldni, Aug 30, 2023)
ae04b16  Merge pull request #23 from chris-ha458/rust_fixes (soldni, Aug 30, 2023)
cf6d299  Merge branch 'main' into add_tests (soldni, Aug 30, 2023)
9ada514  Merge pull request #35 from chris-ha458/add_tests (soldni, Aug 30, 2023)

Files changed
18 changes: 7 additions & 11 deletions src/bloom_filter.rs
@@ -11,6 +11,7 @@ use std::mem::size_of;
use std::path::PathBuf;
use std::sync::atomic::{AtomicU32, Ordering};

mod bloom_test;
// A thread-safe bloom filter.
pub struct BloomFilter {
bits: Vec<AtomicU32>,
@@ -85,13 +86,10 @@ impl BloomFilter {
hash_builder_seeds.push(seeds);
}

let mut bits = Vec::new();
let number_of_u32 = size_in_bytes / size_of::<AtomicU32>();
bits.reserve_exact(number_of_u32);
for _ in 0..number_of_u32 {
bits.push(AtomicU32::new(0));
}

let bits: Vec<AtomicU32> = std::iter::repeat_with(|| AtomicU32::new(0))
.take(number_of_u32)
.collect();
Self {
bits,
hash_builder_seeds,
@@ -138,8 +136,7 @@ impl BloomFilter {
}

let number_of_elements = stream.read_u64::<LittleEndian>()?;
let mut bits = Vec::new();
bits.reserve_exact(number_of_elements as usize);
let mut bits = Vec::with_capacity(number_of_elements as usize);
for _ in 0..number_of_elements {
bits.push(AtomicU32::new(stream.read_u32::<NativeEndian>()?));
}
@@ -220,8 +217,7 @@ impl BloomFilter {
return false;
}
}

return true;
true
}

pub fn contains(&self, s: &VecDeque<&str>) -> bool {
@@ -237,7 +233,7 @@
} else {
log::info!("Creating new bloom filter...");
let mut bloom_filter_size: usize = config.size_in_bytes;
if bloom_filter_size <= 0 {
if bloom_filter_size == 0 {
bloom_filter_size = BloomFilter::suggest_size_in_bytes(
config.estimated_doc_count,
config.desired_false_positive_rate,
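
The initialization change above swaps a reserve-and-push loop for an iterator chain. A minimal standalone sketch of the pattern, not part of this PR's diff (the element count is arbitrary): AtomicU32 is not Clone, so vec![AtomicU32::new(0); n] would not compile, while repeat_with constructs a fresh atomic for each slot.

use std::sync::atomic::{AtomicU32, Ordering};

fn main() {
    let number_of_u32 = 4; // arbitrary; the real filter derives this from size_in_bytes
    // repeat_with calls the closure once per element, so each slot gets its own AtomicU32
    let bits: Vec<AtomicU32> = std::iter::repeat_with(|| AtomicU32::new(0))
        .take(number_of_u32)
        .collect();
    assert_eq!(bits.len(), number_of_u32);
    assert!(bits.iter().all(|b| b.load(Ordering::Relaxed) == 0));
}
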
56 changes: 56 additions & 0 deletions src/bloom_filter/bloom_test.rs
@@ -0,0 +1,56 @@
#[cfg(test)]
mod tests {
use super::super::BloomFilter;

// n: number of items in filter. p: false positive rate
// m: number of bits in filter. k: number of hashers
// n = ceil(m / (-k / log(1 - exp(log(p) / k))))
// p = pow(1 - exp(-k / (m / n)), k)
// m = ceil((n * log(p)) / log(1 / pow(2, log(2))));
// k = round((m / n) * log(2));

#[test]
fn bloom_optimal_hasher_number() {
let size_in_bytes = 1_000_000_000;
let expected_elements = 1_000_000_000;
assert_eq!(
BloomFilter::optimal_number_of_hashers(size_in_bytes, expected_elements),
6
);
assert_eq!(
BloomFilter::optimal_number_of_hashers(1_000_000, 500_000),
12
)
}
#[test]
fn bloom_test_prob_of_false_positive() {
// calculated from https://hur.st/bloomfilter/
let size_in_bytes = 1_000_000_000;
let expected_elements = 1_000_000_000;
let num_hashers = 8;
assert_eq!(
BloomFilter::prob_of_false_positive(size_in_bytes, expected_elements, num_hashers),
0.025_491_740_593_406_025 as f64
);
assert_eq!(
BloomFilter::prob_of_false_positive(1_048_576, 524288, 2),
0.013_806_979_447_406_826 as f64
)
}

#[test]
fn bloom_suggest_size() {
// it's hard to derive this exactly since the algorithm is doing closest power of 2
// instead of exact theoretical optimum
let expected_elements = 1_000_000;
let desired_false_positive_rate = 0.0001 as f64;
let theoretical_optimum = ((expected_elements as f64 * desired_false_positive_rate.ln())
/ f64::ln(1.0 / 2.0f64.powf(2.0f64.ln())))
.ceil()
.div_euclid(8f64) as usize;
let suggested_size =
BloomFilter::suggest_size_in_bytes(expected_elements, desired_false_positive_rate);
assert_eq!(suggested_size, 4_194_304);
assert_eq!(suggested_size, theoretical_optimum.next_power_of_two())
}
}
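
The constants asserted in these tests follow from the formula quoted in the comment block. As a sanity check, here is a free-standing sketch (not the crate's API; the function name is illustrative) that evaluates p = (1 - exp(-k / (m / n)))^k with m in bits:

// Illustrative re-derivation of the false-positive formula from the test comments.
fn false_positive_rate(m_bits: f64, n_items: f64, k_hashers: f64) -> f64 {
    (1.0 - (-k_hashers / (m_bits / n_items)).exp()).powf(k_hashers)
}

fn main() {
    // 1_000_000_000 bytes = 8e9 bits, 1e9 items, 8 hashers: matches the first assertion
    println!("{}", false_positive_rate(8.0e9, 1.0e9, 8.0)); // ≈ 0.025491740593...
    // 1_048_576 bytes = 8_388_608 bits, 524_288 items, 2 hashers: matches the second
    println!("{}", false_positive_rate(8_388_608.0, 524_288.0, 2.0)); // ≈ 0.013806979447...
}
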
117 changes: 52 additions & 65 deletions src/deduper.rs
@@ -43,33 +43,32 @@ pub fn run(config: DeduperConfig) -> Result<u32, u32> {
let failed_shard_count_ref = failed_shard_count_ref.clone();
threadpool.execute(move || {
let result = write_attributes(path, work_dirs, dedupe, bloom_filter);
match result {
Ok(_) => {}
Err(e) => {
log::error!("Failed to process {:?}: {}", p, e);
failed_shard_count_ref.fetch_add(1, Ordering::Relaxed);
}
if let Err(e) = result {
log::error!("Failed to process {:?}: {}", p, e);
failed_shard_count_ref.fetch_add(1, Ordering::Relaxed);
}
});
}
threadpool.join();

let bloom_filter_file = PathBuf::from(&config.bloom_filter.file);
log::info!("Writing bloom filter to {:?}...", config.bloom_filter.file);
bloom_filter.write_to_file(&bloom_filter_file).unwrap();
log::info!("Bloom filter written.");

let failure_count = failed_shard_count_ref.fetch_add(0, Ordering::Relaxed);
match failure_count {
0 => {
log::info!("Done!");
return Ok(failure_count);
}
_ => {
log::error!("{} shards failed to process.", failure_count);
return Err(failure_count);
match bloom_filter.write_to_file(&bloom_filter_file) {
Ok(_) => log::info!("Bloom filter written."),
Err(e) => {
log::error!("Write failed: {}", e);
panic!("Failed to write bloom filter");
}
}

let failure_count = failed_shard_count_ref.load(Ordering::Relaxed);
if failure_count == 0 {
log::info!("Done!");
Ok(failure_count)
} else {
log::error!("{} shards failed to process.", failure_count);
Err(failure_count)
}
}

// Write attributes for the documents in the given file:
@@ -87,12 +86,8 @@ fn write_attributes(
};

let attrs_location = {
let mut attr_prefix = "/attributes/".to_owned();
attr_prefix.push_str(&dedupe_config.name);
attr_prefix.push_str("/");
docs_location
.to_owned()
.replace("/documents/", &attr_prefix)
let attr_prefix = format!("/attributes/{}/", &dedupe_config.name);
docs_location.replace("/documents/", &attr_prefix)
};
let local_output = cache.prepare_output(&attrs_location)?;
if local_output.exists() {
@@ -133,10 +128,9 @@ fn write_attributes(
GzEncoder::new(tmp_output, Compression::default()),
);

let mut line_number = 0;
for line in reader.lines() {
match line {
Ok(_) => {}
for (line_number, line) in reader.lines().enumerate() {
let line = match line {
Ok(line) => line,
Err(e) => {
log::error!(
"Error reading line {} of {}: {}",
@@ -146,45 +140,39 @@
);
break;
}
}
line_number += 1;
let line = line?;
};
let data: Value = serde_json::from_str(&line)?;
let mut attributes = json!({});

match dedupe_config.documents {
Some(ref cfg) => {
let document_key = {
let mut finder = jsonpath_rust::JsonPathFinder::from_str("{}", &cfg.key)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
.unwrap();
finder.set_json(Box::new(data.clone()));
finder
.find()
.as_array()
.unwrap()
.get(0)
.unwrap()
.as_str()
.unwrap()
.to_string()
};
if let Some(ref cfg) = dedupe_config.documents {
let document_key = {
let mut finder = jsonpath_rust::JsonPathFinder::from_str("{}", &cfg.key)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
.unwrap();
finder.set_json(Box::new(data.clone()));
finder
.find()
.as_array()
.unwrap()
.get(0)
.unwrap()
.as_str()
.unwrap()
.to_string()
};

if dedupe_config.skip_empty.unwrap_or(false) && document_key.trim().is_empty() {
// skip empty documents if dedupe_config.skip_empty is true
// and the document key is empty after trimming (i.e., removing whitespace)
continue;
} else {
let mut dedupe_key = VecDeque::with_capacity(1);
dedupe_key.push_back(document_key.as_str());
if bloom_filter.contains(&dedupe_key) {
attributes[&cfg.attribute_name] = Value::Bool(true);
} else if !bloom_filter.read_only {
bloom_filter.insert(&dedupe_key);
}
if dedupe_config.skip_empty.unwrap_or(false) && document_key.trim().is_empty() {
// skip empty documents if dedupe_config.skip_empty is true
// and the document key is empty after trimming (i.e., removing whitespace)
continue;
} else {
let dedupe_key = VecDeque::from([document_key.as_str()]);
if bloom_filter.contains(&dedupe_key) {
attributes[&cfg.attribute_name] = Value::Bool(true);
} else if !bloom_filter.read_only {
bloom_filter.insert(&dedupe_key);
}
}
None => {}
}
match dedupe_config.paragraphs {
None => {}
@@ -193,7 +181,7 @@ fn write_attributes(
let text = data["text"].as_str().unwrap();
let text_length = text.len();
let mut offset = 0;
let paragraphs = text.split("\n");
let paragraphs = text.split('\n');
let mut duplicate_paragraph_spans = Vec::new();
for p in paragraphs {
let par_start = offset;
@@ -208,13 +196,12 @@ fn write_attributes(
// and the paragraph is empty after trimming (i.e., removing whitespace)
continue;
} else {
let mut dedupe_key = VecDeque::with_capacity(1);
dedupe_key.push_back(p);
let dedupe_key = VecDeque::from([p]);
if bloom_filter.contains(&dedupe_key) {
let span = vec![
Value::Number(par_start.into()),
Value::Number(par_end.into()),
Value::Number(1.into()),
Value::from(1),
];
// add span to duplicate_paragraph_spans
duplicate_paragraph_spans.push(Value::Array(span));
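
Three of the deduper cleanups above recur throughout the PR: enumerate() replaces a hand-maintained line counter, a match whose Ok arm was empty becomes a binding match that breaks on error, and VecDeque::from([x]) builds the one-element dedupe key directly. A reduced sketch of that loop shape, with an in-memory reader standing in for the gzip stream:

use std::collections::VecDeque;
use std::io::{BufRead, BufReader, Cursor};

fn main() {
    let reader = BufReader::new(Cursor::new("first\nsecond\nthird\n"));
    // enumerate() supplies a zero-based index; no mutable counter to keep in sync
    for (line_number, line) in reader.lines().enumerate() {
        let line = match line {
            Ok(line) => line,
            Err(e) => {
                eprintln!("Error reading line {}: {}", line_number, e);
                break;
            }
        };
        // VecDeque::from([x]) replaces with_capacity(1) + push_back
        let dedupe_key: VecDeque<&str> = VecDeque::from([line.as_str()]);
        println!("{}: {:?}", line_number, dedupe_key);
    }
}
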
16 changes: 8 additions & 8 deletions src/lib.rs
@@ -15,26 +15,26 @@ use std::env;
fn deduper_entrypoint(config_str: &str) -> PyResult<()> {
let config: DeduperConfig = DeduperConfig::parse_from_string(config_str).unwrap();

match deduper::run(config) {
Ok(_) => Ok(()),
Err(cnt) => Err(exceptions::PyRuntimeError::new_err(format!(
if let Err(cnt) = deduper::run(config) {
return Err(exceptions::PyRuntimeError::new_err(format!(
"Failed with {} errors",
cnt
))),
)));
}
Ok(())
}

#[pyfunction]
fn mixer_entrypoint(config_str: &str) -> PyResult<()> {
//Result<u32, PyErr> {
let config: MixerConfig = MixerConfig::parse_from_string(config_str).unwrap();
match mixer::run(config) {
Ok(_) => Ok(()),
Err(cnt) => Err(exceptions::PyRuntimeError::new_err(format!(
if let Err(cnt) = mixer::run(config) {
return Err(exceptions::PyRuntimeError::new_err(format!(
"Failed with {} errors",
cnt
))),
)));
}
Ok(())
}

// A Python module implemented in Rust. The name of this function must match
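
Both entrypoints now share one shape: run, turn a failure count into an error, otherwise fall through to Ok(()). A PyO3-free sketch of that control flow (the stub run function and error type are stand-ins, not the real signatures):

// Stand-in for deduper::run / mixer::run: Err carries the failure count.
fn run() -> Result<u32, u32> {
    Err(3)
}

fn entrypoint() -> Result<(), String> {
    // The old match needed an explicit Ok arm; if let keeps only the error path
    if let Err(cnt) = run() {
        return Err(format!("Failed with {} errors", cnt));
    }
    Ok(())
}

fn main() {
    assert_eq!(entrypoint(), Err("Failed with 3 errors".to_string()));
}
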
29 changes: 11 additions & 18 deletions src/mixer.rs
@@ -12,8 +12,7 @@ pub fn run(config: MixerConfig) -> Result<u32, u32> {
let shards = Shard::split_streams(&config.streams).unwrap();

let threadpool = ThreadPool::new(config.processes);
let failed_shard_count = AtomicU32::new(0);
let failed_shard_count_ref = Arc::new(failed_shard_count);
let failed_shard_count_ref = Arc::new(AtomicU32::new(0));
for shard in shards {
let output_path = Path::new(&config.work_dir.output.clone()).join(&shard.output);
if output_path.exists() {
@@ -26,27 +25,21 @@

threadpool.execute(move || {
log::info!("Building output {:?}...", shard.output);
match shard.clone().process(work_dirs) {
Ok(_) => {}
Err(e) => {
log::error!("Error processing {:?}: {}", shard.output, e);
failed_shard_count_ref.fetch_add(1, Ordering::Relaxed);
}
if let Err(e) = shard.clone().process(work_dirs) {
log::error!("Error processing {:?}: {}", shard.output, e);
failed_shard_count_ref.fetch_add(1, Ordering::Relaxed);
}
});
}
threadpool.join();

let failure_count = failed_shard_count_ref.fetch_add(0, Ordering::Relaxed);
match failure_count {
0 => {
log::info!("Done!");
return Ok(failure_count);
}
_ => {
log::error!("{} shards failed to process.", failure_count);
return Err(failure_count);
}
let failure_count = failed_shard_count_ref.load(Ordering::Relaxed);
if failure_count == 0 {
log::info!("Done!");
Ok(failure_count)
} else {
log::error!("{} shards failed to process.", failure_count);
Err(failure_count)
}
}

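
The counter read in both run functions also changed: fetch_add(0, ...) is a read-modify-write that the code used only for its return value, while load expresses the intended plain read. A minimal sketch of the difference (the counts are arbitrary):

use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

fn main() {
    let failed_shard_count_ref = Arc::new(AtomicU32::new(0));
    // worker threads record failures like this
    failed_shard_count_ref.fetch_add(1, Ordering::Relaxed);
    failed_shard_count_ref.fetch_add(1, Ordering::Relaxed);
    // load is the plain read; fetch_add(0, ...) returned the same value via a needless write
    let failure_count = failed_shard_count_ref.load(Ordering::Relaxed);
    assert_eq!(failure_count, 2);
}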