Skip to content

Commit

Permalink
[MS connector] Tweaks on incremental sync (#6357)
Browse files Browse the repository at this point in the history
* fix: parentInternalId should be null by default, and always null for roots

* handle 410 require resync error

* update parents on folder movement

* fix movement detection

* lint

* clean com
  • Loading branch information
philipperolet authored and albandum committed Aug 28, 2024
1 parent 91125fb commit c56431f
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 22 deletions.
16 changes: 8 additions & 8 deletions connectors/src/connectors/microsoft/lib/graph_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ export async function getFilesAndFolders(
return { results: res.value };
}

/**
* Get list of items that have changed Calling without nextLink nor token will
* result in initial delta call
*/
export async function getDeltaResults({
client,
parentInternalId,
Expand All @@ -92,7 +96,7 @@ export async function getDeltaResults({
client: Client;
parentInternalId: string;
} & (
| { nextLink: string; token?: never }
| { nextLink?: string; token?: never }
| { nextLink?: never; token: string }
)) {
const { nodeType, itemAPIPath } = typeAndPathFromInternalId(parentInternalId);
Expand Down Expand Up @@ -143,7 +147,7 @@ export async function getDeltaResults({
export async function getFullDeltaResults(
client: Client,
parentInternalId: string,
initialDeltaLink: string
initialDeltaLink?: string
): Promise<{ results: microsoftgraph.DriveItem[]; deltaLink: string }> {
let nextLink: string | undefined = initialDeltaLink;
let allItems: microsoftgraph.DriveItem[] = [];
Expand Down Expand Up @@ -359,9 +363,7 @@ export function itemToMicrosoftNode<T extends keyof MicrosoftEntityMapping>(
nodeType,
name: item.name ?? null,
internalId: getDriveItemInternalId(item),
parentInternalId: item.parentReference
? getParentReferenceInternalId(item.parentReference)
: null,
parentInternalId: null,
mimeType: null,
};
}
Expand All @@ -371,9 +373,7 @@ export function itemToMicrosoftNode<T extends keyof MicrosoftEntityMapping>(
nodeType,
name: item.name ?? null,
internalId: getDriveItemInternalId(item),
parentInternalId: item.parentReference
? getParentReferenceInternalId(item.parentReference)
: null,
parentInternalId: null,
mimeType: item.file?.mimeType ?? null,
};
}
Expand Down
159 changes: 145 additions & 14 deletions connectors/src/connectors/microsoft/temporal/activities.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { ModelId } from "@dust-tt/types";
import type { Client } from "@microsoft/microsoft-graph-client";
import { GraphError } from "@microsoft/microsoft-graph-client";
import type { DriveItem } from "@microsoft/microsoft-graph-types";
import { heartbeat } from "@temporalio/activity";

Expand Down Expand Up @@ -30,13 +31,15 @@ import {
import { getMimeTypesToSync } from "@connectors/connectors/microsoft/temporal/mime_types";
import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config";
import { concurrentExecutor } from "@connectors/lib/async_utils";
import { updateDocumentParentsField } from "@connectors/lib/data_sources";
import logger from "@connectors/logger/logger";
import { ConnectorResource } from "@connectors/resources/connector_resource";
import {
MicrosoftConfigurationResource,
MicrosoftNodeResource,
MicrosoftRootResource,
} from "@connectors/resources/microsoft_resource";
import type { DataSourceConfig } from "@connectors/types/data_source_config";

const FILES_SYNC_CONCURRENCY = 10;

Expand Down Expand Up @@ -394,12 +397,6 @@ export async function syncDeltaForNode({

const client = await getClient(connector.connectionId);

if (!node.deltaLink) {
throw new Error(
`Delta link not found for root node resource ${JSON.stringify(node.toJSON())}`
);
}

logger.info({ connectorId, node }, "Syncing delta for node");

// Goes through pagination to return all delta results. This is because delta
Expand All @@ -413,15 +410,14 @@ export async function syncDeltaForNode({
//
// If it ever becomes an issue, redis-caching the list and having activities
// grabbing pages of it can be implemented
const { results, deltaLink } = await getFullDeltaResults(
const { results, deltaLink } = await getDeltaData({
client,
nodeId,
node.deltaLink
);
const uniqueDriveItemList = removeAllButLastOccurences(results);
const sortedDriveItemList = sortForIncrementalUpdate(uniqueDriveItemList);
node,
});
const uniqueChangedItems = removeAllButLastOccurences(results);
const sortedChangedItems = sortForIncrementalUpdate(uniqueChangedItems);

for (const driveItem of sortedDriveItemList) {
for (const driveItem of sortedChangedItems) {
heartbeat();

if (!driveItem.parentReference) {
Expand Down Expand Up @@ -461,11 +457,35 @@ export async function syncDeltaForNode({
// in the delta with the 'deleted' field set
await deleteFolder({ connectorId, internalId });
} else {
const isMoved = await isFolderMovedInSameRoot({
connectorId,
folder: driveItem,
internalId,
});
const resource = await MicrosoftNodeResource.updateOrCreate(
connectorId,
itemToMicrosoftNode("folder", driveItem)
);
await resource.update({ lastSeenTs: new Date() });

// add parent information to new node resource. for the toplevel folder,
// parent is null
const parentInternalId =
resource.internalId === nodeId
? null
: getParentReferenceInternalId(driveItem.parentReference);

await resource.update({
parentInternalId,
lastSeenTs: new Date(),
});

if (isMoved) {
await updateDescendantsParentsInQdrant({
dataSourceConfig,
folder: resource,
startSyncTs,
});
}
}
} else {
throw new Error(`Unexpected: driveItem is neither file nor folder`);
Expand Down Expand Up @@ -590,3 +610,114 @@ function sortForIncrementalUpdate(changedList: DriveItem[]) {

return sortedDriveItemList;
}

async function getDeltaData({
client,
node,
}: {
client: Client;
node: MicrosoftNodeResource;
}) {
if (!node.deltaLink) {
throw new Error(
`Delta link not found for root node resource ${JSON.stringify(node.toJSON())}`
);
}

try {
return await getFullDeltaResults(client, node.internalId, node.deltaLink);
} catch (e) {
if (e instanceof GraphError && e.statusCode === 410) {
// API is answering 'resync required'
// we repopulate the delta from scratch
return await getFullDeltaResults(client, node.internalId);
}
throw e;
}
}

async function isFolderMovedInSameRoot({
connectorId,
folder,
internalId,
}: {
connectorId: ModelId;
folder: DriveItem;
internalId: string;
}) {
if (!folder.parentReference) {
throw new Error(`Unexpected: parent reference missing: ${folder}`);
}

const oldResource = await MicrosoftNodeResource.fetchByInternalId(
connectorId,
internalId
);

if (!oldResource) {
// the folder was not moved internally since we don't have it
return false;
}

const oldParentId = oldResource.parentInternalId;

if (!oldParentId) {
// this means it is a root
return false;
}

const newParentId = getParentReferenceInternalId(folder.parentReference);

return oldParentId !== newParentId;
}

async function updateDescendantsParentsInQdrant({
folder,
dataSourceConfig,
startSyncTs,
}: {
folder: MicrosoftNodeResource;
dataSourceConfig: DataSourceConfig;
startSyncTs: number;
}) {
const children = await folder.fetchChildren();
const files = children.filter((child) => child.nodeType === "file");
const folders = children.filter((child) => child.nodeType === "folder");
await concurrentExecutor(
files,
async (file) => updateParentsField({ file, dataSourceConfig, startSyncTs }),
{
concurrency: 10,
}
);
for (const childFolder of folders) {
await updateDescendantsParentsInQdrant({
dataSourceConfig,
folder: childFolder,
startSyncTs,
});
}
}

async function updateParentsField({
file,
dataSourceConfig,
startSyncTs,
}: {
file: MicrosoftNodeResource;
dataSourceConfig: DataSourceConfig;
startSyncTs: number;
}) {
const parents = await getParents({
connectorId: file.connectorId,
internalId: file.internalId,
parentInternalId: file.parentInternalId,
startSyncTs,
});

await updateDocumentParentsField({
dataSourceConfig,
documentId: file.internalId,
parents,
});
}

0 comments on commit c56431f

Please sign in to comment.