Skip to content

Commit

Permalink
[PERF]: better locking of uncommitted tracking maps (decrease compact…
Browse files Browse the repository at this point in the history
…ion time by 3x) (#2736)
  • Loading branch information
codetheweb authored Aug 29, 2024
1 parent 60d9342 commit b491c2f
Showing 1 changed file with 59 additions and 48 deletions.
107 changes: 59 additions & 48 deletions rust/index/src/fulltext/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use chroma_types::{BooleanOperator, WhereDocument, WhereDocumentOperator};
use parking_lot::Mutex;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tantivy::tokenizer::Token;
use thiserror::Error;
use uuid::Uuid;

Expand Down Expand Up @@ -86,55 +87,59 @@ impl<'me> FullTextIndexWriter<'me> {

async fn populate_frequencies_and_posting_lists_from_previous_version(
&self,
token: &str,
tokens: &[Token],
) -> Result<(), FullTextIndexError> {
let mut uncommitted_frequencies = self.uncommitted_frequencies.lock().await;
match uncommitted_frequencies.get(token) {
Some(_) => return Ok(()),
None => {
// (Scoped to limit the lifetime of the lock)
{
let mut uncommitted_frequencies = self.uncommitted_frequencies.lock().await;
for token in tokens {
if uncommitted_frequencies.contains_key(&token.text) {
continue;
}

let frequency = match &self.full_text_index_reader {
// Readers are uninitialized until the first compaction finishes
// so there is a case when this is none hence not an error.
None => 0,
Some(reader) => match reader.get_frequencies_for_token(token).await {
Ok(frequency) => frequency,
// New token so start with frequency of 0.
Err(_) => 0,
},
Some(reader) => {
match reader.get_frequencies_for_token(token.text.as_str()).await {
Ok(frequency) => frequency,
// New token so start with frequency of 0.
Err(_) => 0,
}
}
};
uncommitted_frequencies
.insert(token.to_string(), (frequency as i32, frequency as i32));
.insert(token.text.clone(), (frequency as i32, frequency as i32));
}
}

let mut uncommitted_postings = self.uncommitted_postings.lock().await;
match uncommitted_postings.positional_postings.get(token) {
Some(_) => {
// This should never happen -- if uncommitted has the token, then
// uncommitted_frequencies should have had it as well.
tracing::error!(
"Error populating frequencies and posting lists from previous version"
);
return Err(FullTextIndexError::InvariantViolation);
for token in tokens {
if uncommitted_postings
.positional_postings
.contains_key(&token.text)
{
continue;
}
None => {
let results = match &self.full_text_index_reader {
// Readers are uninitialized until the first compaction finishes
// so there is a case when this is none hence not an error.
None => vec![],
Some(reader) => match reader.get_all_results_for_token(token).await {
Ok(results) => results,
// New token so start with empty postings list.
Err(_) => vec![],
},
};
let mut doc_and_positions = HashMap::new();
for result in results {
doc_and_positions.insert(result.0, result.1);
}
uncommitted_postings
.positional_postings
.insert(token.to_string(), doc_and_positions);

let results = match &self.full_text_index_reader {
// Readers are uninitialized until the first compaction finishes
// so there is a case when this is none hence not an error.
None => vec![],
Some(reader) => match reader.get_all_results_for_token(&token.text).await {
Ok(results) => results,
// New token so start with empty postings list.
Err(_) => vec![],
},
};
let mut doc_and_positions = HashMap::new();
for result in results {
doc_and_positions.insert(result.0, result.1);
}
uncommitted_postings
.positional_postings
.insert(token.text.clone(), doc_and_positions);
}
Ok(())
}
Expand All @@ -149,16 +154,19 @@ impl<'me> FullTextIndexWriter<'me> {
offset_id: u32,
) -> Result<(), FullTextIndexError> {
let tokens = self.encode_tokens(document);
for token in tokens.get_tokens() {
self.populate_frequencies_and_posting_lists_from_previous_version(token.text.as_str())
.await?;
let mut uncommitted_frequencies = self.uncommitted_frequencies.lock().await;
let tokens = tokens.get_tokens();
self.populate_frequencies_and_posting_lists_from_previous_version(tokens)
.await?;
let mut uncommitted_frequencies = self.uncommitted_frequencies.lock().await;
let mut uncommitted_postings = self.uncommitted_postings.lock().await;

for token in tokens {
// The entry should always exist because self.populate_frequencies_and_posting_lists_from_previous_version
// will have created it if this token is new to the system.
uncommitted_frequencies
.entry(token.text.to_string())
.entry(token.text.clone())
.and_modify(|e| (*e).0 += 1);
let mut uncommitted_postings = self.uncommitted_postings.lock().await;

// For a new token, the uncommitted list will not contain any entry so insert
// an empty builder in that case.
let builder = uncommitted_postings
Expand Down Expand Up @@ -191,10 +199,14 @@ impl<'me> FullTextIndexWriter<'me> {
offset_id: u32,
) -> Result<(), FullTextIndexError> {
let tokens = self.encode_tokens(document);
for token in tokens.get_tokens() {
self.populate_frequencies_and_posting_lists_from_previous_version(token.text.as_str())
.await?;
let mut uncommitted_frequencies = self.uncommitted_frequencies.lock().await;
let tokens = tokens.get_tokens();

self.populate_frequencies_and_posting_lists_from_previous_version(tokens)
.await?;
let mut uncommitted_frequencies = self.uncommitted_frequencies.lock().await;
let mut uncommitted_postings = self.uncommitted_postings.lock().await;

for token in tokens {
match uncommitted_frequencies.get_mut(token.text.as_str()) {
Some(frequency) => {
(*frequency).0 -= 1;
Expand All @@ -205,7 +217,6 @@ impl<'me> FullTextIndexWriter<'me> {
return Err(FullTextIndexError::InvariantViolation);
}
}
let mut uncommitted_postings = self.uncommitted_postings.lock().await;
match uncommitted_postings
.positional_postings
.get_mut(token.text.as_str())
Expand Down

0 comments on commit b491c2f

Please sign in to comment.