From d984b86890ae5f5c4e974d7d3aa39c50583c0848 Mon Sep 17 00:00:00 2001 From: Philippe Rolet Date: Sun, 22 Dec 2024 18:57:49 +0100 Subject: [PATCH] [Keyword search] Updates search index on /parents call (#9559) * update parents for table & documents calls * updates parents for folders for microsoft * if document does not exist, do not index * clean --- .../microsoft/temporal/activities.ts | 19 ++++++++++++++++--- core/bin/core_api.rs | 7 ++++++- core/src/data_sources/data_source.rs | 17 +++++++++++++++++ core/src/databases/table.rs | 3 +++ core/src/search_stores/search_store.rs | 2 ++ 5 files changed, 44 insertions(+), 4 deletions(-) diff --git a/connectors/src/connectors/microsoft/temporal/activities.ts b/connectors/src/connectors/microsoft/temporal/activities.ts index d95c7b802624..2557cae78110 100644 --- a/connectors/src/connectors/microsoft/temporal/activities.ts +++ b/connectors/src/connectors/microsoft/temporal/activities.ts @@ -670,7 +670,7 @@ export async function syncDeltaForRootNodesInDrive({ }); if (isMoved) { - await updateDescendantsParentsInQdrant({ + await updateDescendantsParentsInCore({ dataSourceConfig, folder: resource, startSyncTs, @@ -844,7 +844,7 @@ async function isFolderMovedInSameRoot({ return oldParentId !== newParentId; } -async function updateDescendantsParentsInQdrant({ +async function updateDescendantsParentsInCore({ folder, dataSourceConfig, startSyncTs, @@ -856,6 +856,19 @@ async function updateDescendantsParentsInQdrant({ const children = await folder.fetchChildren(); const files = children.filter((child) => child.nodeType === "file"); const folders = children.filter((child) => child.nodeType === "folder"); + + await upsertDataSourceFolder({ + dataSourceConfig, + folderId: folder.internalId, + parents: await getParents({ + connectorId: folder.connectorId, + internalId: folder.internalId, + startSyncTs, + }), + title: folder.name ?? "", + mimeType: "application/vnd.dust.microsoft.folder", + }); + await concurrentExecutor( files, async (file) => updateParentsField({ file, dataSourceConfig, startSyncTs }), @@ -864,7 +877,7 @@ async function updateDescendantsParentsInQdrant({ } ); for (const childFolder of folders) { - await updateDescendantsParentsInQdrant({ + await updateDescendantsParentsInCore({ dataSourceConfig, folder: childFolder, startSyncTs, diff --git a/core/bin/core_api.rs b/core/bin/core_api.rs index e40a5f562fb5..0e920b2ea6b4 100644 --- a/core/bin/core_api.rs +++ b/core/bin/core_api.rs @@ -1550,6 +1550,7 @@ async fn data_sources_documents_update_parents( .update_parents( state.store.clone(), state.qdrant_clients.clone(), + state.search_store.clone(), document_id, payload.parents, ) @@ -2536,7 +2537,11 @@ async fn tables_update_parents( None, ), Ok(Some(table)) => match table - .update_parents(state.store.clone(), payload.parents.clone()) + .update_parents( + state.store.clone(), + state.search_store.clone(), + payload.parents.clone(), + ) .await { Err(e) => error_response( diff --git a/core/src/data_sources/data_source.rs b/core/src/data_sources/data_source.rs index f1fbf0e992eb..e0581f5729c6 100644 --- a/core/src/data_sources/data_source.rs +++ b/core/src/data_sources/data_source.rs @@ -469,6 +469,7 @@ impl DataSource { &self, store: Box, qdrant_clients: QdrantClients, + search_store: Box, document_id: String, parents: Vec, ) -> Result<()> { @@ -485,6 +486,22 @@ impl DataSource { self.update_document_payload(qdrant_clients, document_id_hash, "parents", parents) .await?; + + let document = store + .load_data_source_document( + &self.project, + &self.data_source_id(), + &document_id.to_string(), + &None, + ) + .await?; + + match document { + Some(document) => { + search_store.index_node(Node::from(document)).await?; + } + None => (), + } Ok(()) } diff --git a/core/src/databases/table.rs b/core/src/databases/table.rs index f0694e3c7cd1..450164244b3b 100644 --- a/core/src/databases/table.rs +++ b/core/src/databases/table.rs @@ -222,6 +222,7 @@ impl Table { pub async fn update_parents( &self, store: Box, + search_store: Box, parents: Vec, ) -> Result<()> { store @@ -232,6 +233,8 @@ impl Table { &parents, ) .await?; + + search_store.index_node(Node::from(self.clone())).await?; Ok(()) } } diff --git a/core/src/search_stores/search_store.rs b/core/src/search_stores/search_store.rs index abac1c8295c7..b0ac5b24bf52 100644 --- a/core/src/search_stores/search_store.rs +++ b/core/src/search_stores/search_store.rs @@ -146,6 +146,8 @@ impl SearchStore for ElasticsearchSearchStore { async fn index_node(&self, node: Node) -> Result<()> { // todo(kw-search): fail on error let now = utils::now(); + // Note: in elasticsearch, the index API updates the document if it + // already exists. let response = self .client .index(IndexParts::IndexId(NODES_INDEX_NAME, &node.unique_id()))