From c56431f4cb5ec30e590bc329d4ecc8031aa19336 Mon Sep 17 00:00:00 2001 From: Philippe Rolet Date: Sun, 21 Jul 2024 16:59:57 +0200 Subject: [PATCH] [MS connector] Tweaks on incremental sync (#6357) * fix: parentInternalId should be null by default, and always null for roots * handle 410 require resync error * update parents on folder movement * fix movement detection * lint * clean com --- .../src/connectors/microsoft/lib/graph_api.ts | 16 +- .../microsoft/temporal/activities.ts | 159 ++++++++++++++++-- 2 files changed, 153 insertions(+), 22 deletions(-) diff --git a/connectors/src/connectors/microsoft/lib/graph_api.ts b/connectors/src/connectors/microsoft/lib/graph_api.ts index 720cb988e8779..b44636799c42c 100644 --- a/connectors/src/connectors/microsoft/lib/graph_api.ts +++ b/connectors/src/connectors/microsoft/lib/graph_api.ts @@ -83,6 +83,10 @@ export async function getFilesAndFolders( return { results: res.value }; } +/** + * Get list of items that have changed Calling without nextLink nor token will + * result in initial delta call + */ export async function getDeltaResults({ client, parentInternalId, @@ -92,7 +96,7 @@ export async function getDeltaResults({ client: Client; parentInternalId: string; } & ( - | { nextLink: string; token?: never } + | { nextLink?: string; token?: never } | { nextLink?: never; token: string } )) { const { nodeType, itemAPIPath } = typeAndPathFromInternalId(parentInternalId); @@ -143,7 +147,7 @@ export async function getDeltaResults({ export async function getFullDeltaResults( client: Client, parentInternalId: string, - initialDeltaLink: string + initialDeltaLink?: string ): Promise<{ results: microsoftgraph.DriveItem[]; deltaLink: string }> { let nextLink: string | undefined = initialDeltaLink; let allItems: microsoftgraph.DriveItem[] = []; @@ -359,9 +363,7 @@ export function itemToMicrosoftNode( nodeType, name: item.name ?? null, internalId: getDriveItemInternalId(item), - parentInternalId: item.parentReference - ? getParentReferenceInternalId(item.parentReference) - : null, + parentInternalId: null, mimeType: null, }; } @@ -371,9 +373,7 @@ export function itemToMicrosoftNode( nodeType, name: item.name ?? null, internalId: getDriveItemInternalId(item), - parentInternalId: item.parentReference - ? getParentReferenceInternalId(item.parentReference) - : null, + parentInternalId: null, mimeType: item.file?.mimeType ?? null, }; } diff --git a/connectors/src/connectors/microsoft/temporal/activities.ts b/connectors/src/connectors/microsoft/temporal/activities.ts index f8a7a76733830..8d586610ea715 100644 --- a/connectors/src/connectors/microsoft/temporal/activities.ts +++ b/connectors/src/connectors/microsoft/temporal/activities.ts @@ -1,5 +1,6 @@ import type { ModelId } from "@dust-tt/types"; import type { Client } from "@microsoft/microsoft-graph-client"; +import { GraphError } from "@microsoft/microsoft-graph-client"; import type { DriveItem } from "@microsoft/microsoft-graph-types"; import { heartbeat } from "@temporalio/activity"; @@ -30,6 +31,7 @@ import { import { getMimeTypesToSync } from "@connectors/connectors/microsoft/temporal/mime_types"; import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config"; import { concurrentExecutor } from "@connectors/lib/async_utils"; +import { updateDocumentParentsField } from "@connectors/lib/data_sources"; import logger from "@connectors/logger/logger"; import { ConnectorResource } from "@connectors/resources/connector_resource"; import { @@ -37,6 +39,7 @@ import { MicrosoftNodeResource, MicrosoftRootResource, } from "@connectors/resources/microsoft_resource"; +import type { DataSourceConfig } from "@connectors/types/data_source_config"; const FILES_SYNC_CONCURRENCY = 10; @@ -394,12 +397,6 @@ export async function syncDeltaForNode({ const client = await getClient(connector.connectionId); - if (!node.deltaLink) { - throw new Error( - `Delta link not found for root node resource ${JSON.stringify(node.toJSON())}` - ); - } - logger.info({ connectorId, node }, "Syncing delta for node"); // Goes through pagination to return all delta results. This is because delta @@ -413,15 +410,14 @@ export async function syncDeltaForNode({ // // If it ever becomes an issue, redis-caching the list and having activities // grabbing pages of it can be implemented - const { results, deltaLink } = await getFullDeltaResults( + const { results, deltaLink } = await getDeltaData({ client, - nodeId, - node.deltaLink - ); - const uniqueDriveItemList = removeAllButLastOccurences(results); - const sortedDriveItemList = sortForIncrementalUpdate(uniqueDriveItemList); + node, + }); + const uniqueChangedItems = removeAllButLastOccurences(results); + const sortedChangedItems = sortForIncrementalUpdate(uniqueChangedItems); - for (const driveItem of sortedDriveItemList) { + for (const driveItem of sortedChangedItems) { heartbeat(); if (!driveItem.parentReference) { @@ -461,11 +457,35 @@ export async function syncDeltaForNode({ // in the delta with the 'deleted' field set await deleteFolder({ connectorId, internalId }); } else { + const isMoved = await isFolderMovedInSameRoot({ + connectorId, + folder: driveItem, + internalId, + }); const resource = await MicrosoftNodeResource.updateOrCreate( connectorId, itemToMicrosoftNode("folder", driveItem) ); - await resource.update({ lastSeenTs: new Date() }); + + // add parent information to new node resource. for the toplevel folder, + // parent is null + const parentInternalId = + resource.internalId === nodeId + ? null + : getParentReferenceInternalId(driveItem.parentReference); + + await resource.update({ + parentInternalId, + lastSeenTs: new Date(), + }); + + if (isMoved) { + await updateDescendantsParentsInQdrant({ + dataSourceConfig, + folder: resource, + startSyncTs, + }); + } } } else { throw new Error(`Unexpected: driveItem is neither file nor folder`); @@ -590,3 +610,114 @@ function sortForIncrementalUpdate(changedList: DriveItem[]) { return sortedDriveItemList; } + +async function getDeltaData({ + client, + node, +}: { + client: Client; + node: MicrosoftNodeResource; +}) { + if (!node.deltaLink) { + throw new Error( + `Delta link not found for root node resource ${JSON.stringify(node.toJSON())}` + ); + } + + try { + return await getFullDeltaResults(client, node.internalId, node.deltaLink); + } catch (e) { + if (e instanceof GraphError && e.statusCode === 410) { + // API is answering 'resync required' + // we repopulate the delta from scratch + return await getFullDeltaResults(client, node.internalId); + } + throw e; + } +} + +async function isFolderMovedInSameRoot({ + connectorId, + folder, + internalId, +}: { + connectorId: ModelId; + folder: DriveItem; + internalId: string; +}) { + if (!folder.parentReference) { + throw new Error(`Unexpected: parent reference missing: ${folder}`); + } + + const oldResource = await MicrosoftNodeResource.fetchByInternalId( + connectorId, + internalId + ); + + if (!oldResource) { + // the folder was not moved internally since we don't have it + return false; + } + + const oldParentId = oldResource.parentInternalId; + + if (!oldParentId) { + // this means it is a root + return false; + } + + const newParentId = getParentReferenceInternalId(folder.parentReference); + + return oldParentId !== newParentId; +} + +async function updateDescendantsParentsInQdrant({ + folder, + dataSourceConfig, + startSyncTs, +}: { + folder: MicrosoftNodeResource; + dataSourceConfig: DataSourceConfig; + startSyncTs: number; +}) { + const children = await folder.fetchChildren(); + const files = children.filter((child) => child.nodeType === "file"); + const folders = children.filter((child) => child.nodeType === "folder"); + await concurrentExecutor( + files, + async (file) => updateParentsField({ file, dataSourceConfig, startSyncTs }), + { + concurrency: 10, + } + ); + for (const childFolder of folders) { + await updateDescendantsParentsInQdrant({ + dataSourceConfig, + folder: childFolder, + startSyncTs, + }); + } +} + +async function updateParentsField({ + file, + dataSourceConfig, + startSyncTs, +}: { + file: MicrosoftNodeResource; + dataSourceConfig: DataSourceConfig; + startSyncTs: number; +}) { + const parents = await getParents({ + connectorId: file.connectorId, + internalId: file.internalId, + parentInternalId: file.parentInternalId, + startSyncTs, + }); + + await updateDocumentParentsField({ + dataSourceConfig, + documentId: file.internalId, + parents, + }); +}