diff --git a/python/python/tests/test_scalar_index.py b/python/python/tests/test_scalar_index.py index 6669f27a7f..e58069b4a4 100644 --- a/python/python/tests/test_scalar_index.py +++ b/python/python/tests/test_scalar_index.py @@ -407,3 +407,84 @@ def test_label_list_index(tmp_path: Path): indices = dataset.list_indices() assert len(indices) == 1 assert indices[0]["type"] == "LabelList" + + +def test_create_index_empty_dataset(tmp_path: Path): + # Creating an index on an empty dataset is (currently) not terribly useful but + # we shouldn't return strange errors. + schema = pa.schema( + [ + pa.field("btree", pa.int32()), + pa.field("bitmap", pa.int32()), + pa.field("label_list", pa.list_(pa.string())), + pa.field("inverted", pa.string()), + ] + ) + ds = lance.write_dataset([], tmp_path, schema=schema) + + for index_type in ["BTREE", "BITMAP", "LABEL_LIST", "INVERTED"]: + ds.create_scalar_index(index_type.lower(), index_type=index_type) + + # Make sure the empty index doesn't cause searches to fail + ds.insert( + pa.table( + { + "btree": pa.array([1], pa.int32()), + "bitmap": pa.array([1], pa.int32()), + "label_list": [["foo", "bar"]], + "inverted": ["blah"], + } + ) + ) + + def test_searches(): + assert ds.to_table(filter="btree = 1").num_rows == 1 + assert ds.to_table(filter="btree = 0").num_rows == 0 + assert ds.to_table(filter="bitmap = 1").num_rows == 1 + assert ds.to_table(filter="bitmap = 0").num_rows == 0 + assert ds.to_table(filter="array_has_any(label_list, ['foo'])").num_rows == 1 + assert ds.to_table(filter="array_has_any(label_list, ['oof'])").num_rows == 0 + assert ds.to_table(filter="inverted = 'blah'").num_rows == 1 + assert ds.to_table(filter="inverted = 'halb'").num_rows == 0 + + test_searches() + + # Make sure fetching index stats on empty index is ok + for idx in ds.list_indices(): + ds.stats.index_stats(idx["name"]) + + # Make sure updating empty indices is ok + ds.optimize.optimize_indices() + + # Finally, make sure we can still search after updating + test_searches() + + +def test_optimize_no_new_data(tmp_path: Path): + tbl = pa.table( + {"btree": pa.array([None], pa.int64()), "bitmap": pa.array([None], pa.int64())} + ) + dataset = lance.write_dataset(tbl, tmp_path) + dataset.create_scalar_index("btree", index_type="BTREE") + dataset.create_scalar_index("bitmap", index_type="BITMAP") + + assert dataset.to_table(filter="btree IS NULL").num_rows == 1 + assert dataset.to_table(filter="bitmap IS NULL").num_rows == 1 + + dataset.insert([], schema=tbl.schema) + dataset.optimize.optimize_indices() + + assert dataset.to_table(filter="btree IS NULL").num_rows == 1 + assert dataset.to_table(filter="bitmap IS NULL").num_rows == 1 + + dataset.insert(pa.table({"btree": [2]})) + dataset.optimize.optimize_indices() + + assert dataset.to_table(filter="btree IS NULL").num_rows == 1 + assert dataset.to_table(filter="bitmap IS NULL").num_rows == 2 + + dataset.insert(pa.table({"bitmap": [2]})) + dataset.optimize.optimize_indices() + + assert dataset.to_table(filter="btree IS NULL").num_rows == 2 + assert dataset.to_table(filter="bitmap IS NULL").num_rows == 2 diff --git a/rust/lance-index/src/scalar/bitmap.rs b/rust/lance-index/src/scalar/bitmap.rs index ca03d25018..0c0454b81b 100644 --- a/rust/lance-index/src/scalar/bitmap.rs +++ b/rust/lance-index/src/scalar/bitmap.rs @@ -10,7 +10,7 @@ use std::{ }; use arrow::array::BinaryBuilder; -use arrow_array::{Array, BinaryArray, RecordBatch, UInt64Array}; +use arrow_array::{new_empty_array, new_null_array, Array, BinaryArray, RecordBatch, UInt64Array}; use arrow_schema::{DataType, Field, Schema}; use async_trait::async_trait; use datafusion::physical_plan::SendableRecordBatchStream; @@ -39,6 +39,8 @@ pub struct BitmapIndex { index_map: BTreeMap, // We put null in its own map to avoid it matching range queries (arrow-rs considers null to come before minval) null_map: RowIdTreeMap, + // The data type of the values in the index + value_type: DataType, // Memoized index_map size for DeepSizeOf index_map_size_bytes: usize, store: Arc, @@ -48,12 +50,14 @@ impl BitmapIndex { fn new( index_map: BTreeMap, null_map: RowIdTreeMap, + value_type: DataType, index_map_size_bytes: usize, store: Arc, ) -> Self { Self { index_map, null_map, + value_type, index_map_size_bytes, store, } @@ -62,13 +66,18 @@ impl BitmapIndex { // creates a new BitmapIndex from a serialized RecordBatch fn try_from_serialized(data: RecordBatch, store: Arc) -> Result { if data.num_rows() == 0 { - return Err(Error::Internal { - message: "attempt to load bitmap index from empty record batch".into(), - location: location!(), - }); + let data_type = data.schema().field(0).data_type().clone(); + return Ok(Self::new( + BTreeMap::new(), + RowIdTreeMap::default(), + data_type, + 0, + store, + )); } let dict_keys = data.column(0); + let value_type = dict_keys.data_type().clone(); let binary_bitmaps = data.column(1); let bitmap_binary_array = binary_bitmaps .as_any() @@ -94,7 +103,13 @@ impl BitmapIndex { } } - Ok(Self::new(index_map, null_map, index_map_size_bytes, store)) + Ok(Self::new( + index_map, + null_map, + value_type, + index_map_size_bytes, + store, + )) } } @@ -247,7 +262,7 @@ impl ScalarIndex for BitmapIndex { (key.0.clone(), bitmap) }) .collect::>(); - write_bitmap_index(state, dest_store).await + write_bitmap_index(state, dest_store, &self.value_type).await } /// Add the new data into the index, creating an updated version of the index in `dest_store` @@ -256,11 +271,17 @@ impl ScalarIndex for BitmapIndex { new_data: SendableRecordBatchStream, dest_store: &dyn IndexStore, ) -> Result<()> { - let state = self + let mut state = self .index_map .iter() .map(|(key, bitmap)| (key.0.clone(), bitmap.clone())) .collect::>(); + + // Also insert the null map + let ex_null = new_null_array(&self.value_type, 1); + let ex_null = ScalarValue::try_from_array(ex_null.as_ref(), 0)?; + state.insert(ex_null, self.null_map.clone()); + do_train_bitmap_index(new_data, state, dest_store).await } } @@ -299,9 +320,14 @@ where async fn write_bitmap_index( state: HashMap, index_store: &dyn IndexStore, + value_type: &DataType, ) -> Result<()> { let keys_iter = state.keys().cloned(); - let keys_array = ScalarValue::iter_to_array(keys_iter)?; + let keys_array = if state.is_empty() { + new_empty_array(value_type) + } else { + ScalarValue::iter_to_array(keys_iter)? + }; let values_iter = state.into_values(); let binary_bitmap_array = get_bitmaps_from_iter(values_iter); @@ -321,6 +347,7 @@ async fn do_train_bitmap_index( mut state: HashMap, index_store: &dyn IndexStore, ) -> Result<()> { + let value_type = data_source.schema().field(0).data_type().clone(); while let Some(batch) = data_source.try_next().await? { debug_assert_eq!(batch.num_columns(), 2); debug_assert_eq!(*batch.column(1).data_type(), DataType::UInt64); @@ -339,7 +366,7 @@ async fn do_train_bitmap_index( } } - write_bitmap_index(state, index_store).await + write_bitmap_index(state, index_store, &value_type).await } pub async fn train_bitmap_index( diff --git a/rust/lance-index/src/scalar/btree.rs b/rust/lance-index/src/scalar/btree.rs index 2590e2863b..2624c81691 100644 --- a/rust/lance-index/src/scalar/btree.rs +++ b/rust/lance-index/src/scalar/btree.rs @@ -10,7 +10,7 @@ use std::{ sync::Arc, }; -use arrow_array::{Array, RecordBatch, UInt32Array}; +use arrow_array::{new_empty_array, Array, RecordBatch, UInt32Array}; use arrow_schema::{DataType, Field, Schema, SortOptions}; use async_trait::async_trait; use datafusion::{ @@ -577,6 +577,7 @@ impl BTreeLookup { .iter() .flat_map(|(_, pages)| pages) .map(|page| page.page_number) + .chain(self.null_pages.iter().copied()) .collect::>(); ids.dedup(); ids @@ -792,10 +793,9 @@ impl BTreeIndex { let mut null_pages = Vec::::new(); if data.num_rows() == 0 { - return Err(Error::Internal { - message: "attempt to load btree index from empty stats batch".into(), - location: location!(), - }); + let data_type = data.column(0).data_type().clone(); + let sub_index = Arc::new(FlatIndexMetadata::new(data_type)); + return Ok(Self::new(map, null_pages, store, sub_index, batch_size)); } let mins = data.column(0); @@ -1139,9 +1139,17 @@ async fn train_btree_page( }) } -fn btree_stats_as_batch(stats: Vec) -> Result { - let mins = ScalarValue::iter_to_array(stats.iter().map(|stat| stat.stats.min.clone()))?; - let maxs = ScalarValue::iter_to_array(stats.iter().map(|stat| stat.stats.max.clone()))?; +fn btree_stats_as_batch(stats: Vec, value_type: &DataType) -> Result { + let mins = if stats.is_empty() { + new_empty_array(value_type) + } else { + ScalarValue::iter_to_array(stats.iter().map(|stat| stat.stats.min.clone()))? + }; + let maxs = if stats.is_empty() { + new_empty_array(value_type) + } else { + ScalarValue::iter_to_array(stats.iter().map(|stat| stat.stats.max.clone()))? + }; let null_counts = UInt32Array::from_iter_values(stats.iter().map(|stat| stat.stats.null_count)); let page_numbers = UInt32Array::from_iter_values(stats.iter().map(|stat| stat.page_number)); @@ -1207,6 +1215,7 @@ pub async fn train_btree_index( let mut encoded_batches = Vec::new(); let mut batch_idx = 0; let mut batches_source = data_source.scan_ordered_chunks(batch_size).await?; + let value_type = batches_source.schema().field(0).data_type().clone(); while let Some(batch) = batches_source.try_next().await? { debug_assert_eq!(batch.num_columns(), 2); debug_assert_eq!(*batch.column(1).data_type(), DataType::UInt64); @@ -1216,7 +1225,7 @@ pub async fn train_btree_index( batch_idx += 1; } sub_index_file.finish().await?; - let record_batch = btree_stats_as_batch(encoded_batches)?; + let record_batch = btree_stats_as_batch(encoded_batches, &value_type)?; let mut file_schema = record_batch.schema().as_ref().clone(); file_schema .metadata diff --git a/rust/lance-index/src/scalar/label_list.rs b/rust/lance-index/src/scalar/label_list.rs index a54fcf9ed5..a15be077da 100644 --- a/rust/lance-index/src/scalar/label_list.rs +++ b/rust/lance-index/src/scalar/label_list.rs @@ -158,7 +158,9 @@ impl ScalarIndex for LabelListIndex { new_data: SendableRecordBatchStream, dest_store: &dyn IndexStore, ) -> Result<()> { - self.values_index.update(new_data, dest_store).await + self.values_index + .update(unnest_chunks(new_data)?, dest_store) + .await } }