Skip to content

Commit

Permalink
perf: parallelize indexing partitions (#3303)
Browse files Browse the repository at this point in the history
  • Loading branch information
BubbleCal authored Jan 3, 2025
1 parent 397dc27 commit 8585207
Showing 1 changed file with 128 additions and 74 deletions.
202 changes: 128 additions & 74 deletions rust/lance/src/index/vector/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::sync::Arc;
use arrow::array::AsArray;
use arrow_array::{RecordBatch, UInt64Array};
use futures::prelude::stream::{StreamExt, TryStreamExt};
use futures::stream;
use futures::{stream, FutureExt};
use itertools::Itertools;
use lance_arrow::RecordBatchExt;
use lance_core::cache::FileMetadataCache;
Expand Down Expand Up @@ -84,7 +84,7 @@ pub struct IvfIndexBuilder<S: IvfSubIndex, Q: Quantization> {
// fields will be set during build
ivf: Option<IvfModel>,
quantizer: Option<Q>,
shuffle_reader: Option<Box<dyn ShuffleReader>>,
shuffle_reader: Option<Arc<dyn ShuffleReader>>,
partition_sizes: Vec<(usize, usize)>,

// fields for merging indices / remapping
Expand Down Expand Up @@ -412,7 +412,7 @@ impl<S: IvfSubIndex + 'static, Q: Quantization + 'static> IvfIndexBuilder<S, Q>
Some(Err(e)) => panic!("do this better: error reading first batch: {:?}", e),
None => {
log::info!("no data to shuffle");
self.shuffle_reader = Some(Box::new(IvfShufflerReader::new(
self.shuffle_reader = Some(Arc::new(IvfShufflerReader::new(
Arc::new(self.store.clone()),
self.temp_dir.clone(),
vec![0; ivf.num_partitions()],
Expand All @@ -427,18 +427,30 @@ impl<S: IvfSubIndex + 'static, Q: Quantization + 'static> IvfIndexBuilder<S, Q>
schema,
transformed_stream,
)))
.await?,
.await?
.into(),
);

Ok(self)
}

async fn build_partitions(&mut self) -> Result<&mut Self> {
let dataset = self.dataset.as_ref().ok_or(Error::invalid_input(
"dataset not set before building partitions",
location!(),
))?;
let ivf = self.ivf.as_ref().ok_or(Error::invalid_input(
"IVF not set before building partitions",
location!(),
))?;

let quantizer = self.quantizer.clone().ok_or(Error::invalid_input(
"quantizer not set before building partition",
location!(),
))?;
let sub_index_params = self.sub_index_params.clone().ok_or(Error::invalid_input(
"sub index params not set before building partition",
location!(),
))?;
let reader = self.shuffle_reader.as_ref().ok_or(Error::invalid_input(
"shuffle reader not set before building partitions",
location!(),
Expand All @@ -454,77 +466,78 @@ impl<S: IvfSubIndex + 'static, Q: Quantization + 'static> IvfIndexBuilder<S, Q>
.map(|(idx, _)| idx)
.collect::<Vec<_>>();

let dataset = Arc::new(dataset.clone());
let reader = reader.clone();
let existing_indices = Arc::new(self.existing_indices.clone());
let distance_type = self.distance_type;
let mut partition_sizes = vec![(0, 0); ivf.num_partitions()];
for (i, &partition) in partition_build_order.iter().enumerate() {
log::info!(
"building partition {}, progress {}/{}",
partition,
i + 1,
ivf.num_partitions(),
);
let mut batches = Vec::new();
for existing_index in self.existing_indices.iter() {
let existing_index = existing_index
.as_any()
.downcast_ref::<IVFIndex<S, Q>>()
.ok_or(Error::invalid_input(
"existing index is not IVF index",
location!(),
))?;

let part_storage = existing_index.load_partition_storage(partition).await?;
batches.extend(
self.take_vectors(part_storage.row_ids().cloned().collect_vec().as_ref())
.await?,
);
}
let build_iter = partition_build_order.iter().map(|&partition| {
let dataset = dataset.clone();
let reader = reader.clone();
let existing_indices = existing_indices.clone();
let column = self.column.clone();
let store = self.store.clone();
let temp_dir = self.temp_dir.clone();
let quantizer = quantizer.clone();
let sub_index_params = sub_index_params.clone();
async move {
let batches = Self::take_partition_batches(
partition,
existing_indices.as_ref(),
reader.as_ref(),
dataset.as_ref(),
&column,
&store,
)
.await?;

match reader.partition_size(partition)? {
0 => continue,
_ => {
let partition_data =
reader.read_partition(partition).await?.ok_or(Error::io(
format!("partition {} is empty", partition).as_str(),
location!(),
))?;
batches.extend(partition_data.try_collect::<Vec<_>>().await?);
let num_rows = batches.iter().map(|b| b.num_rows()).sum::<usize>();
if num_rows == 0 {
return Ok((0, 0));
}
}
let batch = arrow::compute::concat_batches(&batches[0].schema(), batches.iter())?;

let num_rows = batches.iter().map(|b| b.num_rows()).sum::<usize>();
if num_rows == 0 {
continue;
Self::build_partition(
&temp_dir,
column,
distance_type,
quantizer,
sub_index_params,
batch,
partition,
)
.await
}
let batch = arrow::compute::concat_batches(&batches[0].schema(), batches.iter())?;
let sizes = self.build_partition(partition, &batch).await?;
partition_sizes[partition] = sizes;
log::info!(
"partition {} built, progress {}/{}",
partition,
i + 1,
ivf.num_partitions()
);
});
let results = stream::iter(build_iter)
.buffered(get_num_compute_intensive_cpus())
.try_collect::<Vec<_>>()
.boxed()
.await?;

for (i, result) in results.into_iter().enumerate() {
partition_sizes[partition_build_order[i]] = result;
}

self.partition_sizes = partition_sizes;
Ok(self)
}

async fn build_partition(&self, part_id: usize, batch: &RecordBatch) -> Result<(usize, usize)> {
let quantizer = self.quantizer.clone().ok_or(Error::invalid_input(
"quantizer not set before building partition",
location!(),
))?;
let sub_index_params = self.sub_index_params.clone().ok_or(Error::invalid_input(
"sub index params not set before building partition",
location!(),
))?;

async fn build_partition(
temp_dir: &Path,
column: String,
distance_type: DistanceType,
quantizer: Q,
sub_index_params: S::BuildParams,
batch: RecordBatch,
part_id: usize,
) -> Result<(usize, usize)> {
let local_store = ObjectStore::local();
// build quantized vector storage
let storage_len = {
let storage = StorageBuilder::new(self.column.clone(), self.distance_type, quantizer)
.build(batch)?;
let path = self.temp_dir.child(format!("storage_part{}", part_id));
let storage =
StorageBuilder::new(column.clone(), distance_type, quantizer).build(&batch)?;
let path = temp_dir.child(format!("storage_part{}", part_id));
let batches = storage.to_batches()?;
FileWriter::create_file_with_batches(
&local_store,
Expand All @@ -538,10 +551,10 @@ impl<S: IvfSubIndex + 'static, Q: Quantization + 'static> IvfIndexBuilder<S, Q>

// build the sub index, with in-memory storage
let index_len = {
let vectors = batch[&self.column].as_fixed_size_list();
let flat_storage = FlatFloatStorage::new(vectors.clone(), self.distance_type);
let vectors = batch[&column].as_fixed_size_list();
let flat_storage = FlatFloatStorage::new(vectors.clone(), distance_type);
let sub_index = S::index_vectors(&flat_storage, sub_index_params)?;
let path = self.temp_dir.child(format!("index_part{}", part_id));
let path = temp_dir.child(format!("index_part{}", part_id));
let index_batch = sub_index.to_batch()?;
let schema = index_batch.schema().as_ref().try_into()?;
FileWriter::create_file_with_batches(
Expand All @@ -557,6 +570,47 @@ impl<S: IvfSubIndex + 'static, Q: Quantization + 'static> IvfIndexBuilder<S, Q>
Ok((storage_len, index_len))
}

async fn take_partition_batches(
part_id: usize,
existing_indices: &[Arc<dyn VectorIndex>],
reader: &dyn ShuffleReader,
dataset: &Dataset,
column: &str,
store: &ObjectStore,
) -> Result<Vec<RecordBatch>> {
let mut batches = Vec::new();
for existing_index in existing_indices.iter() {
let existing_index = existing_index
.as_any()
.downcast_ref::<IVFIndex<S, Q>>()
.ok_or(Error::invalid_input(
"existing index is not IVF index",
location!(),
))?;

let part_storage = existing_index.load_partition_storage(part_id).await?;
batches.extend(
Self::take_vectors(
dataset,
column,
store,
part_storage.row_ids().cloned().collect_vec().as_ref(),
)
.await?,
);
}

if reader.partition_size(part_id)? > 0 {
let partition_data = reader.read_partition(part_id).await?.ok_or(Error::io(
format!("partition {} is empty", part_id).as_str(),
location!(),
))?;
batches.extend(partition_data.try_collect::<Vec<_>>().await?);
}

Ok(batches)
}

async fn merge_partitions(&mut self) -> Result<()> {
let ivf = self.ivf.as_ref().ok_or(Error::invalid_input(
"IVF not set before merge partitions",
Expand Down Expand Up @@ -707,16 +761,16 @@ impl<S: IvfSubIndex + 'static, Q: Quantization + 'static> IvfIndexBuilder<S, Q>

// take vectors from the dataset
// used for reading vectors from existing indices
async fn take_vectors(&self, row_ids: &[u64]) -> Result<Vec<RecordBatch>> {
let dataset = self.dataset.as_ref().ok_or(Error::invalid_input(
"dataset not set before taking vectors",
location!(),
))?;
let column = self.column.clone();
let projection = Arc::new(dataset.schema().project(&[column.as_str()])?);
async fn take_vectors(
dataset: &Dataset,
column: &str,
store: &ObjectStore,
row_ids: &[u64],
) -> Result<Vec<RecordBatch>> {
let projection = Arc::new(dataset.schema().project(&[column])?);
// arrow uses i32 for index, so we chunk the row ids to avoid large batch causing overflow
let mut batches = Vec::new();
for chunk in row_ids.chunks(self.store.block_size()) {
for chunk in row_ids.chunks(store.block_size()) {
let batch = dataset
.take_rows(chunk, ProjectionRequest::Schema(projection.clone()))
.await?;
Expand Down

0 comments on commit 8585207

Please sign in to comment.