From b491c2fe04ed582ef000f2064851db5a4840b987 Mon Sep 17 00:00:00 2001 From: Max Isom Date: Thu, 29 Aug 2024 12:45:42 -0700 Subject: [PATCH] [PERF]: better locking of uncommitted tracking maps (decrease compaction time by 3x) (#2736) --- rust/index/src/fulltext/types.rs | 107 +++++++++++++++++-------------- 1 file changed, 59 insertions(+), 48 deletions(-) diff --git a/rust/index/src/fulltext/types.rs b/rust/index/src/fulltext/types.rs index 4b035e2f689..dabdf5e4459 100644 --- a/rust/index/src/fulltext/types.rs +++ b/rust/index/src/fulltext/types.rs @@ -7,6 +7,7 @@ use chroma_types::{BooleanOperator, WhereDocument, WhereDocumentOperator}; use parking_lot::Mutex; use std::collections::{HashMap, HashSet}; use std::sync::Arc; +use tantivy::tokenizer::Token; use thiserror::Error; use uuid::Uuid; @@ -86,55 +87,59 @@ impl<'me> FullTextIndexWriter<'me> { async fn populate_frequencies_and_posting_lists_from_previous_version( &self, - token: &str, + tokens: &[Token], ) -> Result<(), FullTextIndexError> { - let mut uncommitted_frequencies = self.uncommitted_frequencies.lock().await; - match uncommitted_frequencies.get(token) { - Some(_) => return Ok(()), - None => { + // (Scoped to limit the lifetime of the lock) + { + let mut uncommitted_frequencies = self.uncommitted_frequencies.lock().await; + for token in tokens { + if uncommitted_frequencies.contains_key(&token.text) { + continue; + } + let frequency = match &self.full_text_index_reader { // Readers are uninitialized until the first compaction finishes // so there is a case when this is none hence not an error. None => 0, - Some(reader) => match reader.get_frequencies_for_token(token).await { - Ok(frequency) => frequency, - // New token so start with frequency of 0. - Err(_) => 0, - }, + Some(reader) => { + match reader.get_frequencies_for_token(token.text.as_str()).await { + Ok(frequency) => frequency, + // New token so start with frequency of 0. + Err(_) => 0, + } + } }; uncommitted_frequencies - .insert(token.to_string(), (frequency as i32, frequency as i32)); + .insert(token.text.clone(), (frequency as i32, frequency as i32)); } } + let mut uncommitted_postings = self.uncommitted_postings.lock().await; - match uncommitted_postings.positional_postings.get(token) { - Some(_) => { - // This should never happen -- if uncommitted has the token, then - // uncommitted_frequencies should have had it as well. - tracing::error!( - "Error populating frequencies and posting lists from previous version" - ); - return Err(FullTextIndexError::InvariantViolation); + for token in tokens { + if uncommitted_postings + .positional_postings + .contains_key(&token.text) + { + continue; } - None => { - let results = match &self.full_text_index_reader { - // Readers are uninitialized until the first compaction finishes - // so there is a case when this is none hence not an error. - None => vec![], - Some(reader) => match reader.get_all_results_for_token(token).await { - Ok(results) => results, - // New token so start with empty postings list. - Err(_) => vec![], - }, - }; - let mut doc_and_positions = HashMap::new(); - for result in results { - doc_and_positions.insert(result.0, result.1); - } - uncommitted_postings - .positional_postings - .insert(token.to_string(), doc_and_positions); + + let results = match &self.full_text_index_reader { + // Readers are uninitialized until the first compaction finishes + // so there is a case when this is none hence not an error. + None => vec![], + Some(reader) => match reader.get_all_results_for_token(&token.text).await { + Ok(results) => results, + // New token so start with empty postings list. + Err(_) => vec![], + }, + }; + let mut doc_and_positions = HashMap::new(); + for result in results { + doc_and_positions.insert(result.0, result.1); } + uncommitted_postings + .positional_postings + .insert(token.text.clone(), doc_and_positions); } Ok(()) } @@ -149,16 +154,19 @@ impl<'me> FullTextIndexWriter<'me> { offset_id: u32, ) -> Result<(), FullTextIndexError> { let tokens = self.encode_tokens(document); - for token in tokens.get_tokens() { - self.populate_frequencies_and_posting_lists_from_previous_version(token.text.as_str()) - .await?; - let mut uncommitted_frequencies = self.uncommitted_frequencies.lock().await; + let tokens = tokens.get_tokens(); + self.populate_frequencies_and_posting_lists_from_previous_version(tokens) + .await?; + let mut uncommitted_frequencies = self.uncommitted_frequencies.lock().await; + let mut uncommitted_postings = self.uncommitted_postings.lock().await; + + for token in tokens { // The entry should always exist because self.populate_frequencies_and_posting_lists_from_previous_version // will have created it if this token is new to the system. uncommitted_frequencies - .entry(token.text.to_string()) + .entry(token.text.clone()) .and_modify(|e| (*e).0 += 1); - let mut uncommitted_postings = self.uncommitted_postings.lock().await; + // For a new token, the uncommitted list will not contain any entry so insert // an empty builder in that case. let builder = uncommitted_postings @@ -191,10 +199,14 @@ impl<'me> FullTextIndexWriter<'me> { offset_id: u32, ) -> Result<(), FullTextIndexError> { let tokens = self.encode_tokens(document); - for token in tokens.get_tokens() { - self.populate_frequencies_and_posting_lists_from_previous_version(token.text.as_str()) - .await?; - let mut uncommitted_frequencies = self.uncommitted_frequencies.lock().await; + let tokens = tokens.get_tokens(); + + self.populate_frequencies_and_posting_lists_from_previous_version(tokens) + .await?; + let mut uncommitted_frequencies = self.uncommitted_frequencies.lock().await; + let mut uncommitted_postings = self.uncommitted_postings.lock().await; + + for token in tokens { match uncommitted_frequencies.get_mut(token.text.as_str()) { Some(frequency) => { (*frequency).0 -= 1; @@ -205,7 +217,6 @@ impl<'me> FullTextIndexWriter<'me> { return Err(FullTextIndexError::InvariantViolation); } } - let mut uncommitted_postings = self.uncommitted_postings.lock().await; match uncommitted_postings .positional_postings .get_mut(token.text.as_str())