diff --git a/core/bin/core_api.rs b/core/bin/core_api.rs index 71bee00e5aea9..d9573204637d6 100644 --- a/core/bin/core_api.rs +++ b/core/bin/core_api.rs @@ -1988,6 +1988,7 @@ async fn data_sources_documents_delete( .delete_document( state.store.clone(), state.qdrant_clients.clone(), + state.search_store.clone(), &document_id, ) .await @@ -2095,6 +2096,7 @@ async fn data_sources_delete( state.store.clone(), state.databases_store.clone(), state.qdrant_clients.clone(), + state.search_store.clone(), ) .await { @@ -2398,7 +2400,11 @@ async fn tables_delete( ), Ok(Some(table)) => { match table - .delete(state.store.clone(), state.databases_store.clone()) + .delete( + state.store.clone(), + state.databases_store.clone(), + Some(state.search_store.clone()), + ) .await { Err(e) => error_response( @@ -3076,15 +3082,23 @@ async fn folders_delete( Some(e), ) } - Ok(_) => ( - StatusCode::OK, - Json(APIResponse { - error: None, - response: Some(json!({ - "success": true, - })), - }), - ), + Ok(_) => match state.search_store.delete_node(folder_id).await { + Ok(_) => ( + StatusCode::OK, + Json(APIResponse { + error: None, + response: Some(json!({ + "success": true, + })), + }), + ), + Err(e) => error_response( + StatusCode::INTERNAL_SERVER_ERROR, + "internal_server_error", + "Failed to delete folder from search index", + Some(e), + ), + }, } } } diff --git a/core/src/data_sources/data_source.rs b/core/src/data_sources/data_source.rs index 817287fcc23b9..f7085e029bcb6 100644 --- a/core/src/data_sources/data_source.rs +++ b/core/src/data_sources/data_source.rs @@ -1714,6 +1714,7 @@ impl DataSource { &self, store: Box, qdrant_clients: QdrantClients, + search_store: Box, document_id: &str, ) -> Result<()> { // Delete the document in the main embedder collection. @@ -1738,6 +1739,9 @@ impl DataSource { .delete_data_source_document(&self.project, &self.data_source_id, document_id) .await?; + // Delete document from search index. 
+ search_store.delete_node(document_id.to_string()).await?; + // We also scrub it directly. We used to scrub async but now that we store a GCS version // for each data_source_documents entry we can scrub directly at the time of delete. self.scrub_document_deleted_versions(store, document_id) @@ -1959,6 +1963,7 @@ impl DataSource { store: Box, databases_store: Box, qdrant_clients: QdrantClients, + search_store: Box, ) -> Result<()> { if self.shadow_write_qdrant_cluster().is_some() { Err(anyhow!( @@ -1991,7 +1996,9 @@ impl DataSource { try_join_all( tables .iter() - .map(|t| t.delete(store.clone(), databases_store.clone())), + // not deleting from search index here, as it's done more efficiently in the + // full-nodes deletion below + .map(|t| t.delete(store.clone(), databases_store.clone(), None)), ) .await?; @@ -2001,9 +2008,11 @@ impl DataSource { "Deleted tables" ); + // Delete folders (concurrently). let (folders, total) = store .list_data_source_folders(&self.project, &self.data_source_id, &None, &None, None) .await?; + try_join_all(folders.iter().map(|f| { store.delete_data_source_folder(&self.project, &self.data_source_id, &f.folder_id()) })) @@ -2015,6 +2024,11 @@ impl DataSource { "Deleted folders" ); + // Delete all nodes from the search index + search_store + .delete_data_source_nodes(&self.data_source_id) + .await?; + // Delete data source and documents (SQL). 
let deleted_rows = store .delete_data_source(&self.project, &self.data_source_id) diff --git a/core/src/databases/table.rs b/core/src/databases/table.rs index 91f0cceb6adbb..24a9359fd5d8f 100644 --- a/core/src/databases/table.rs +++ b/core/src/databases/table.rs @@ -12,6 +12,7 @@ use crate::{ databases_store::store::DatabasesStore, project::Project, search_filter::{Filterable, SearchFilter}, + search_stores::search_store::SearchStore, sqlite_workers::client::HEARTBEAT_INTERVAL_MS, stores::store::Store, utils, @@ -170,10 +171,12 @@ impl Table { self.schema = Some(schema); } + // if search_store is provided, delete the table node from the search index pub async fn delete( &self, store: Box, databases_store: Box, + search_store: Option>, ) -> Result<()> { if self.table_type()? == TableType::Local { // Invalidate the databases that use the table. @@ -205,6 +208,13 @@ impl Table { .delete_data_source_table(&self.project, &self.data_source_id, &self.table_id) .await?; + // Delete the table node from the search index. 
+ if let Some(search_store) = search_store { + search_store + .delete_node(self.table_id().to_string()) + .await?; + } + Ok(()) } diff --git a/core/src/search_stores/search_store.rs b/core/src/search_stores/search_store.rs index 27bd47d7f2557..53d2c9baabb83 100644 --- a/core/src/search_stores/search_store.rs +++ b/core/src/search_stores/search_store.rs @@ -3,7 +3,7 @@ use async_trait::async_trait; use elasticsearch::{ auth::Credentials, http::transport::{SingleNodeConnectionPool, TransportBuilder}, - Elasticsearch, IndexParts, SearchParts, + DeleteByQueryParts, DeleteParts, Elasticsearch, IndexParts, SearchParts, }; use serde_json::json; use url::Url; @@ -35,10 +35,15 @@ pub trait SearchStore { filter: Vec, options: Option, ) -> Result>; + + async fn index_node(&self, node: Node) -> Result<()>; async fn index_document(&self, document: Document) -> Result<()>; async fn index_table(&self, table: Table) -> Result<()>; async fn index_folder(&self, folder: Folder) -> Result<()>; - async fn index_node(&self, node: Node) -> Result<()>; + + async fn delete_node(&self, node_id: String) -> Result<()>; + async fn delete_data_source_nodes(&self, data_source_id: &str) -> Result<()>; + fn clone_box(&self) -> Box; } @@ -82,54 +87,6 @@ const NODES_INDEX_NAME: &str = "core.data_sources_nodes"; #[async_trait] impl SearchStore for ElasticsearchSearchStore { - async fn index_document(&self, document: Document) -> Result<()> { - let node = Node::from(document); - self.index_node(node).await - } - - async fn index_table(&self, table: Table) -> Result<()> { - let node = Node::from(table); - self.index_node(node).await - } - - async fn index_folder(&self, folder: Folder) -> Result<()> { - let node = Node::from(folder); - self.index_node(node).await - } - - async fn index_node(&self, node: Node) -> Result<()> { - // todo(kw-search): fail on error - let now = utils::now(); - match self - .client - .index(IndexParts::IndexId(NODES_INDEX_NAME, &node.node_id)) - .timeout("200ms") - 
.body(node.clone()) - .send() - .await - { - Ok(_) => { - info!( - duration = utils::now() - now, - node_id = node.node_id, - "[ElasticsearchSearchStore] Indexed {}", - node.node_type.to_string() - ); - Ok(()) - } - Err(e) => { - error!( - error = %e, - duration = utils::now() - now, - node_id = node.node_id, - "[ElasticsearchSearchStore] Failed to index {}", - node.node_type.to_string() - ); - Ok(()) - } - } - } - async fn search_nodes( &self, query: String, @@ -190,6 +147,75 @@ impl SearchStore for ElasticsearchSearchStore { } } + async fn index_node(&self, node: Node) -> Result<()> { + // todo(kw-search): fail on error + let now = utils::now(); + match self + .client + .index(IndexParts::IndexId(NODES_INDEX_NAME, &node.node_id)) + .timeout("200ms") + .body(node.clone()) + .send() + .await + { + Ok(_) => { + info!( + duration = utils::now() - now, + node_id = node.node_id, + "[ElasticsearchSearchStore] Indexed {}", + node.node_type.to_string() + ); + Ok(()) + } + Err(e) => { + error!( + error = %e, + duration = utils::now() - now, + node_id = node.node_id, + "[ElasticsearchSearchStore] Failed to index {}", + node.node_type.to_string() + ); + Ok(()) + } + } + } + + async fn index_document(&self, document: Document) -> Result<()> { + let node = Node::from(document); + self.index_node(node).await + } + + async fn index_table(&self, table: Table) -> Result<()> { + let node = Node::from(table); + self.index_node(node).await + } + + async fn index_folder(&self, folder: Folder) -> Result<()> { + let node = Node::from(folder); + self.index_node(node).await + } + + async fn delete_node(&self, node_id: String) -> Result<()> { + self.client + .delete(DeleteParts::IndexId(NODES_INDEX_NAME, &node_id)) + .send() + .await?; + Ok(()) + } + + async fn delete_data_source_nodes(&self, data_source_id: &str) -> Result<()> { + self.client + .delete_by_query(DeleteByQueryParts::Index(&[NODES_INDEX_NAME])) + .body(json!({ + "query": { + "term": { "data_source_id": data_source_id } + } + 
})) .send() .await?; Ok(()) } fn clone_box(&self) -> Box<dyn SearchStore + Sync + Send> { Box::new(self.clone()) }