diff --git a/connectors/src/connectors/microsoft/temporal/activities.ts b/connectors/src/connectors/microsoft/temporal/activities.ts index 8d586610ea71..07390c20d5c0 100644 --- a/connectors/src/connectors/microsoft/temporal/activities.ts +++ b/connectors/src/connectors/microsoft/temporal/activities.ts @@ -26,6 +26,7 @@ import { deleteFile, deleteFolder, getParents, + isAlreadySeenItem, syncOneFile, } from "@connectors/connectors/microsoft/temporal/file"; import { getMimeTypesToSync } from "@connectors/connectors/microsoft/temporal/mime_types"; @@ -220,10 +221,7 @@ async function isParentAlreadyInNodes({ return false; } -export async function markNodeAsVisited( - connectorId: ModelId, - internalId: string -) { +export async function markNodeAsSeen(connectorId: ModelId, internalId: string) { const connector = await ConnectorResource.fetchById(connectorId); if (!connector) { throw new Error(`Connector ${connectorId} not found`); @@ -238,7 +236,10 @@ export async function markNodeAsVisited( throw new Error(`Node ${internalId} not found`); } - await node.update({ lastSeenTs: new Date() }); + // if node was updated more recently than this sync, we don't need to mark it + if (node.lastSeenTs && node.lastSeenTs < new Date()) { + await node.update({ lastSeenTs: new Date() }); + } } /** @@ -306,14 +307,14 @@ export async function syncFiles({ pdfEnabled: providerConfig.pdfEnabled || false, connector, }); - const childrenToSync = children.filter( + const filesToSync = children.filter( (item) => item.file?.mimeType && mimeTypesToSync.includes(item.file.mimeType) ); // sync files const results = await concurrentExecutor( - childrenToSync, + filesToSync, async (child) => syncOneFile({ connectorId, @@ -338,21 +339,67 @@ export async function syncFiles({ `[SyncFiles] Successful sync.` ); - const childResources = await MicrosoftNodeResource.batchUpdateOrCreate( + // do not update folders that were already seen + const folderResources = await MicrosoftNodeResource.fetchByInternalIds( connectorId, children .filter((item) => item.folder) - .map( - (item): MicrosoftNode => ({ - ...itemToMicrosoftNode("folder", item), - // add parent information to new node resources - parentInternalId, - }) - ) + .map((item) => getDriveItemInternalId(item)) ); + + // compute in O(n) folders that were already seen + const alreadySeenResourcesById: Record = {}; + folderResources.forEach((f) => (alreadySeenResourcesById[f.internalId] = f)); + children.forEach((c) => { + if (!c.folder) { + return; + } + + const cResource = alreadySeenResourcesById[getDriveItemInternalId(c)]; + + if (!cResource) { + return; + } + + if ( + !isAlreadySeenItem({ + driveItem: c, + driveItemResource: cResource, + startSyncTs, + }) + ) { + delete alreadySeenResourcesById[cResource.internalId]; + } + }); + + const alreadySeenResources = Object.values(alreadySeenResourcesById); + + const createdOrUpdatedResources = + await MicrosoftNodeResource.batchUpdateOrCreate( + connectorId, + children + .filter( + (item) => + item.folder && + // only create/update if resource unseen + !alreadySeenResourcesById[getDriveInternalIdFromItem(item)] + ) + .map( + (item): MicrosoftNode => ({ + ...itemToMicrosoftNode("folder", item), + // add parent information to new node resources + parentInternalId, + }) + ) + ); + return { count, - childNodes: childResources.map((r) => r.internalId), + // still visit children of already seen nodes; an already seen node does not + // mean all its children are already seen too + childNodes: [...createdOrUpdatedResources, ...alreadySeenResources].map( + (r) => r.internalId + ), nextLink: childrenResult.nextLink, }; } @@ -721,3 +768,6 @@ async function updateParentsField({ parents, }); } +function isDriveItemAlreadySeen(arg0: {}): unknown { + throw new Error("Function not implemented."); +} diff --git a/connectors/src/connectors/microsoft/temporal/workflows.ts b/connectors/src/connectors/microsoft/temporal/workflows.ts index d181e3639565..088479817465 100644 --- a/connectors/src/connectors/microsoft/temporal/workflows.ts +++ b/connectors/src/connectors/microsoft/temporal/workflows.ts @@ -9,7 +9,7 @@ import { import type * as activities from "@connectors/connectors/microsoft/temporal/activities"; import type * as sync_status from "@connectors/lib/sync_status"; -const { getSiteNodesToSync, syncFiles, markNodeAsVisited, populateDeltas } = +const { getSiteNodesToSync, syncFiles, markNodeAsSeen, populateDeltas } = proxyActivities({ startToCloseTimeout: "30 minutes", }); @@ -89,7 +89,7 @@ export async function fullSyncSitesWorkflow({ ); } while (nextPageLink); - await markNodeAsVisited(connectorId, nodeId); + await markNodeAsSeen(connectorId, nodeId); if (workflowInfo().historyLength > 4000) { await continueAsNew({