fix: allow empty scalar indices and don't drop nulls on update (#3329)
Creating empty scalar indices is not terribly useful on its own, but it can simplify workflows where users set up all their indices first and then regularly add data and call optimize. This PR fixes a few bugs that would be encountered in that workflow.
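
A minimal sketch of that workflow, mirroring the Python API exercised by the new tests below (the path and column name are illustrative):

```python
import lance
import pyarrow as pa

# Build the index up front, before any data arrives.
schema = pa.schema([pa.field("category", pa.int32())])
ds = lance.write_dataset([], "/tmp/example.lance", schema=schema)
ds.create_scalar_index("category", index_type="BITMAP")  # empty index

# As data arrives, append it and bring the index up to date.
ds.insert(pa.table({"category": pa.array([1, 2], pa.int32())}))
ds.optimize.optimize_indices()

assert ds.to_table(filter="category = 1").num_rows == 1
```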

This also fixes a more serious issue where null values were being
dropped when indices were updated.
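
For instance, continuing the sketch above, a row whose indexed value is null must still be visible to `IS NULL` filters after the index is updated:

```python
# Append a row whose indexed value is null, then update the index.
ds.insert(pa.table({"category": pa.array([None], pa.int32())}))
ds.optimize.optimize_indices()

# Before this fix, the null could be dropped from the updated index.
assert ds.to_table(filter="category IS NULL").num_rows == 1
```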
westonpace authored Jan 3, 2025
1 parent c0c1b53 commit 397dc27
Showing 4 changed files with 139 additions and 20 deletions.
81 changes: 81 additions & 0 deletions python/python/tests/test_scalar_index.py
@@ -407,3 +407,84 @@ def test_label_list_index(tmp_path: Path):
     indices = dataset.list_indices()
     assert len(indices) == 1
     assert indices[0]["type"] == "LabelList"
+
+
+def test_create_index_empty_dataset(tmp_path: Path):
+    # Creating an index on an empty dataset is (currently) not terribly useful but
+    # we shouldn't return strange errors.
+    schema = pa.schema(
+        [
+            pa.field("btree", pa.int32()),
+            pa.field("bitmap", pa.int32()),
+            pa.field("label_list", pa.list_(pa.string())),
+            pa.field("inverted", pa.string()),
+        ]
+    )
+    ds = lance.write_dataset([], tmp_path, schema=schema)
+
+    for index_type in ["BTREE", "BITMAP", "LABEL_LIST", "INVERTED"]:
+        ds.create_scalar_index(index_type.lower(), index_type=index_type)
+
+    # Make sure the empty index doesn't cause searches to fail
+    ds.insert(
+        pa.table(
+            {
+                "btree": pa.array([1], pa.int32()),
+                "bitmap": pa.array([1], pa.int32()),
+                "label_list": [["foo", "bar"]],
+                "inverted": ["blah"],
+            }
+        )
+    )
+
+    def test_searches():
+        assert ds.to_table(filter="btree = 1").num_rows == 1
+        assert ds.to_table(filter="btree = 0").num_rows == 0
+        assert ds.to_table(filter="bitmap = 1").num_rows == 1
+        assert ds.to_table(filter="bitmap = 0").num_rows == 0
+        assert ds.to_table(filter="array_has_any(label_list, ['foo'])").num_rows == 1
+        assert ds.to_table(filter="array_has_any(label_list, ['oof'])").num_rows == 0
+        assert ds.to_table(filter="inverted = 'blah'").num_rows == 1
+        assert ds.to_table(filter="inverted = 'halb'").num_rows == 0
+
+    test_searches()
+
+    # Make sure fetching index stats on empty index is ok
+    for idx in ds.list_indices():
+        ds.stats.index_stats(idx["name"])
+
+    # Make sure updating empty indices is ok
+    ds.optimize.optimize_indices()
+
+    # Finally, make sure we can still search after updating
+    test_searches()
+
+
+def test_optimize_no_new_data(tmp_path: Path):
+    tbl = pa.table(
+        {"btree": pa.array([None], pa.int64()), "bitmap": pa.array([None], pa.int64())}
+    )
+    dataset = lance.write_dataset(tbl, tmp_path)
+    dataset.create_scalar_index("btree", index_type="BTREE")
+    dataset.create_scalar_index("bitmap", index_type="BITMAP")
+
+    assert dataset.to_table(filter="btree IS NULL").num_rows == 1
+    assert dataset.to_table(filter="bitmap IS NULL").num_rows == 1
+
+    dataset.insert([], schema=tbl.schema)
+    dataset.optimize.optimize_indices()
+
+    assert dataset.to_table(filter="btree IS NULL").num_rows == 1
+    assert dataset.to_table(filter="bitmap IS NULL").num_rows == 1
+
+    dataset.insert(pa.table({"btree": [2]}))
+    dataset.optimize.optimize_indices()
+
+    assert dataset.to_table(filter="btree IS NULL").num_rows == 1
+    assert dataset.to_table(filter="bitmap IS NULL").num_rows == 2
+
+    dataset.insert(pa.table({"bitmap": [2]}))
+    dataset.optimize.optimize_indices()
+
+    assert dataset.to_table(filter="btree IS NULL").num_rows == 2
+    assert dataset.to_table(filter="bitmap IS NULL").num_rows == 2
47 changes: 37 additions & 10 deletions rust/lance-index/src/scalar/bitmap.rs
@@ -10,7 +10,7 @@ use std::{
 };
 
 use arrow::array::BinaryBuilder;
-use arrow_array::{Array, BinaryArray, RecordBatch, UInt64Array};
+use arrow_array::{new_empty_array, new_null_array, Array, BinaryArray, RecordBatch, UInt64Array};
 use arrow_schema::{DataType, Field, Schema};
 use async_trait::async_trait;
 use datafusion::physical_plan::SendableRecordBatchStream;
@@ -39,6 +39,8 @@ pub struct BitmapIndex {
     index_map: BTreeMap<OrderableScalarValue, RowIdTreeMap>,
     // We put null in its own map to avoid it matching range queries (arrow-rs considers null to come before minval)
     null_map: RowIdTreeMap,
+    // The data type of the values in the index
+    value_type: DataType,
     // Memoized index_map size for DeepSizeOf
     index_map_size_bytes: usize,
     store: Arc<dyn IndexStore>,
@@ -48,12 +50,14 @@ impl BitmapIndex {
     fn new(
         index_map: BTreeMap<OrderableScalarValue, RowIdTreeMap>,
         null_map: RowIdTreeMap,
+        value_type: DataType,
         index_map_size_bytes: usize,
         store: Arc<dyn IndexStore>,
     ) -> Self {
         Self {
             index_map,
             null_map,
+            value_type,
             index_map_size_bytes,
             store,
         }
@@ -62,13 +66,18 @@ impl BitmapIndex {
     // creates a new BitmapIndex from a serialized RecordBatch
     fn try_from_serialized(data: RecordBatch, store: Arc<dyn IndexStore>) -> Result<Self> {
         if data.num_rows() == 0 {
-            return Err(Error::Internal {
-                message: "attempt to load bitmap index from empty record batch".into(),
-                location: location!(),
-            });
+            let data_type = data.schema().field(0).data_type().clone();
+            return Ok(Self::new(
+                BTreeMap::new(),
+                RowIdTreeMap::default(),
+                data_type,
+                0,
+                store,
+            ));
         }
 
         let dict_keys = data.column(0);
+        let value_type = dict_keys.data_type().clone();
         let binary_bitmaps = data.column(1);
         let bitmap_binary_array = binary_bitmaps
             .as_any()
@@ -94,7 +103,13 @@ impl BitmapIndex {
             }
         }
 
-        Ok(Self::new(index_map, null_map, index_map_size_bytes, store))
+        Ok(Self::new(
+            index_map,
+            null_map,
+            value_type,
+            index_map_size_bytes,
+            store,
+        ))
     }
 }
 
@@ -247,7 +262,7 @@ impl ScalarIndex for BitmapIndex {
                 (key.0.clone(), bitmap)
             })
             .collect::<HashMap<_, _>>();
-        write_bitmap_index(state, dest_store).await
+        write_bitmap_index(state, dest_store, &self.value_type).await
     }
 
     /// Add the new data into the index, creating an updated version of the index in `dest_store`
@@ -256,11 +271,17 @@ impl ScalarIndex for BitmapIndex {
         new_data: SendableRecordBatchStream,
         dest_store: &dyn IndexStore,
     ) -> Result<()> {
-        let state = self
+        let mut state = self
            .index_map
            .iter()
            .map(|(key, bitmap)| (key.0.clone(), bitmap.clone()))
            .collect::<HashMap<_, _>>();
+
+        // Also insert the null map
+        let ex_null = new_null_array(&self.value_type, 1);
+        let ex_null = ScalarValue::try_from_array(ex_null.as_ref(), 0)?;
+        state.insert(ex_null, self.null_map.clone());
+
        do_train_bitmap_index(new_data, state, dest_store).await
    }
 }
@@ -299,9 +320,14 @@ where
 async fn write_bitmap_index(
     state: HashMap<ScalarValue, RowIdTreeMap>,
     index_store: &dyn IndexStore,
+    value_type: &DataType,
 ) -> Result<()> {
     let keys_iter = state.keys().cloned();
-    let keys_array = ScalarValue::iter_to_array(keys_iter)?;
+    let keys_array = if state.is_empty() {
+        new_empty_array(value_type)
+    } else {
+        ScalarValue::iter_to_array(keys_iter)?
+    };
 
     let values_iter = state.into_values();
     let binary_bitmap_array = get_bitmaps_from_iter(values_iter);
@@ -321,6 +347,7 @@ async fn do_train_bitmap_index(
     mut state: HashMap<ScalarValue, RowIdTreeMap>,
     index_store: &dyn IndexStore,
 ) -> Result<()> {
+    let value_type = data_source.schema().field(0).data_type().clone();
     while let Some(batch) = data_source.try_next().await? {
         debug_assert_eq!(batch.num_columns(), 2);
         debug_assert_eq!(*batch.column(1).data_type(), DataType::UInt64);
@@ -339,7 +366,7 @@
         }
     }
 
-    write_bitmap_index(state, index_store).await
+    write_bitmap_index(state, index_store, &value_type).await
 }
 
 pub async fn train_bitmap_index(
27 changes: 18 additions & 9 deletions rust/lance-index/src/scalar/btree.rs
@@ -10,7 +10,7 @@ use std::{
     sync::Arc,
 };
 
-use arrow_array::{Array, RecordBatch, UInt32Array};
+use arrow_array::{new_empty_array, Array, RecordBatch, UInt32Array};
 use arrow_schema::{DataType, Field, Schema, SortOptions};
 use async_trait::async_trait;
 use datafusion::{
@@ -577,6 +577,7 @@ impl BTreeLookup {
             .iter()
             .flat_map(|(_, pages)| pages)
             .map(|page| page.page_number)
+            .chain(self.null_pages.iter().copied())
             .collect::<Vec<_>>();
         ids.dedup();
         ids
@@ -792,10 +793,9 @@ impl BTreeIndex {
         let mut null_pages = Vec::<u32>::new();
 
         if data.num_rows() == 0 {
-            return Err(Error::Internal {
-                message: "attempt to load btree index from empty stats batch".into(),
-                location: location!(),
-            });
+            let data_type = data.column(0).data_type().clone();
+            let sub_index = Arc::new(FlatIndexMetadata::new(data_type));
+            return Ok(Self::new(map, null_pages, store, sub_index, batch_size));
         }
 
         let mins = data.column(0);
@@ -1139,9 +1139,17 @@ async fn train_btree_page(
     })
 }
 
-fn btree_stats_as_batch(stats: Vec<EncodedBatch>) -> Result<RecordBatch> {
-    let mins = ScalarValue::iter_to_array(stats.iter().map(|stat| stat.stats.min.clone()))?;
-    let maxs = ScalarValue::iter_to_array(stats.iter().map(|stat| stat.stats.max.clone()))?;
+fn btree_stats_as_batch(stats: Vec<EncodedBatch>, value_type: &DataType) -> Result<RecordBatch> {
+    let mins = if stats.is_empty() {
+        new_empty_array(value_type)
+    } else {
+        ScalarValue::iter_to_array(stats.iter().map(|stat| stat.stats.min.clone()))?
+    };
+    let maxs = if stats.is_empty() {
+        new_empty_array(value_type)
+    } else {
+        ScalarValue::iter_to_array(stats.iter().map(|stat| stat.stats.max.clone()))?
+    };
     let null_counts = UInt32Array::from_iter_values(stats.iter().map(|stat| stat.stats.null_count));
     let page_numbers = UInt32Array::from_iter_values(stats.iter().map(|stat| stat.page_number));
 
@@ -1207,6 +1215,7 @@ pub async fn train_btree_index(
     let mut encoded_batches = Vec::new();
     let mut batch_idx = 0;
     let mut batches_source = data_source.scan_ordered_chunks(batch_size).await?;
+    let value_type = batches_source.schema().field(0).data_type().clone();
     while let Some(batch) = batches_source.try_next().await? {
         debug_assert_eq!(batch.num_columns(), 2);
         debug_assert_eq!(*batch.column(1).data_type(), DataType::UInt64);
@@ -1216,7 +1225,7 @@
         batch_idx += 1;
     }
     sub_index_file.finish().await?;
-    let record_batch = btree_stats_as_batch(encoded_batches)?;
+    let record_batch = btree_stats_as_batch(encoded_batches, &value_type)?;
     let mut file_schema = record_batch.schema().as_ref().clone();
     file_schema
         .metadata
4 changes: 3 additions & 1 deletion rust/lance-index/src/scalar/label_list.rs
@@ -158,7 +158,9 @@ impl ScalarIndex for LabelListIndex {
         new_data: SendableRecordBatchStream,
         dest_store: &dyn IndexStore,
     ) -> Result<()> {
-        self.values_index.update(new_data, dest_store).await
+        self.values_index
+            .update(unnest_chunks(new_data)?, dest_store)
+            .await
     }
 }