Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add drop_index #3382

Merged
merged 2 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion protos/transaction.proto
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ message Transaction {

// Add a new secondary index or replace an existing one.
//
// - new_indices: the modified indices
// This is also used to remove an index (we are replacing it with nothing)
//
// - new_indices: the modified indices, empty if dropping indices only
// - removed_indices: the indices that are being replaced
message CreateIndex {
repeated IndexMetadata new_indices = 1;
Expand Down
10 changes: 10 additions & 0 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2222,6 +2222,16 @@ def create_index(
)
return self

def drop_index(self, column: str, name: str):
    """
    Drops an index from the dataset

    Note: A name is required, even if one was not provided when the index was
    created. You can use the `list_indices` method to get the names of the
    indices.

    Parameters
    ----------
    column : str
        The column the index was created on.
    name : str
        The name of the index to drop.
    """
    # Delegates to the native dataset; raises (surfaced as RuntimeError)
    # when the column or the named index does not exist.
    return self._ds.drop_index(column, name)

def session(self) -> Session:
"""
Return the dataset session, which holds the dataset's state.
Expand Down
1 change: 1 addition & 0 deletions python/python/lance/lance/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ class _Dataset:
storage_options: Optional[Dict[str, str]] = None,
kwargs: Optional[Dict[str, Any]] = None,
): ...
# Drop the index named `name` on `column`; errors if no such index exists.
def drop_index(self, column: str, name: str) -> None: ...
def count_fragments(self) -> int: ...
def num_small_files(self, max_rows_per_group: int) -> int: ...
def get_fragments(self) -> List[_Fragment]: ...
Expand Down
39 changes: 39 additions & 0 deletions python/python/tests/test_scalar_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -675,3 +675,42 @@ def test_optimize_no_new_data(tmp_path: Path):

assert dataset.to_table(filter="btree IS NULL").num_rows == 2
assert dataset.to_table(filter="bitmap IS NULL").num_rows == 2


def test_drop_index(tmp_path: Path):
    """Create three scalar indices, drop them all, and verify that the
    columns remain queryable (via plain scans) afterwards."""
    test_table_size = 100
    test_table = pa.table(
        {
            "btree": list(range(test_table_size)),
            "bitmap": list(range(test_table_size)),
            "fts": ["a" for _ in range(test_table_size)],
        }
    )
    ds = lance.write_dataset(test_table, tmp_path)
    ds.create_scalar_index("btree", index_type="BTREE")
    ds.create_scalar_index("bitmap", index_type="BITMAP")
    ds.create_scalar_index("fts", index_type="INVERTED")

    assert len(ds.list_indices()) == 3

    test_idx = ds.list_indices()[0]
    test_col = test_idx["fields"][0]
    test_name = test_idx["name"]
    # Attempt to drop index (col does not exist)
    with pytest.raises(RuntimeError, match="nonexistent_col does not exist"):
        ds.drop_index("nonexistent_col", test_name)
    # Attempt to drop index (name does not exist)
    with pytest.raises(RuntimeError, match="index not found"):
        ds.drop_index(test_col, "nonexistent_name")

    # Drop every index; each drop commits a new dataset version.
    for idx in ds.list_indices():
        idx_name = idx["name"]
        col_name = idx["fields"][0]
        ds.drop_index(col_name, idx_name)

    assert len(ds.list_indices()) == 0

    # Ensure we can still search columns
    assert ds.to_table(filter="btree = 1").num_rows == 1
    assert ds.to_table(filter="bitmap = 1").num_rows == 1
    assert ds.to_table(filter="fts = 'a'").num_rows == test_table_size
24 changes: 24 additions & 0 deletions python/python/tests/test_vector_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -1105,3 +1105,27 @@ def test_optimize_indices(indexed_dataset):
indexed_dataset.optimize.optimize_indices(num_indices_to_merge=0)
indices = indexed_dataset.list_indices()
assert len(indices) == 2


def test_drop_indices(indexed_dataset):
    """Dropping the vector index leaves the column searchable via flat search."""
    index_name = indexed_dataset.list_indices()[0]["name"]

    indexed_dataset.drop_index("vector", index_name)
    assert len(indexed_dataset.list_indices()) == 0

    query_vector = (
        indexed_dataset.take([0], columns=["vector"]).column("vector").to_pylist()[0]
    )

    # make sure we can still search the column (will do flat search)
    results = indexed_dataset.to_table(
        nearest={
            "column": "vector",
            "q": query_vector,
            "k": 15,
            "nprobes": 1,
        },
    )

    assert len(results) == 15
9 changes: 9 additions & 0 deletions python/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1266,6 +1266,15 @@ impl Dataset {
Ok(())
}

fn drop_index(&mut self, column: &str, name: &str) -> PyResult<()> {
    // Mutate a detached copy of the dataset, then swap it back in so the
    // shared Arc is only replaced after the drop succeeds.
    let mut updated = self.ds.as_ref().clone();
    RT.block_on(None, updated.drop_index(column, name))?
        .infer_error()?;
    self.ds = Arc::new(updated);
    Ok(())
}

fn count_fragments(&self) -> usize {
self.ds.count_fragments()
}
Expand Down
10 changes: 10 additions & 0 deletions rust/lance-index/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,16 @@ pub trait DatasetIndexExt {
replace: bool,
) -> Result<()>;

/// Drop indices by name.
///
/// Upon finish, a new dataset version is generated.
///
/// Returns an error if the column does not exist or if no index with the
/// given name exists on that column.
///
/// Parameters:
///
/// - `column`: the column the index to drop was created on.
/// - `name`: the name of the index to drop.
async fn drop_index(&mut self, column: &str, name: &str) -> Result<()>;

/// Read all indices of this Dataset version.
///
/// The indices are lazy loaded and cached in memory within the [`Dataset`] instance.
Expand Down
128 changes: 123 additions & 5 deletions rust/lance/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,63 @@ impl DatasetIndexExt for Dataset {
Ok(())
}

/// Drop the index named `name` on `column`, committing a new dataset version.
async fn drop_index(&mut self, column: &str, name: &str) -> Result<()> {
    // Load every index metadata entry sharing this name.
    let indices = self.load_indices_by_name(name).await?;
    if indices.is_empty() {
        return Err(Error::IndexNotFound {
            identity: format!("name={}", name),
            location: location!(),
        });
    }

    // Resolve the column so we can match indices by field id.
    let field = self.schema().field(column).ok_or_else(|| Error::Index {
        message: format!("Column {} does not exist in the schema", column),
        location: location!(),
    })?;

    // Keep only single-field indices that cover the requested column.
    let indices = indices
        .iter()
        .filter(|idx| idx.fields.len() == 1 && idx.fields[0] == field.id)
        .cloned()
        .collect::<Vec<_>>();

    if indices.is_empty() {
        return Err(Error::IndexNotFound {
            identity: format!(
                "An index with the name {} was found but it was not on the column {}",
                name, column
            ),
            location: location!(),
        });
    }

    // Dropping an index is modeled as a CreateIndex operation that replaces
    // the removed indices with nothing (see protos/transaction.proto).
    // `indices` is moved here — no clone needed, this is its last use.
    let transaction = Transaction::new(
        self.manifest.version,
        Operation::CreateIndex {
            new_indices: vec![],
            removed_indices: indices,
        },
        /*blobs_op= */ None,
        None,
    );

    let (new_manifest, manifest_path) = commit_transaction(
        self,
        self.object_store(),
        self.commit_handler.as_ref(),
        &transaction,
        &Default::default(),
        &Default::default(),
        self.manifest_naming_scheme,
    )
    .await?;

    // Publish the newly committed version.
    self.manifest = Arc::new(new_manifest);
    self.manifest_file = manifest_path;

    Ok(())
}

async fn load_indices(&self) -> Result<Arc<Vec<IndexMetadata>>> {
let dataset_dir = self.base.to_string();
if let Some(indices) = self
Expand Down Expand Up @@ -909,6 +966,7 @@ impl DatasetIndexInternalExt for Dataset {
#[cfg(test)]
mod tests {
use crate::dataset::builder::DatasetBuilder;
use crate::utils::test::{DatagenExt, FragmentCount, FragmentRowCount};

use super::*;

Expand Down Expand Up @@ -984,19 +1042,79 @@ mod tests {
.is_err());
}

#[tokio::test]
async fn test_count_index_rows() {
let test_dir = tempdir().unwrap();
fn sample_vector_field() -> Field {
let dimensions = 16;
let column_name = "vec";
let field = Field::new(
Field::new(
column_name,
DataType::FixedSizeList(
Arc::new(Field::new("item", DataType::Float32, true)),
dimensions,
),
false,
);
)
}

#[tokio::test]
async fn test_drop_index() {
    let test_dir = tempdir().unwrap();
    // One vector column plus one scalar column, each of which gets an index.
    let schema = Schema::new(vec![
        sample_vector_field(),
        Field::new("ints", DataType::Int32, false),
    ]);
    let mut dataset = lance_datagen::rand(&schema)
        .into_dataset(
            test_dir.path().to_str().unwrap(),
            FragmentCount::from(1),
            FragmentRowCount::from(256),
        )
        .await
        .unwrap();

    // Create an explicitly named vector index and an auto-named scalar index.
    let vector_index_name = String::from("name");
    dataset
        .create_index(
            &["vec"],
            IndexType::Vector,
            Some(vector_index_name.clone()),
            &VectorIndexParams::ivf_pq(2, 8, 2, MetricType::L2, 10),
            true,
        )
        .await
        .unwrap();
    dataset
        .create_index(
            &["ints"],
            IndexType::BTree,
            None,
            &ScalarIndexParams::default(),
            true,
        )
        .await
        .unwrap();

    assert_eq!(dataset.load_indices().await.unwrap().len(), 2);

    // Dropping the vector index leaves only the scalar index behind.
    dataset.drop_index("vec", &vector_index_name).await.unwrap();
    let remaining = dataset.load_indices().await.unwrap();
    assert_eq!(remaining.len(), 1);

    // Even though we didn't give the scalar index a name it still has an
    // auto-generated one we can use.
    let scalar_index_name = remaining[0].name.clone();
    dataset.drop_index("ints", &scalar_index_name).await.unwrap();

    assert_eq!(dataset.load_indices().await.unwrap().len(), 0);

    // Make sure it returns an error if the index doesn't exist
    assert!(dataset.drop_index("ints", &scalar_index_name).await.is_err());
}

#[tokio::test]
async fn test_count_index_rows() {
let test_dir = tempdir().unwrap();
let dimensions = 16;
let column_name = "vec";
let field = sample_vector_field();
let schema = Arc::new(Schema::new(vec![field]));

let float_arr = generate_random_array(512 * dimensions as usize);
Expand Down
Loading