From 2aaa73be515a29b771a897ff70569e2fac34b7d0 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Tue, 14 Jan 2025 13:39:03 -0800 Subject: [PATCH 1/2] add drop_index --- protos/transaction.proto | 4 +- python/python/lance/dataset.py | 10 ++ python/python/lance/lance/__init__.pyi | 1 + python/python/tests/test_scalar_index.py | 39 +++++++ python/python/tests/test_vector_index.py | 24 +++++ python/src/dataset.rs | 9 ++ rust/lance-index/src/traits.rs | 10 ++ rust/lance/src/index.rs | 128 ++++++++++++++++++++++- 8 files changed, 219 insertions(+), 6 deletions(-) diff --git a/protos/transaction.proto b/protos/transaction.proto index 1d14fd49a5..9959c5e75a 100644 --- a/protos/transaction.proto +++ b/protos/transaction.proto @@ -67,7 +67,9 @@ message Transaction { // Add or replace a new secondary index. // - // - new_indices: the modified indices + // This is also used to remove an index (we are replacing it with nothing) + // + // - new_indices: the modified indices, empty if dropping indices only // - removed_indices: the indices that are being replaced message CreateIndex { repeated IndexMetadata new_indices = 1; diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 27361148fc..e3d9aa77c6 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -2222,6 +2222,16 @@ def create_index( ) return self + def drop_index(self, column: str, name: str): + """ + Drops an index from the dataset + + Note: A name is required, even if one was not provided when the index was + created. You can use the `list_indices` method to get the names of the + indices. + """ + return self._ds.drop_index(column, name) + def session(self) -> Session: """ Return the dataset session, which holds the dataset's state. diff --git a/python/python/lance/lance/__init__.pyi b/python/python/lance/lance/__init__.pyi index ed862b1fa1..c83d97bded 100644 --- a/python/python/lance/lance/__init__.pyi +++ b/python/python/lance/lance/__init__.pyi @@ -265,6 +265,7 @@ class _Dataset: storage_options: Optional[Dict[str, str]] = None, kwargs: Optional[Dict[str, Any]] = None, ): ... + def drop_index(self, column: str, name: str): ... def count_fragments(self) -> int: ... def num_small_files(self, max_rows_per_group: int) -> int: ... def get_fragments(self) -> List[_Fragment]: ... diff --git a/python/python/tests/test_scalar_index.py b/python/python/tests/test_scalar_index.py index 1dadd3c202..e04dd11694 100644 --- a/python/python/tests/test_scalar_index.py +++ b/python/python/tests/test_scalar_index.py @@ -675,3 +675,42 @@ def test_optimize_no_new_data(tmp_path: Path): assert dataset.to_table(filter="btree IS NULL").num_rows == 2 assert dataset.to_table(filter="bitmap IS NULL").num_rows == 2 + + +def test_drop_index(tmp_path): + test_table_size = 100 + test_table = pa.table( + { + "btree": list(range(test_table_size)), + "bitmap": list(range(test_table_size)), + "fts": ["a" for _ in range(test_table_size)], + } + ) + ds = lance.write_dataset(test_table, tmp_path) + ds.create_scalar_index("btree", index_type="BTREE") + ds.create_scalar_index("bitmap", index_type="BITMAP") + ds.create_scalar_index("fts", index_type="INVERTED") + + assert len(ds.list_indices()) == 3 + + test_idx = ds.list_indices()[0] + test_col = test_idx["fields"][0] + test_name = test_idx["name"] + # Attempt to drop index (col does not exist) + with pytest.raises(RuntimeError, match="nonexistent_col does not exist"): + ds.drop_index("nonexistent_col", test_name) + # Attempt to drop index (name does not exist) + with pytest.raises(RuntimeError, match="index not found"): + ds.drop_index(test_col, "nonexistent_name") + + for idx in ds.list_indices(): + idx_name = idx["name"] + col_name = idx["fields"][0] + ds.drop_index(col_name, idx_name) + + assert len(ds.list_indices()) == 0 + + # Ensure we can still search columns + assert ds.to_table(filter="btree = 1").num_rows == 1 + assert ds.to_table(filter="bitmap = 1").num_rows == 1 + assert ds.to_table(filter="fts = 'a'").num_rows == test_table_size diff --git a/python/python/tests/test_vector_index.py b/python/python/tests/test_vector_index.py index 6b3fc51368..fec0b1e8e3 100644 --- a/python/python/tests/test_vector_index.py +++ b/python/python/tests/test_vector_index.py @@ -1105,3 +1105,27 @@ def test_optimize_indices(indexed_dataset): indexed_dataset.optimize.optimize_indices(num_indices_to_merge=0) indices = indexed_dataset.list_indices() assert len(indices) == 2 + + +def test_drop_indices(indexed_dataset): + idx_name = indexed_dataset.list_indices()[0]["name"] + + indexed_dataset.drop_index("vector", idx_name) + indices = indexed_dataset.list_indices() + assert len(indices) == 0 + + test_vec = ( + indexed_dataset.take([0], columns=["vector"]).column("vector").to_pylist()[0] + ) + + # make sure we can still search the column (will do flat search) + results = indexed_dataset.to_table( + nearest={ + "column": "vector", + "q": test_vec, + "k": 15, + "nprobes": 1, + }, + ) + + assert len(results) == 15 diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 4072978436..b83b939166 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -1266,6 +1266,15 @@ impl Dataset { Ok(()) } + fn drop_index(&mut self, column: &str, name: &str) -> PyResult<()> { + let mut new_self = self.ds.as_ref().clone(); + RT.block_on(None, new_self.drop_index(column, name))? + .infer_error()?; + self.ds = Arc::new(new_self); + + Ok(()) + } + fn count_fragments(&self) -> usize { self.ds.count_fragments() } diff --git a/rust/lance-index/src/traits.rs b/rust/lance-index/src/traits.rs index 5db6d188ba..e87669998b 100644 --- a/rust/lance-index/src/traits.rs +++ b/rust/lance-index/src/traits.rs @@ -34,6 +34,16 @@ pub trait DatasetIndexExt { replace: bool, ) -> Result<()>; + /// Drop indices by name. + /// + /// Upon finish, a new dataset version is generated. + /// + /// Parameters: + /// + /// - `column`: the column to drop the index on. + /// - `name`: the name of the index to drop. + async fn drop_index(&mut self, column: &str, name: &str) -> Result<()>; + /// Read all indices of this Dataset version. /// /// The indices are lazy loaded and cached in memory within the [`Dataset`] instance. diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 589bf2db84..8045f2c798 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -347,6 +347,63 @@ impl DatasetIndexExt for Dataset { Ok(()) } + async fn drop_index(&mut self, column: &str, name: &str) -> Result<()> { + let indices = self.load_indices_by_name(name).await?; + if indices.is_empty() { + return Err(Error::IndexNotFound { + identity: format!("name={}", name), + location: location!(), + }); + } + + let field = self.schema().field(column).ok_or_else(|| Error::Index { + message: format!("Column {} does not exist in the schema", column), + location: location!(), + })?; + + let indices = indices + .iter() + .filter(|idx| idx.fields.len() == 1 && idx.fields[0] == field.id) + .cloned() + .collect::>(); + + if indices.is_empty() { + return Err(Error::IndexNotFound { + identity: format!( + "An index with the name {} was found but it was not on the column {}", + name, column + ), + location: location!(), + }); + } + + let transaction = Transaction::new( + self.manifest.version, + Operation::CreateIndex { + new_indices: vec![], + removed_indices: indices.clone(), + }, + /*blobs_op= */ None, + None, + ); + + let (new_manifest, manifest_path) = commit_transaction( + self, + self.object_store(), + self.commit_handler.as_ref(), + &transaction, + &Default::default(), + &Default::default(), + self.manifest_naming_scheme, + ) + .await?; + + self.manifest = Arc::new(new_manifest); + self.manifest_file = manifest_path; + + Ok(()) + } + async fn load_indices(&self) -> Result>> { let dataset_dir = self.base.to_string(); if let Some(indices) = self @@ -909,6 +966,7 @@ impl DatasetIndexInternalExt for Dataset { #[cfg(test)] mod tests { use crate::dataset::builder::DatasetBuilder; + use crate::utils::test::{DatagenExt, FragmentCount, FragmentRowCount}; use super::*; @@ -984,19 +1042,79 @@ mod tests { .is_err()); } - #[tokio::test] - async fn test_count_index_rows() { - let test_dir = tempdir().unwrap(); + fn sample_vector_field() -> Field { let dimensions = 16; let column_name = "vec"; - let field = Field::new( + Field::new( column_name, DataType::FixedSizeList( Arc::new(Field::new("item", DataType::Float32, true)), dimensions, ), false, - ); + ) + } + + #[tokio::test] + async fn test_drop_index() { + let test_dir = tempdir().unwrap(); + let schema = Schema::new(vec![ + sample_vector_field(), + Field::new("ints", DataType::Int32, false), + ]); + let mut dataset = lance_datagen::rand(&schema) + .into_dataset( + test_dir.path().to_str().unwrap(), + FragmentCount::from(1), + FragmentRowCount::from(256), + ) + .await + .unwrap(); + + let idx_name = "name".to_string(); + dataset + .create_index( + &["vec"], + IndexType::Vector, + Some(idx_name.clone()), + &VectorIndexParams::ivf_pq(2, 8, 2, MetricType::L2, 10), + true, + ) + .await + .unwrap(); + dataset + .create_index( + &["ints"], + IndexType::BTree, + None, + &ScalarIndexParams::default(), + true, + ) + .await + .unwrap(); + + assert_eq!(dataset.load_indices().await.unwrap().len(), 2); + + dataset.drop_index("vec", &idx_name).await.unwrap(); + + assert_eq!(dataset.load_indices().await.unwrap().len(), 1); + + // Even though we didn't give the scalar index a name it still has an auto-generated one we can use + let scalar_idx_name = dataset.load_indices().await.unwrap()[0].name.clone(); + dataset.drop_index("ints", &scalar_idx_name).await.unwrap(); + + assert_eq!(dataset.load_indices().await.unwrap().len(), 0); + + // Make sure it returns an error if the index doesn't exist + assert!(dataset.drop_index("ints", &scalar_idx_name).await.is_err()); + } + + #[tokio::test] + async fn test_count_index_rows() { + let test_dir = tempdir().unwrap(); + let dimensions = 16; + let column_name = "vec"; + let field = sample_vector_field(); let schema = Arc::new(Schema::new(vec![field])); let float_arr = generate_random_array(512 * dimensions as usize); From 0ef9b92919d04cd38a8d8d502633f99a68eb3725 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Wed, 15 Jan 2025 04:40:34 -0800 Subject: [PATCH 2/2] Change API to only require index name when dropping index --- python/python/lance/dataset.py | 11 +++++---- python/python/lance/lance/__init__.pyi | 2 +- python/python/tests/test_scalar_index.py | 11 ++------- python/python/tests/test_vector_index.py | 2 +- python/src/dataset.rs | 4 +-- rust/lance-index/src/traits.rs | 3 +-- rust/lance/src/index.rs | 31 ++++-------------------- 7 files changed, 18 insertions(+), 46 deletions(-) diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index e3d9aa77c6..af485caaca 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -2222,15 +2222,16 @@ def create_index( ) return self - def drop_index(self, column: str, name: str): + def drop_index(self, name: str): """ Drops an index from the dataset - Note: A name is required, even if one was not provided when the index was - created. You can use the `list_indices` method to get the names of the - indices. + Note: Indices are dropped by "index name". This is not the same as the field + name. If you did not specify a name when you created the index then a name was + generated for you. You can use the `list_indices` method to get the names of + the indices. """ - return self._ds.drop_index(column, name) + return self._ds.drop_index(name) def session(self) -> Session: """ diff --git a/python/python/lance/lance/__init__.pyi b/python/python/lance/lance/__init__.pyi index c83d97bded..b894dec238 100644 --- a/python/python/lance/lance/__init__.pyi +++ b/python/python/lance/lance/__init__.pyi @@ -265,7 +265,7 @@ class _Dataset: storage_options: Optional[Dict[str, str]] = None, kwargs: Optional[Dict[str, Any]] = None, ): ... - def drop_index(self, column: str, name: str): ... + def drop_index(self, name: str): ... def count_fragments(self) -> int: ... def num_small_files(self, max_rows_per_group: int) -> int: ... def get_fragments(self) -> List[_Fragment]: ... diff --git a/python/python/tests/test_scalar_index.py b/python/python/tests/test_scalar_index.py index e04dd11694..5dca65cf82 100644 --- a/python/python/tests/test_scalar_index.py +++ b/python/python/tests/test_scalar_index.py @@ -693,20 +693,13 @@ def test_drop_index(tmp_path): assert len(ds.list_indices()) == 3 - test_idx = ds.list_indices()[0] - test_col = test_idx["fields"][0] - test_name = test_idx["name"] - # Attempt to drop index (col does not exist) - with pytest.raises(RuntimeError, match="nonexistent_col does not exist"): - ds.drop_index("nonexistent_col", test_name) # Attempt to drop index (name does not exist) with pytest.raises(RuntimeError, match="index not found"): - ds.drop_index(test_col, "nonexistent_name") + ds.drop_index("nonexistent_name") for idx in ds.list_indices(): idx_name = idx["name"] - col_name = idx["fields"][0] - ds.drop_index(col_name, idx_name) + ds.drop_index(idx_name) assert len(ds.list_indices()) == 0 diff --git a/python/python/tests/test_vector_index.py b/python/python/tests/test_vector_index.py index fec0b1e8e3..742dec4338 100644 --- a/python/python/tests/test_vector_index.py +++ b/python/python/tests/test_vector_index.py @@ -1110,7 +1110,7 @@ def test_optimize_indices(indexed_dataset): def test_drop_indices(indexed_dataset): idx_name = indexed_dataset.list_indices()[0]["name"] - indexed_dataset.drop_index("vector", idx_name) + indexed_dataset.drop_index(idx_name) indices = indexed_dataset.list_indices() assert len(indices) == 0 diff --git a/python/src/dataset.rs b/python/src/dataset.rs index b83b939166..80475bfd82 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -1266,9 +1266,9 @@ impl Dataset { Ok(()) } - fn drop_index(&mut self, column: &str, name: &str) -> PyResult<()> { + fn drop_index(&mut self, name: &str) -> PyResult<()> { let mut new_self = self.ds.as_ref().clone(); - RT.block_on(None, new_self.drop_index(column, name))? + RT.block_on(None, new_self.drop_index(name))? .infer_error()?; self.ds = Arc::new(new_self); diff --git a/rust/lance-index/src/traits.rs b/rust/lance-index/src/traits.rs index e87669998b..5477cc45f1 100644 --- a/rust/lance-index/src/traits.rs +++ b/rust/lance-index/src/traits.rs @@ -40,9 +40,8 @@ pub trait DatasetIndexExt { /// /// Parameters: /// - /// - `column`: the column to drop the index on. /// - `name`: the name of the index to drop. - async fn drop_index(&mut self, column: &str, name: &str) -> Result<()>; + async fn drop_index(&mut self, name: &str) -> Result<()>; /// Read all indices of this Dataset version. /// diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 8045f2c798..2e3f9d6939 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -347,7 +347,7 @@ impl DatasetIndexExt for Dataset { Ok(()) } - async fn drop_index(&mut self, column: &str, name: &str) -> Result<()> { + async fn drop_index(&mut self, name: &str) -> Result<()> { let indices = self.load_indices_by_name(name).await?; if indices.is_empty() { return Err(Error::IndexNotFound { @@ -356,27 +356,6 @@ impl DatasetIndexExt for Dataset { }); } - let field = self.schema().field(column).ok_or_else(|| Error::Index { - message: format!("Column {} does not exist in the schema", column), - location: location!(), - })?; - - let indices = indices - .iter() - .filter(|idx| idx.fields.len() == 1 && idx.fields[0] == field.id) - .cloned() - .collect::>(); - - if indices.is_empty() { - return Err(Error::IndexNotFound { - identity: format!( - "An index with the name {} was found but it was not on the column {}", - name, column - ), - location: location!(), - }); - } - let transaction = Transaction::new( self.manifest.version, Operation::CreateIndex { @@ -1095,18 +1074,18 @@ mod tests { assert_eq!(dataset.load_indices().await.unwrap().len(), 2); - dataset.drop_index("vec", &idx_name).await.unwrap(); + dataset.drop_index(&idx_name).await.unwrap(); assert_eq!(dataset.load_indices().await.unwrap().len(), 1); // Even though we didn't give the scalar index a name it still has an auto-generated one we can use - let scalar_idx_name = dataset.load_indices().await.unwrap()[0].name.clone(); - dataset.drop_index("ints", &scalar_idx_name).await.unwrap(); + let scalar_idx_name = &dataset.load_indices().await.unwrap()[0].name; + dataset.drop_index(scalar_idx_name).await.unwrap(); assert_eq!(dataset.load_indices().await.unwrap().len(), 0); // Make sure it returns an error if the index doesn't exist - assert!(dataset.drop_index("ints", &scalar_idx_name).await.is_err()); + assert!(dataset.drop_index(scalar_idx_name).await.is_err()); } #[tokio::test]