Skip to content

Commit

Permalink
[Keyword search] Updates search index on /parents call (#9559)
Browse files Browse the repository at this point in the history
* update parents for table & documents calls

* updates parents for folders for microsoft

* if document does not exist, do not index

* clean
  • Loading branch information
philipperolet authored Dec 22, 2024
1 parent 9cdc903 commit d984b86
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 4 deletions.
19 changes: 16 additions & 3 deletions connectors/src/connectors/microsoft/temporal/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,7 @@ export async function syncDeltaForRootNodesInDrive({
});

if (isMoved) {
await updateDescendantsParentsInQdrant({
await updateDescendantsParentsInCore({
dataSourceConfig,
folder: resource,
startSyncTs,
Expand Down Expand Up @@ -844,7 +844,7 @@ async function isFolderMovedInSameRoot({
return oldParentId !== newParentId;
}

async function updateDescendantsParentsInQdrant({
async function updateDescendantsParentsInCore({
folder,
dataSourceConfig,
startSyncTs,
Expand All @@ -856,6 +856,19 @@ async function updateDescendantsParentsInQdrant({
const children = await folder.fetchChildren();
const files = children.filter((child) => child.nodeType === "file");
const folders = children.filter((child) => child.nodeType === "folder");

await upsertDataSourceFolder({
dataSourceConfig,
folderId: folder.internalId,
parents: await getParents({
connectorId: folder.connectorId,
internalId: folder.internalId,
startSyncTs,
}),
title: folder.name ?? "",
mimeType: "application/vnd.dust.microsoft.folder",
});

await concurrentExecutor(
files,
async (file) => updateParentsField({ file, dataSourceConfig, startSyncTs }),
Expand All @@ -864,7 +877,7 @@ async function updateDescendantsParentsInQdrant({
}
);
for (const childFolder of folders) {
await updateDescendantsParentsInQdrant({
await updateDescendantsParentsInCore({
dataSourceConfig,
folder: childFolder,
startSyncTs,
Expand Down
7 changes: 6 additions & 1 deletion core/bin/core_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1550,6 +1550,7 @@ async fn data_sources_documents_update_parents(
.update_parents(
state.store.clone(),
state.qdrant_clients.clone(),
state.search_store.clone(),
document_id,
payload.parents,
)
Expand Down Expand Up @@ -2536,7 +2537,11 @@ async fn tables_update_parents(
None,
),
Ok(Some(table)) => match table
.update_parents(state.store.clone(), payload.parents.clone())
.update_parents(
state.store.clone(),
state.search_store.clone(),
payload.parents.clone(),
)
.await
{
Err(e) => error_response(
Expand Down
17 changes: 17 additions & 0 deletions core/src/data_sources/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@ impl DataSource {
&self,
store: Box<dyn Store + Sync + Send>,
qdrant_clients: QdrantClients,
search_store: Box<dyn SearchStore + Sync + Send>,
document_id: String,
parents: Vec<String>,
) -> Result<()> {
Expand All @@ -485,6 +486,22 @@ impl DataSource {

self.update_document_payload(qdrant_clients, document_id_hash, "parents", parents)
.await?;

let document = store
.load_data_source_document(
&self.project,
&self.data_source_id(),
&document_id.to_string(),
&None,
)
.await?;

match document {
Some(document) => {
search_store.index_node(Node::from(document)).await?;
}
None => (),
}
Ok(())
}

Expand Down
3 changes: 3 additions & 0 deletions core/src/databases/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ impl Table {
pub async fn update_parents(
&self,
store: Box<dyn Store + Sync + Send>,
search_store: Box<dyn SearchStore + Sync + Send>,
parents: Vec<String>,
) -> Result<()> {
store
Expand All @@ -232,6 +233,8 @@ impl Table {
&parents,
)
.await?;

search_store.index_node(Node::from(self.clone())).await?;
Ok(())
}
}
Expand Down
2 changes: 2 additions & 0 deletions core/src/search_stores/search_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ impl SearchStore for ElasticsearchSearchStore {
async fn index_node(&self, node: Node) -> Result<()> {
// todo(kw-search): fail on error
let now = utils::now();
// Note: in elasticsearch, the index API updates the document if it
// already exists.
let response = self
.client
.index(IndexParts::IndexId(NODES_INDEX_NAME, &node.unique_id()))
Expand Down

0 comments on commit d984b86

Please sign in to comment.