Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PERF] Decrease compaction RAM usage and increase speed #2729

Merged
merged 13 commits into from
Aug 29, 2024
98 changes: 58 additions & 40 deletions rust/blockstore/src/arrow/block/delta/data_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use crate::{
};
use arrow::{
array::{
Array, ArrayRef, BinaryBuilder, FixedSizeListBuilder, Float32Builder, StringBuilder,
StructArray,
Array, ArrayRef, BinaryBuilder, FixedSizeListBuilder, Float32Builder, RecordBatch,
StringBuilder, StructArray,
},
datatypes::{Field, Fields},
util::bit_util,
Expand Down Expand Up @@ -349,48 +349,63 @@ impl DataRecordStorage {
builder
}

pub(super) fn to_arrow(&self) -> (Field, ArrayRef) {
let inner = self.inner.read();

let item_capacity = inner.storage.len();
pub(super) fn to_arrow(
self,
key_builder: BlockKeyArrowBuilder,
) -> Result<RecordBatch, arrow::error::ArrowError> {
// build arrow key.
let mut key_builder = key_builder;
let mut embedding_builder;
let mut id_builder;
let mut metadata_builder;
let mut document_builder;
let embedding_dim;
if item_capacity == 0 {
// ok to initialize fixed size float list with fixed size as 0.
embedding_dim = 0;
embedding_builder = FixedSizeListBuilder::new(Float32Builder::new(), 0);
id_builder = StringBuilder::new();
metadata_builder = BinaryBuilder::new();
document_builder = StringBuilder::new();
} else {
embedding_dim = inner.storage.iter().next().unwrap().1 .1.len();
// Assumes all embeddings are of the same length, which is guaranteed by calling code
// TODO: validate this assumption by throwing an error if it's not true
let total_embedding_count = embedding_dim * item_capacity;
id_builder = StringBuilder::with_capacity(item_capacity, inner.id_size);
embedding_builder = FixedSizeListBuilder::with_capacity(
Float32Builder::with_capacity(total_embedding_count),
embedding_dim as i32,
item_capacity,
);
metadata_builder = BinaryBuilder::with_capacity(item_capacity, inner.metadata_size);
document_builder = StringBuilder::with_capacity(item_capacity, inner.document_size);
}

let iter = inner.storage.iter();
for (_key, (id, embedding, metadata, document)) in iter {
id_builder.append_value(id);
let embedding_arr = embedding_builder.values();
for entry in embedding.iter() {
embedding_arr.append_value(*entry);
match Arc::try_unwrap(self.inner) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we do `let inner = match Arc::try_unwrap(self.inner) { ... }` so that we can avoid the nesting?

Ok(inner) => {
let inner = inner.into_inner();
let storage = inner.storage;
let item_capacity = storage.len();
if item_capacity == 0 {
// ok to initialize fixed size float list with fixed size as 0.
embedding_dim = 0;
embedding_builder = FixedSizeListBuilder::new(Float32Builder::new(), 0);
id_builder = StringBuilder::new();
metadata_builder = BinaryBuilder::new();
document_builder = StringBuilder::new();
} else {
embedding_dim = storage.iter().next().unwrap().1 .1.len();
// Assumes all embeddings are of the same length, which is guaranteed by calling code
// TODO: validate this assumption by throwing an error if it's not true
let total_embedding_count = embedding_dim * item_capacity;
id_builder = StringBuilder::with_capacity(item_capacity, inner.id_size);
embedding_builder = FixedSizeListBuilder::with_capacity(
Float32Builder::with_capacity(total_embedding_count),
embedding_dim as i32,
item_capacity,
);
metadata_builder =
BinaryBuilder::with_capacity(item_capacity, inner.metadata_size);
document_builder =
StringBuilder::with_capacity(item_capacity, inner.document_size);
}
for (key, (id, embedding, metadata, document)) in storage.into_iter() {
key_builder.add_key(key);
id_builder.append_value(id);
let embedding_arr = embedding_builder.values();
for entry in embedding {
embedding_arr.append_value(entry);
}
embedding_builder.append(true);
metadata_builder.append_option(metadata.as_deref());
document_builder.append_option(document.as_deref());
}
}
Err(_) => {
panic!("Invariant violation: SingleColumnStorage inner should have only one reference.");
}
embedding_builder.append(true);
metadata_builder.append_option(metadata.as_deref());
document_builder.append_option(document.as_deref());
}
// Build arrow key with fields.
let (prefix_field, prefix_arr, key_field, key_arr) = key_builder.to_arrow();

let id_field = Field::new("id", arrow::datatypes::DataType::Utf8, true);
let embedding_field = Field::new(
Expand Down Expand Up @@ -439,9 +454,12 @@ impl DataRecordStorage {
arrow::datatypes::DataType::Struct(struct_fields),
true,
);
(
let value_arr = (&struct_arr as &dyn Array).slice(0, struct_arr.len());
let schema = Arc::new(arrow::datatypes::Schema::new(vec![
prefix_field,
key_field,
struct_field,
(&struct_arr as &dyn Array).slice(0, struct_arr.len()),
)
]));
RecordBatch::try_new(schema, vec![prefix_arr, key_arr, value_arr])
}
}
46 changes: 24 additions & 22 deletions rust/blockstore/src/arrow/block/delta/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl BlockDelta {
self.builder.get_size::<K>()
}

pub fn finish<K: ArrowWriteableKey, V: ArrowWriteableValue>(&self) -> RecordBatch {
pub fn finish<K: ArrowWriteableKey, V: ArrowWriteableValue>(self) -> RecordBatch {
self.builder.to_record_batch::<K>()
}

Expand Down Expand Up @@ -154,7 +154,7 @@ mod test {
let storage = Storage::Local(LocalStorage::new(path));
let cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {}));
let block_manager = BlockManager::new(storage, TEST_MAX_BLOCK_SIZE_BYTES, cache);
let delta = block_manager.create::<&str, Int32Array>();
let delta = block_manager.create::<&str, Vec<i32>>();

let n = 2000;
for i in 0..n {
Expand All @@ -165,25 +165,23 @@ mod test {
for _ in 0..value_len {
new_vec.push(random::<i32>());
}
delta.add::<&str, Int32Array>(prefix, &key, Int32Array::from(new_vec));
delta.add::<&str, Vec<i32>>(prefix, &key, new_vec);
}

let size = delta.get_size::<&str, Int32Array>();
// TODO: should commit take ownership of delta?
// Semantically, that makes sense, since a delta is unusable after commit
let size = delta.get_size::<&str, Vec<i32>>();

let block = block_manager.commit::<&str, Int32Array>(&delta);
let block = block_manager.commit::<&str, Vec<i32>>(delta);
let mut values_before_flush = vec![];
for i in 0..n {
let key = format!("key{}", i);
let read = block.get::<&str, Int32Array>("prefix", &key).unwrap();
values_before_flush.push(read);
let read = block.get::<&str, &[i32]>("prefix", &key).unwrap();
values_before_flush.push(read.to_vec());
}
block_manager.flush(&block).await.unwrap();
let block = block_manager.get(&block.clone().id).await.unwrap();
for i in 0..n {
let key = format!("key{}", i);
let read = block.get::<&str, Int32Array>("prefix", &key).unwrap();
let read = block.get::<&str, &[i32]>("prefix", &key).unwrap();
assert_eq!(read, values_before_flush[i]);
}
test_save_load_size(path, &block);
Expand All @@ -208,7 +206,7 @@ mod test {
delta.add(prefix, key.as_str(), value.to_owned());
}
let size = delta.get_size::<&str, String>();
let block = block_manager.commit::<&str, String>(&delta);
let block = block_manager.commit::<&str, String>(delta);
let mut values_before_flush = vec![];
for i in 0..n {
let key = format!("key{}", i);
Expand Down Expand Up @@ -237,7 +235,7 @@ mod test {
// test fork
let forked_block = block_manager.fork::<&str, String>(&delta_id).await;
let new_id = forked_block.id.clone();
let block = block_manager.commit::<&str, String>(&forked_block);
let block = block_manager.commit::<&str, String>(forked_block);
block_manager.flush(&block).await.unwrap();
let forked_block = block_manager.get(&new_id).await.unwrap();
for i in 0..n {
Expand Down Expand Up @@ -265,15 +263,16 @@ mod test {
}

let size = delta.get_size::<f32, String>();
let block = block_manager.commit::<f32, String>(&delta);
let delta_id = delta.id.clone();
let block = block_manager.commit::<f32, String>(delta);
let mut values_before_flush = vec![];
for i in 0..n {
let key = i as f32;
let read = block.get::<f32, &str>("prefix", key).unwrap();
values_before_flush.push(read);
}
block_manager.flush(&block).await.unwrap();
let block = block_manager.get(&delta.id).await.unwrap();
let block = block_manager.get(&delta_id).await.unwrap();
assert_eq!(size, block.get_size());
for i in 0..n {
let key = i as f32;
Expand Down Expand Up @@ -302,9 +301,10 @@ mod test {
}

let size = delta.get_size::<&str, RoaringBitmap>();
let block = block_manager.commit::<&str, RoaringBitmap>(&delta);
let delta_id = delta.id.clone();
let block = block_manager.commit::<&str, RoaringBitmap>(delta);
block_manager.flush(&block).await.unwrap();
let block = block_manager.get(&delta.id).await.unwrap();
let block = block_manager.get(&delta_id).await.unwrap();

assert_eq!(size, block.get_size());

Expand Down Expand Up @@ -366,9 +366,10 @@ mod test {
}

let size = delta.get_size::<&str, &DataRecord>();
let block = block_manager.commit::<&str, &DataRecord>(&delta);
let delta_id = delta.id.clone();
let block = block_manager.commit::<&str, &DataRecord>(delta);
block_manager.flush(&block).await.unwrap();
let block = block_manager.get(&delta.id).await.unwrap();
let block = block_manager.get(&delta_id).await.unwrap();
for i in 0..3 {
let read = block.get::<&str, DataRecord>("", ids[i]).unwrap();
assert_eq!(read.id, ids[i]);
Expand Down Expand Up @@ -400,9 +401,10 @@ mod test {
}

let size = delta.get_size::<u32, String>();
let block = block_manager.commit::<u32, String>(&delta);
let delta_id = delta.id.clone();
let block = block_manager.commit::<u32, String>(delta);
block_manager.flush(&block).await.unwrap();
let block = block_manager.get(&delta.id).await.unwrap();
let block = block_manager.get(&delta_id).await.unwrap();
assert_eq!(size, block.get_size());

// test save/load
Expand All @@ -427,7 +429,7 @@ mod test {
delta.add(prefix, key, value);
}
let size = delta.get_size::<u32, u32>();
let block = block_manager.commit::<u32, u32>(&delta);
let block = block_manager.commit::<u32, u32>(delta);
let mut values_before_flush = vec![];
for i in 0..n {
let key = i as u32;
Expand Down Expand Up @@ -456,7 +458,7 @@ mod test {
// test fork
let forked_block = block_manager.fork::<u32, u32>(&delta_id).await;
let new_id = forked_block.id.clone();
let block = block_manager.commit::<u32, u32>(&forked_block);
let block = block_manager.commit::<u32, u32>(forked_block);
block_manager.flush(&block).await.unwrap();
let forked_block = block_manager.get(&new_id).await.unwrap();
for i in 0..n {
Expand Down
Loading
Loading