# [PERF] Decrease compaction RAM usage and increase speed (#2729)
## Description of changes

- Improvements & Bug fixes
  - The BTree of block deltas for the postings list now stores `Vec<i32>` instead of `IntArray`, since the overhead of `IntArray` was measured at about 2 KB per BTree add vs. 816 bytes for `Vec<i32>`.
  - When blocks are built from block deltas at commit time, the block deltas are now drained rather than copied. This reduces RAM consumption by about 50%.
  - Removed the intermediary postings list builder, which added unnecessary complexity and performed extra clones.
  - Changed `get()` for the postings list to read slices instead of `Vec<i32>`, getting rid of one more deep copy. (A sketch of this drain-and-slice pattern follows the list below.)
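
To make the ownership story concrete, here is a minimal, self-contained sketch of the drain-and-slice pattern described above. The `PostingsDelta` type and its methods are illustrative stand-ins for this sketch, not the actual blockstore types: `add` stores plain `Vec<u32>` values (matching the updated tests), `get` hands back a borrowed slice rather than a cloned vector, and `commit` consumes the delta so its storage can be moved out rather than copied.

```rust
use std::collections::BTreeMap;

// Illustrative stand-in for a block delta holding postings lists.
struct PostingsDelta {
    storage: BTreeMap<String, Vec<u32>>,
}

impl PostingsDelta {
    fn new() -> Self {
        Self { storage: BTreeMap::new() }
    }

    // Plain Vec per entry: far lower per-add overhead than wrapping
    // each postings list in an Arrow array up front.
    fn add(&mut self, key: String, postings: Vec<u32>) {
        self.storage.insert(key, postings);
    }

    // Borrowing read: returns a slice view, avoiding a deep copy.
    fn get(&self, key: &str) -> Option<&[u32]> {
        self.storage.get(key).map(Vec::as_slice)
    }

    // Commit takes `self` by value, so the storage is drained (moved)
    // into the block being built instead of cloned.
    fn commit(self) -> Vec<(String, Vec<u32>)> {
        self.storage.into_iter().collect()
    }
}

fn main() {
    let mut delta = PostingsDelta::new();
    delta.add("key0".to_string(), vec![1, 2, 3]);
    assert_eq!(delta.get("key0"), Some(&[1u32, 2, 3][..]));
    let drained = delta.commit(); // the delta is unusable after this
    assert_eq!(drained.len(), 1);
}
```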

## Test plan
- [x] Tests pass locally with `pytest` for Python, `yarn test` for JS, and `cargo test` for Rust

## Documentation Changes
None
sanketkedia authored and spikechroma committed Sep 12, 2024
1 parent 2d049af commit a7f80ff
Showing 14 changed files with 470 additions and 749 deletions.
107 changes: 58 additions & 49 deletions rust/blockstore/src/arrow/block/delta/data_record.rs
```diff
@@ -5,8 +5,8 @@ use crate::{
 };
 use arrow::{
     array::{
-        Array, ArrayRef, BinaryBuilder, FixedSizeListBuilder, Float32Builder, StringBuilder,
-        StructArray,
+        Array, ArrayRef, BinaryBuilder, FixedSizeListBuilder, Float32Builder, RecordBatch,
+        StringBuilder, StructArray,
     },
     datatypes::{Field, Fields},
     util::bit_util,
@@ -340,57 +340,63 @@ impl DataRecordStorage {
         inner.storage.len()
     }
 
-    pub(super) fn build_keys(&self, builder: BlockKeyArrowBuilder) -> BlockKeyArrowBuilder {
-        let inner = self.inner.read();
-        let mut builder = builder;
-        for (key, _) in inner.storage.iter() {
-            builder.add_key(key.clone());
-        }
-        builder
-    }
-
-    pub(super) fn to_arrow(&self) -> (Field, ArrayRef) {
-        let inner = self.inner.read();
-
-        let item_capacity = inner.storage.len();
+    pub(super) fn to_arrow(
+        self,
+        key_builder: BlockKeyArrowBuilder,
+    ) -> Result<RecordBatch, arrow::error::ArrowError> {
+        // build arrow key.
+        let mut key_builder = key_builder;
         let mut embedding_builder;
         let mut id_builder;
         let mut metadata_builder;
         let mut document_builder;
         let embedding_dim;
-        if item_capacity == 0 {
-            // ok to initialize fixed size float list with fixed size as 0.
-            embedding_dim = 0;
-            embedding_builder = FixedSizeListBuilder::new(Float32Builder::new(), 0);
-            id_builder = StringBuilder::new();
-            metadata_builder = BinaryBuilder::new();
-            document_builder = StringBuilder::new();
-        } else {
-            embedding_dim = inner.storage.iter().next().unwrap().1 .1.len();
-            // Assumes all embeddings are of the same length, which is guaranteed by calling code
-            // TODO: validate this assumption by throwing an error if it's not true
-            let total_embedding_count = embedding_dim * item_capacity;
-            id_builder = StringBuilder::with_capacity(item_capacity, inner.id_size);
-            embedding_builder = FixedSizeListBuilder::with_capacity(
-                Float32Builder::with_capacity(total_embedding_count),
-                embedding_dim as i32,
-                item_capacity,
-            );
-            metadata_builder = BinaryBuilder::with_capacity(item_capacity, inner.metadata_size);
-            document_builder = StringBuilder::with_capacity(item_capacity, inner.document_size);
-        }
-
-        let iter = inner.storage.iter();
-        for (_key, (id, embedding, metadata, document)) in iter {
-            id_builder.append_value(id);
-            let embedding_arr = embedding_builder.values();
-            for entry in embedding.iter() {
-                embedding_arr.append_value(*entry);
-            }
-            embedding_builder.append(true);
-            metadata_builder.append_option(metadata.as_deref());
-            document_builder.append_option(document.as_deref());
-        }
+        match Arc::try_unwrap(self.inner) {
+            Ok(inner) => {
+                let inner = inner.into_inner();
+                let storage = inner.storage;
+                let item_capacity = storage.len();
+                if item_capacity == 0 {
+                    // ok to initialize fixed size float list with fixed size as 0.
+                    embedding_dim = 0;
+                    embedding_builder = FixedSizeListBuilder::new(Float32Builder::new(), 0);
+                    id_builder = StringBuilder::new();
+                    metadata_builder = BinaryBuilder::new();
+                    document_builder = StringBuilder::new();
+                } else {
+                    embedding_dim = storage.iter().next().unwrap().1 .1.len();
+                    // Assumes all embeddings are of the same length, which is guaranteed by calling code
+                    // TODO: validate this assumption by throwing an error if it's not true
+                    let total_embedding_count = embedding_dim * item_capacity;
+                    id_builder = StringBuilder::with_capacity(item_capacity, inner.id_size);
+                    embedding_builder = FixedSizeListBuilder::with_capacity(
+                        Float32Builder::with_capacity(total_embedding_count),
+                        embedding_dim as i32,
+                        item_capacity,
+                    );
+                    metadata_builder =
+                        BinaryBuilder::with_capacity(item_capacity, inner.metadata_size);
+                    document_builder =
+                        StringBuilder::with_capacity(item_capacity, inner.document_size);
+                }
+                for (key, (id, embedding, metadata, document)) in storage.into_iter() {
+                    key_builder.add_key(key);
+                    id_builder.append_value(id);
+                    let embedding_arr = embedding_builder.values();
+                    for entry in embedding {
+                        embedding_arr.append_value(entry);
+                    }
+                    embedding_builder.append(true);
+                    metadata_builder.append_option(metadata.as_deref());
+                    document_builder.append_option(document.as_deref());
+                }
+            }
+            Err(_) => {
+                panic!("Invariant violation: SingleColumnStorage inner should have only one reference.");
+            }
+        }
+        // Build arrow key with fields.
+        let (prefix_field, prefix_arr, key_field, key_arr) = key_builder.to_arrow();
 
         let id_field = Field::new("id", arrow::datatypes::DataType::Utf8, true);
         let embedding_field = Field::new(
@@ -439,9 +445,12 @@ impl DataRecordStorage {
             arrow::datatypes::DataType::Struct(struct_fields),
             true,
         );
-        (
-            struct_field,
-            (&struct_arr as &dyn Array).slice(0, struct_arr.len()),
-        )
+        let value_arr = (&struct_arr as &dyn Array).slice(0, struct_arr.len());
+        let schema = Arc::new(arrow::datatypes::Schema::new(vec![
+            prefix_field,
+            key_field,
+            struct_field,
+        ]));
+        RecordBatch::try_new(schema, vec![prefix_arr, key_arr, value_arr])
     }
 }
```
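
The `to_arrow` rewrite above hinges on `Arc::try_unwrap`: a delta being committed should hold the only reference to its inner storage, so the `Arc` can be unwrapped and the lock consumed, yielding the storage by value and letting it be iterated with `into_iter()` instead of cloned. Here is a minimal sketch of that pattern, assuming `std::sync::RwLock` (the real code's `into_inner()` call without an `unwrap` suggests a `parking_lot` lock):

```rust
use std::sync::{Arc, RwLock};

// Take ownership of shared storage when this Arc is the last reference.
fn take_storage<T>(inner: Arc<RwLock<T>>) -> T {
    match Arc::try_unwrap(inner) {
        // Sole owner: consume the lock and move the data out.
        Ok(lock) => lock.into_inner().expect("lock poisoned"),
        // Another clone of the Arc still exists: the commit invariant is violated.
        Err(_) => panic!("delta inner should have only one reference"),
    }
}

fn main() {
    let inner = Arc::new(RwLock::new(vec![1u32, 2, 3]));
    let storage = take_storage(inner);
    // `storage` is owned here, so it can be consumed without copying.
    let total: u32 = storage.into_iter().sum();
    assert_eq!(total, 6);
}
```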
49 changes: 25 additions & 24 deletions rust/blockstore/src/arrow/block/delta/delta.rs
```diff
@@ -60,7 +60,7 @@ impl BlockDelta {
         self.builder.get_size::<K>()
     }
 
-    pub fn finish<K: ArrowWriteableKey, V: ArrowWriteableValue>(&self) -> RecordBatch {
+    pub fn finish<K: ArrowWriteableKey, V: ArrowWriteableValue>(self) -> RecordBatch {
         self.builder.to_record_batch::<K>()
     }
 
@@ -121,7 +121,6 @@ impl BlockDelta {
 #[cfg(test)]
 mod test {
     use crate::arrow::{block::Block, config::TEST_MAX_BLOCK_SIZE_BYTES, provider::BlockManager};
-    use arrow::array::Int32Array;
     use chroma_cache::{
         cache::Cache,
         config::{CacheConfig, UnboundedCacheConfig},
@@ -154,7 +153,7 @@ mod test {
         let storage = Storage::Local(LocalStorage::new(path));
         let cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {}));
         let block_manager = BlockManager::new(storage, TEST_MAX_BLOCK_SIZE_BYTES, cache);
-        let delta = block_manager.create::<&str, Int32Array>();
+        let delta = block_manager.create::<&str, Vec<u32>>();
 
         let n = 2000;
         for i in 0..n {
@@ -163,27 +162,25 @@ mod test {
             let value_len: usize = rand::thread_rng().gen_range(1..100);
             let mut new_vec = Vec::with_capacity(value_len);
             for _ in 0..value_len {
-                new_vec.push(random::<i32>());
+                new_vec.push(random::<u32>());
             }
-            delta.add::<&str, Int32Array>(prefix, &key, Int32Array::from(new_vec));
+            delta.add::<&str, Vec<u32>>(prefix, &key, new_vec);
         }
 
-        let size = delta.get_size::<&str, Int32Array>();
-        // TODO: should commit take ownership of delta?
-        // Semantically, that makes sense, since a delta is unsuable after commit
+        let size = delta.get_size::<&str, Vec<u32>>();
 
-        let block = block_manager.commit::<&str, Int32Array>(&delta);
+        let block = block_manager.commit::<&str, Vec<u32>>(delta);
         let mut values_before_flush = vec![];
         for i in 0..n {
             let key = format!("key{}", i);
-            let read = block.get::<&str, Int32Array>("prefix", &key).unwrap();
-            values_before_flush.push(read);
+            let read = block.get::<&str, &[u32]>("prefix", &key).unwrap();
+            values_before_flush.push(read.to_vec());
         }
         block_manager.flush(&block).await.unwrap();
         let block = block_manager.get(&block.clone().id).await.unwrap();
         for i in 0..n {
             let key = format!("key{}", i);
-            let read = block.get::<&str, Int32Array>("prefix", &key).unwrap();
+            let read = block.get::<&str, &[u32]>("prefix", &key).unwrap();
             assert_eq!(read, values_before_flush[i]);
         }
         test_save_load_size(path, &block);
@@ -208,7 +205,7 @@ mod test {
             delta.add(prefix, key.as_str(), value.to_owned());
         }
         let size = delta.get_size::<&str, String>();
-        let block = block_manager.commit::<&str, String>(&delta);
+        let block = block_manager.commit::<&str, String>(delta);
         let mut values_before_flush = vec![];
         for i in 0..n {
             let key = format!("key{}", i);
@@ -237,7 +234,7 @@ mod test {
         // test fork
         let forked_block = block_manager.fork::<&str, String>(&delta_id).await;
         let new_id = forked_block.id.clone();
-        let block = block_manager.commit::<&str, String>(&forked_block);
+        let block = block_manager.commit::<&str, String>(forked_block);
         block_manager.flush(&block).await.unwrap();
         let forked_block = block_manager.get(&new_id).await.unwrap();
         for i in 0..n {
@@ -265,15 +262,16 @@ mod test {
         }
 
         let size = delta.get_size::<f32, String>();
-        let block = block_manager.commit::<f32, String>(&delta);
+        let delta_id = delta.id.clone();
+        let block = block_manager.commit::<f32, String>(delta);
         let mut values_before_flush = vec![];
         for i in 0..n {
             let key = i as f32;
             let read = block.get::<f32, &str>("prefix", key).unwrap();
             values_before_flush.push(read);
         }
         block_manager.flush(&block).await.unwrap();
-        let block = block_manager.get(&delta.id).await.unwrap();
+        let block = block_manager.get(&delta_id).await.unwrap();
         assert_eq!(size, block.get_size());
         for i in 0..n {
             let key = i as f32;
@@ -302,9 +300,10 @@ mod test {
         }
 
         let size = delta.get_size::<&str, RoaringBitmap>();
-        let block = block_manager.commit::<&str, RoaringBitmap>(&delta);
+        let delta_id = delta.id.clone();
+        let block = block_manager.commit::<&str, RoaringBitmap>(delta);
         block_manager.flush(&block).await.unwrap();
-        let block = block_manager.get(&delta.id).await.unwrap();
+        let block = block_manager.get(&delta_id).await.unwrap();
 
         assert_eq!(size, block.get_size());
 
@@ -366,9 +365,10 @@ mod test {
         }
 
         let size = delta.get_size::<&str, &DataRecord>();
-        let block = block_manager.commit::<&str, &DataRecord>(&delta);
+        let delta_id = delta.id.clone();
+        let block = block_manager.commit::<&str, &DataRecord>(delta);
         block_manager.flush(&block).await.unwrap();
-        let block = block_manager.get(&delta.id).await.unwrap();
+        let block = block_manager.get(&delta_id).await.unwrap();
         for i in 0..3 {
             let read = block.get::<&str, DataRecord>("", ids[i]).unwrap();
             assert_eq!(read.id, ids[i]);
@@ -400,9 +400,10 @@ mod test {
         }
 
         let size = delta.get_size::<u32, String>();
-        let block = block_manager.commit::<u32, String>(&delta);
+        let delta_id = delta.id.clone();
+        let block = block_manager.commit::<u32, String>(delta);
         block_manager.flush(&block).await.unwrap();
-        let block = block_manager.get(&delta.id).await.unwrap();
+        let block = block_manager.get(&delta_id).await.unwrap();
         assert_eq!(size, block.get_size());
 
         // test save/load
@@ -427,7 +428,7 @@ mod test {
             delta.add(prefix, key, value);
         }
         let size = delta.get_size::<u32, u32>();
-        let block = block_manager.commit::<u32, u32>(&delta);
+        let block = block_manager.commit::<u32, u32>(delta);
         let mut values_before_flush = vec![];
         for i in 0..n {
             let key = i as u32;
@@ -456,7 +457,7 @@ mod test {
         // test fork
        let forked_block = block_manager.fork::<u32, u32>(&delta_id).await;
         let new_id = forked_block.id.clone();
-        let block = block_manager.commit::<u32, u32>(&forked_block);
+        let block = block_manager.commit::<u32, u32>(forked_block);
         block_manager.flush(&block).await.unwrap();
         let forked_block = block_manager.get(&new_id).await.unwrap();
         for i in 0..n {
```
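
One consequence of `commit` now taking the delta by value, visible throughout the updated tests: using the delta after commit (for example, reading `delta.id`) becomes a compile-time error, which is why the tests clone the id before committing. A tiny sketch with illustrative `Delta`/`commit` names shows the borrow checker enforcing this:

```rust
// Illustrative types: commit consumes the delta, so later use is rejected.
struct Delta {
    id: u64,
    data: Vec<u32>,
}

fn commit(delta: Delta) -> Vec<u32> {
    delta.data // `delta` is moved in and dropped here
}

fn main() {
    let delta = Delta { id: 42, data: vec![1, 2, 3] };
    let delta_id = delta.id; // capture before commit, as the tests do
    let block = commit(delta);
    // `delta.id` would not compile past this point: `delta` was moved.
    println!("committed {} values from delta {}", block.len(), delta_id);
}
```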
(Diffs for the remaining 12 changed files are not shown.)
