Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add drop_index #3382

Merged
merged 2 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion protos/transaction.proto
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ message Transaction {

// Add or replace a new secondary index.
//
// - new_indices: the modified indices
// This is also used to remove an index (we are replacing it with nothing)
//
// - new_indices: the modified indices, empty if dropping indices only
// - removed_indices: the indices that are being replaced
message CreateIndex {
repeated IndexMetadata new_indices = 1;
Expand Down
11 changes: 11 additions & 0 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2222,6 +2222,17 @@ def create_index(
)
return self

def drop_index(self, name: str) -> None:
    """
    Drops an index from the dataset

    Note: Indices are dropped by "index name". This is not the same as the field
    name. If you did not specify a name when you created the index then a name was
    generated for you. You can use the `list_indices` method to get the names of
    the indices.

    Parameters
    ----------
    name: str
        The name of the index to drop. The underlying native call raises if no
        index with this name exists.
    """
    # Thin wrapper: delegates to the native dataset object.
    return self._ds.drop_index(name)

def session(self) -> Session:
"""
Return the dataset session, which holds the dataset's state.
Expand Down
1 change: 1 addition & 0 deletions python/python/lance/lance/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ class _Dataset:
storage_options: Optional[Dict[str, str]] = None,
kwargs: Optional[Dict[str, Any]] = None,
): ...
def drop_index(self, name: str): ...
def count_fragments(self) -> int: ...
def num_small_files(self, max_rows_per_group: int) -> int: ...
def get_fragments(self) -> List[_Fragment]: ...
Expand Down
32 changes: 32 additions & 0 deletions python/python/tests/test_scalar_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -675,3 +675,35 @@ def test_optimize_no_new_data(tmp_path: Path):

assert dataset.to_table(filter="btree IS NULL").num_rows == 2
assert dataset.to_table(filter="bitmap IS NULL").num_rows == 2


def test_drop_index(tmp_path):
    # Build a small table with one column per scalar index type under test.
    num_rows = 100
    columns = {
        "btree": list(range(num_rows)),
        "bitmap": list(range(num_rows)),
        "fts": ["a" for _ in range(num_rows)],
    }
    ds = lance.write_dataset(pa.table(columns), tmp_path)

    # Create one scalar index per column.
    for column, kind in [
        ("btree", "BTREE"),
        ("bitmap", "BITMAP"),
        ("fts", "INVERTED"),
    ]:
        ds.create_scalar_index(column, index_type=kind)

    assert len(ds.list_indices()) == 3

    # Dropping an unknown index name must raise.
    with pytest.raises(RuntimeError, match="index not found"):
        ds.drop_index("nonexistent_name")

    # Drop every index by its (possibly auto-generated) name.
    for index in ds.list_indices():
        ds.drop_index(index["name"])

    assert len(ds.list_indices()) == 0

    # Ensure we can still search columns (now via unindexed scans).
    assert ds.to_table(filter="btree = 1").num_rows == 1
    assert ds.to_table(filter="bitmap = 1").num_rows == 1
    assert ds.to_table(filter="fts = 'a'").num_rows == num_rows
24 changes: 24 additions & 0 deletions python/python/tests/test_vector_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -1105,3 +1105,27 @@ def test_optimize_indices(indexed_dataset):
indexed_dataset.optimize.optimize_indices(num_indices_to_merge=0)
indices = indexed_dataset.list_indices()
assert len(indices) == 2


def test_drop_indices(indexed_dataset):
    # Drop the first (and only remaining) index by name and verify it is gone.
    first_index = indexed_dataset.list_indices()[0]
    indexed_dataset.drop_index(first_index["name"])
    assert len(indexed_dataset.list_indices()) == 0

    # Pull a query vector out of the data itself.
    vector_column = indexed_dataset.take([0], columns=["vector"]).column("vector")
    query = vector_column.to_pylist()[0]

    # make sure we can still search the column (will do flat search)
    results = indexed_dataset.to_table(
        nearest={
            "column": "vector",
            "q": query,
            "k": 15,
            "nprobes": 1,
        },
    )

    assert len(results) == 15
9 changes: 9 additions & 0 deletions python/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1266,6 +1266,15 @@ impl Dataset {
Ok(())
}

/// PyO3 binding: drop a named index from the dataset.
///
/// The underlying `drop_index` needs `&mut` access and commits a new dataset
/// version, so we clone the inner dataset out of the shared `Arc`, mutate the
/// clone, then swap it back in. Errors from the async call are converted into
/// Python exceptions via `infer_error`.
fn drop_index(&mut self, name: &str) -> PyResult<()> {
    // `self.ds` is behind an Arc; clone the inner value to get `&mut` access.
    let mut new_self = self.ds.as_ref().clone();
    // Block on the async drop; outer `?` surfaces runtime errors, inner
    // `infer_error()?` maps the lance Result into a PyResult.
    RT.block_on(None, new_self.drop_index(name))?
        .infer_error()?;
    // Publish the updated dataset so subsequent calls see the new version.
    self.ds = Arc::new(new_self);

    Ok(())
}

fn count_fragments(&self) -> usize {
self.ds.count_fragments()
}
Expand Down
9 changes: 9 additions & 0 deletions rust/lance-index/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,15 @@ pub trait DatasetIndexExt {
replace: bool,
) -> Result<()>;

/// Drop indices by name.
///
/// All index entries matching the given name are removed (more than one
/// entry may share a name — presumably from incremental index updates;
/// verify against the implementation's `load_indices_by_name`).
///
/// Upon finish, a new dataset version is generated.
///
/// Parameters:
///
/// - `name`: the name of the index to drop.
///
/// Returns an error if no index with the given name exists.
async fn drop_index(&mut self, name: &str) -> Result<()>;

/// Read all indices of this Dataset version.
///
/// The indices are lazy loaded and cached in memory within the [`Dataset`] instance.
Expand Down
107 changes: 102 additions & 5 deletions rust/lance/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,42 @@ impl DatasetIndexExt for Dataset {
Ok(())
}

/// Drop all index entries matching `name`, committing a new dataset version.
///
/// Returns `Error::IndexNotFound` if no index with that name exists.
async fn drop_index(&mut self, name: &str) -> Result<()> {
    // Every entry sharing this name is removed together.
    let indices = self.load_indices_by_name(name).await?;
    if indices.is_empty() {
        return Err(Error::IndexNotFound {
            identity: format!("name={}", name),
            location: location!(),
        });
    }

    // Dropping is modeled as a CreateIndex operation that replaces the
    // matched indices with nothing (see protos/transaction.proto).
    let transaction = Transaction::new(
        self.manifest.version,
        Operation::CreateIndex {
            new_indices: vec![],
            // `indices` is not used after this point, so move it rather
            // than clone it (avoids copying the metadata vector).
            removed_indices: indices,
        },
        /*blobs_op=*/ None,
        None,
    );

    let (new_manifest, manifest_path) = commit_transaction(
        self,
        self.object_store(),
        self.commit_handler.as_ref(),
        &transaction,
        &Default::default(),
        &Default::default(),
        self.manifest_naming_scheme,
    )
    .await?;

    // Point this dataset instance at the newly committed version.
    self.manifest = Arc::new(new_manifest);
    self.manifest_file = manifest_path;

    Ok(())
}

async fn load_indices(&self) -> Result<Arc<Vec<IndexMetadata>>> {
let dataset_dir = self.base.to_string();
if let Some(indices) = self
Expand Down Expand Up @@ -909,6 +945,7 @@ impl DatasetIndexInternalExt for Dataset {
#[cfg(test)]
mod tests {
use crate::dataset::builder::DatasetBuilder;
use crate::utils::test::{DatagenExt, FragmentCount, FragmentRowCount};

use super::*;

Expand Down Expand Up @@ -984,19 +1021,79 @@ mod tests {
.is_err());
}

#[tokio::test]
async fn test_count_index_rows() {
let test_dir = tempdir().unwrap();
fn sample_vector_field() -> Field {
let dimensions = 16;
let column_name = "vec";
let field = Field::new(
Field::new(
column_name,
DataType::FixedSizeList(
Arc::new(Field::new("item", DataType::Float32, true)),
dimensions,
),
false,
);
)
}

#[tokio::test]
async fn test_drop_index() {
    let test_dir = tempdir().unwrap();
    let schema = Schema::new(vec![
        sample_vector_field(),
        Field::new("ints", DataType::Int32, false),
    ]);
    let mut dataset = lance_datagen::rand(&schema)
        .into_dataset(
            test_dir.path().to_str().unwrap(),
            FragmentCount::from(1),
            FragmentRowCount::from(256),
        )
        .await
        .unwrap();

    // Vector index created with an explicit name.
    let vector_index_name = "name".to_string();
    dataset
        .create_index(
            &["vec"],
            IndexType::Vector,
            Some(vector_index_name.clone()),
            &VectorIndexParams::ivf_pq(2, 8, 2, MetricType::L2, 10),
            true,
        )
        .await
        .unwrap();

    // Scalar index created without a name (one is auto-generated).
    dataset
        .create_index(
            &["ints"],
            IndexType::BTree,
            None,
            &ScalarIndexParams::default(),
            true,
        )
        .await
        .unwrap();

    assert_eq!(dataset.load_indices().await.unwrap().len(), 2);

    dataset.drop_index(&vector_index_name).await.unwrap();
    assert_eq!(dataset.load_indices().await.unwrap().len(), 1);

    // Even though we didn't give the scalar index a name it still has an
    // auto-generated one we can use.
    let remaining_name = dataset.load_indices().await.unwrap()[0].name.clone();
    dataset.drop_index(&remaining_name).await.unwrap();
    assert_eq!(dataset.load_indices().await.unwrap().len(), 0);

    // Make sure it returns an error if the index doesn't exist
    assert!(dataset.drop_index(&remaining_name).await.is_err());
}

#[tokio::test]
async fn test_count_index_rows() {
let test_dir = tempdir().unwrap();
let dimensions = 16;
let column_name = "vec";
let field = sample_vector_field();
let schema = Arc::new(Schema::new(vec![field]));

let float_arr = generate_random_array(512 * dimensions as usize);
Expand Down
Loading