Skip to content

Commit

Permalink
[Keyword search] Remove node from index on delete
Browse files Browse the repository at this point in the history
Description
---
Fixes #9460

Also moves without changing search_store pre-existing functions

Risks
---
- slow down or fail deletes => unlikely given speed of deletion
- miss some nodes deletion

Deploy
---
core
  • Loading branch information
philipperolet committed Dec 18, 2024
1 parent b34bfcd commit 2dd2380
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 61 deletions.
34 changes: 24 additions & 10 deletions core/bin/core_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1988,6 +1988,7 @@ async fn data_sources_documents_delete(
.delete_document(
state.store.clone(),
state.qdrant_clients.clone(),
state.search_store.clone(),
&document_id,
)
.await
Expand Down Expand Up @@ -2095,6 +2096,7 @@ async fn data_sources_delete(
state.store.clone(),
state.databases_store.clone(),
state.qdrant_clients.clone(),
state.search_store.clone(),
)
.await
{
Expand Down Expand Up @@ -2398,7 +2400,11 @@ async fn tables_delete(
),
Ok(Some(table)) => {
match table
.delete(state.store.clone(), state.databases_store.clone())
.delete(
state.store.clone(),
state.databases_store.clone(),
Some(state.search_store.clone()),
)
.await
{
Err(e) => error_response(
Expand Down Expand Up @@ -3076,15 +3082,23 @@ async fn folders_delete(
Some(e),
)
}
Ok(_) => (
StatusCode::OK,
Json(APIResponse {
error: None,
response: Some(json!({
"success": true,
})),
}),
),
Ok(_) => match state.search_store.delete_node(folder_id).await {
Ok(_) => (
StatusCode::OK,
Json(APIResponse {
error: None,
response: Some(json!({
"success": true,
})),
}),
),
Err(e) => error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"internal_server_error",
"Failed to delete folder from search index",
Some(e),
),
},
}
}
}
Expand Down
16 changes: 15 additions & 1 deletion core/src/data_sources/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1714,6 +1714,7 @@ impl DataSource {
&self,
store: Box<dyn Store + Sync + Send>,
qdrant_clients: QdrantClients,
search_store: Box<dyn SearchStore + Sync + Send>,
document_id: &str,
) -> Result<()> {
// Delete the document in the main embedder collection.
Expand All @@ -1738,6 +1739,9 @@ impl DataSource {
.delete_data_source_document(&self.project, &self.data_source_id, document_id)
.await?;

// Delete document from search index.
search_store.delete_node(document_id.to_string()).await?;

// We also scrub it directly. We used to scrub async but now that we store a GCS version
// for each data_source_documents entry we can scrub directly at the time of delete.
self.scrub_document_deleted_versions(store, document_id)
Expand Down Expand Up @@ -1959,6 +1963,7 @@ impl DataSource {
store: Box<dyn Store + Sync + Send>,
databases_store: Box<dyn DatabasesStore + Sync + Send>,
qdrant_clients: QdrantClients,
search_store: Box<dyn SearchStore + Sync + Send>,
) -> Result<()> {
if self.shadow_write_qdrant_cluster().is_some() {
Err(anyhow!(
Expand Down Expand Up @@ -1991,7 +1996,9 @@ impl DataSource {
try_join_all(
tables
.iter()
.map(|t| t.delete(store.clone(), databases_store.clone())),
// not deleting from search index here, as it's done more efficiently in the
// full-nodes deletion below
.map(|t| t.delete(store.clone(), databases_store.clone(), None)),
)
.await?;

Expand All @@ -2001,9 +2008,11 @@ impl DataSource {
"Deleted tables"
);

// Delete folders (concurrently).
let (folders, total) = store
.list_data_source_folders(&self.project, &self.data_source_id, &None, &None, None)
.await?;

try_join_all(folders.iter().map(|f| {
store.delete_data_source_folder(&self.project, &self.data_source_id, &f.folder_id())
}))
Expand All @@ -2015,6 +2024,11 @@ impl DataSource {
"Deleted folders"
);

// Delete all nodes from the search index
search_store
.delete_data_source_nodes(&self.data_source_id)
.await?;

// Delete data source and documents (SQL).
let deleted_rows = store
.delete_data_source(&self.project, &self.data_source_id)
Expand Down
10 changes: 10 additions & 0 deletions core/src/databases/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::{
databases_store::store::DatabasesStore,
project::Project,
search_filter::{Filterable, SearchFilter},
search_stores::search_store::SearchStore,
sqlite_workers::client::HEARTBEAT_INTERVAL_MS,
stores::store::Store,
utils,
Expand Down Expand Up @@ -170,10 +171,12 @@ impl Table {
self.schema = Some(schema);
}

// if search_store is provided, delete the table node from the search index
pub async fn delete(
&self,
store: Box<dyn Store + Sync + Send>,
databases_store: Box<dyn DatabasesStore + Sync + Send>,
search_store: Option<Box<dyn SearchStore + Sync + Send>>,
) -> Result<()> {
if self.table_type()? == TableType::Local {
// Invalidate the databases that use the table.
Expand Down Expand Up @@ -205,6 +208,13 @@ impl Table {
.delete_data_source_table(&self.project, &self.data_source_id, &self.table_id)
.await?;

// Delete the table node from the search index.
if let Some(search_store) = search_store {
search_store
.delete_node(self.table_id().to_string())
.await?;
}

Ok(())
}

Expand Down
126 changes: 76 additions & 50 deletions core/src/search_stores/search_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use async_trait::async_trait;
use elasticsearch::{
auth::Credentials,
http::transport::{SingleNodeConnectionPool, TransportBuilder},
Elasticsearch, IndexParts, SearchParts,
DeleteByQueryParts, DeleteParts, Elasticsearch, IndexParts, SearchParts,
};
use serde_json::json;
use url::Url;
Expand Down Expand Up @@ -35,10 +35,15 @@ pub trait SearchStore {
filter: Vec<DatasourceViewFilter>,
options: Option<NodesSearchOptions>,
) -> Result<Vec<Node>>;

async fn index_node(&self, node: Node) -> Result<()>;
async fn index_document(&self, document: Document) -> Result<()>;
async fn index_table(&self, table: Table) -> Result<()>;
async fn index_folder(&self, folder: Folder) -> Result<()>;
async fn index_node(&self, node: Node) -> Result<()>;

async fn delete_node(&self, node_id: String) -> Result<()>;
async fn delete_data_source_nodes(&self, data_source_id: &str) -> Result<()>;

fn clone_box(&self) -> Box<dyn SearchStore + Sync + Send>;
}

Expand Down Expand Up @@ -82,54 +87,6 @@ const NODES_INDEX_NAME: &str = "core.data_sources_nodes";

#[async_trait]
impl SearchStore for ElasticsearchSearchStore {
async fn index_document(&self, document: Document) -> Result<()> {
let node = Node::from(document);
self.index_node(node).await
}

async fn index_table(&self, table: Table) -> Result<()> {
let node = Node::from(table);
self.index_node(node).await
}

async fn index_folder(&self, folder: Folder) -> Result<()> {
let node = Node::from(folder);
self.index_node(node).await
}

async fn index_node(&self, node: Node) -> Result<()> {
// todo(kw-search): fail on error
let now = utils::now();
match self
.client
.index(IndexParts::IndexId(NODES_INDEX_NAME, &node.node_id))
.timeout("200ms")
.body(node.clone())
.send()
.await
{
Ok(_) => {
info!(
duration = utils::now() - now,
node_id = node.node_id,
"[ElasticsearchSearchStore] Indexed {}",
node.node_type.to_string()
);
Ok(())
}
Err(e) => {
error!(
error = %e,
duration = utils::now() - now,
node_id = node.node_id,
"[ElasticsearchSearchStore] Failed to index {}",
node.node_type.to_string()
);
Ok(())
}
}
}

async fn search_nodes(
&self,
query: String,
Expand Down Expand Up @@ -190,6 +147,75 @@ impl SearchStore for ElasticsearchSearchStore {
}
}

async fn index_node(&self, node: Node) -> Result<()> {
// todo(kw-search): fail on error
let now = utils::now();
match self
.client
.index(IndexParts::IndexId(NODES_INDEX_NAME, &node.node_id))
.timeout("200ms")
.body(node.clone())
.send()
.await
{
Ok(_) => {
info!(
duration = utils::now() - now,
node_id = node.node_id,
"[ElasticsearchSearchStore] Indexed {}",
node.node_type.to_string()
);
Ok(())
}
Err(e) => {
error!(
error = %e,
duration = utils::now() - now,
node_id = node.node_id,
"[ElasticsearchSearchStore] Failed to index {}",
node.node_type.to_string()
);
Ok(())
}
}
}

async fn index_document(&self, document: Document) -> Result<()> {
let node = Node::from(document);
self.index_node(node).await
}

async fn index_table(&self, table: Table) -> Result<()> {
let node = Node::from(table);
self.index_node(node).await
}

async fn index_folder(&self, folder: Folder) -> Result<()> {
let node = Node::from(folder);
self.index_node(node).await
}

async fn delete_node(&self, node_id: String) -> Result<()> {
self.client
.delete(DeleteParts::IndexId(NODES_INDEX_NAME, &node_id))
.send()
.await?;
Ok(())
}

async fn delete_data_source_nodes(&self, data_source_id: &str) -> Result<()> {
self.client
.delete_by_query(DeleteByQueryParts::Index(&[NODES_INDEX_NAME]))
.body(json!({
"query": {
"term": { "data_source_id": data_source_id }
}
}))
.send()
.await?;
Ok(())
}

fn clone_box(&self) -> Box<dyn SearchStore + Sync + Send> {
Box::new(self.clone())
}
Expand Down

0 comments on commit 2dd2380

Please sign in to comment.