Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MS connector] Tweaks on incremental sync #6357

Merged
merged 6 commits into from
Jul 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions connectors/src/connectors/microsoft/lib/graph_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ export async function getFilesAndFolders(
return { results: res.value };
}

/**
* Get list of items that have changed Calling without nextLink nor token will
* result in initial delta call
*/
export async function getDeltaResults({
client,
parentInternalId,
Expand All @@ -92,7 +96,7 @@ export async function getDeltaResults({
client: Client;
parentInternalId: string;
} & (
| { nextLink: string; token?: never }
| { nextLink?: string; token?: never }
| { nextLink?: never; token: string }
)) {
const { nodeType, itemAPIPath } = typeAndPathFromInternalId(parentInternalId);
Expand Down Expand Up @@ -143,7 +147,7 @@ export async function getDeltaResults({
export async function getFullDeltaResults(
client: Client,
parentInternalId: string,
initialDeltaLink: string
initialDeltaLink?: string
): Promise<{ results: microsoftgraph.DriveItem[]; deltaLink: string }> {
let nextLink: string | undefined = initialDeltaLink;
let allItems: microsoftgraph.DriveItem[] = [];
Expand Down Expand Up @@ -359,9 +363,7 @@ export function itemToMicrosoftNode<T extends keyof MicrosoftEntityMapping>(
nodeType,
name: item.name ?? null,
internalId: getDriveItemInternalId(item),
parentInternalId: item.parentReference
? getParentReferenceInternalId(item.parentReference)
: null,
parentInternalId: null,
mimeType: null,
};
}
Expand All @@ -371,9 +373,7 @@ export function itemToMicrosoftNode<T extends keyof MicrosoftEntityMapping>(
nodeType,
name: item.name ?? null,
internalId: getDriveItemInternalId(item),
parentInternalId: item.parentReference
? getParentReferenceInternalId(item.parentReference)
: null,
parentInternalId: null,
mimeType: item.file?.mimeType ?? null,
};
}
Expand Down
159 changes: 145 additions & 14 deletions connectors/src/connectors/microsoft/temporal/activities.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { ModelId } from "@dust-tt/types";
import type { Client } from "@microsoft/microsoft-graph-client";
import { GraphError } from "@microsoft/microsoft-graph-client";
import type { DriveItem } from "@microsoft/microsoft-graph-types";
import { heartbeat } from "@temporalio/activity";

Expand Down Expand Up @@ -30,13 +31,15 @@ import {
import { getMimeTypesToSync } from "@connectors/connectors/microsoft/temporal/mime_types";
import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config";
import { concurrentExecutor } from "@connectors/lib/async_utils";
import { updateDocumentParentsField } from "@connectors/lib/data_sources";
import logger from "@connectors/logger/logger";
import { ConnectorResource } from "@connectors/resources/connector_resource";
import {
MicrosoftConfigurationResource,
MicrosoftNodeResource,
MicrosoftRootResource,
} from "@connectors/resources/microsoft_resource";
import type { DataSourceConfig } from "@connectors/types/data_source_config";

const FILES_SYNC_CONCURRENCY = 10;

Expand Down Expand Up @@ -394,12 +397,6 @@ export async function syncDeltaForNode({

const client = await getClient(connector.connectionId);

if (!node.deltaLink) {
throw new Error(
`Delta link not found for root node resource ${JSON.stringify(node.toJSON())}`
);
}

logger.info({ connectorId, node }, "Syncing delta for node");

// Goes through pagination to return all delta results. This is because delta
Expand All @@ -413,15 +410,14 @@ export async function syncDeltaForNode({
//
// If it ever becomes an issue, redis-caching the list and having activities
// grabbing pages of it can be implemented
const { results, deltaLink } = await getFullDeltaResults(
const { results, deltaLink } = await getDeltaData({
client,
nodeId,
node.deltaLink
);
const uniqueDriveItemList = removeAllButLastOccurences(results);
const sortedDriveItemList = sortForIncrementalUpdate(uniqueDriveItemList);
node,
});
const uniqueChangedItems = removeAllButLastOccurences(results);
const sortedChangedItems = sortForIncrementalUpdate(uniqueChangedItems);

for (const driveItem of sortedDriveItemList) {
for (const driveItem of sortedChangedItems) {
heartbeat();

if (!driveItem.parentReference) {
Expand Down Expand Up @@ -461,11 +457,35 @@ export async function syncDeltaForNode({
// in the delta with the 'deleted' field set
await deleteFolder({ connectorId, internalId });
} else {
const isMoved = await isFolderMovedInSameRoot({
connectorId,
folder: driveItem,
internalId,
});
const resource = await MicrosoftNodeResource.updateOrCreate(
connectorId,
itemToMicrosoftNode("folder", driveItem)
);
await resource.update({ lastSeenTs: new Date() });

// add parent information to new node resource. for the toplevel folder,
// parent is null
const parentInternalId =
resource.internalId === nodeId
? null
: getParentReferenceInternalId(driveItem.parentReference);

await resource.update({
parentInternalId,
lastSeenTs: new Date(),
});

if (isMoved) {
await updateDescendantsParentsInQdrant({
dataSourceConfig,
folder: resource,
startSyncTs,
});
}
}
} else {
throw new Error(`Unexpected: driveItem is neither file nor folder`);
Expand Down Expand Up @@ -590,3 +610,114 @@ function sortForIncrementalUpdate(changedList: DriveItem[]) {

return sortedDriveItemList;
}

async function getDeltaData({
client,
node,
}: {
client: Client;
node: MicrosoftNodeResource;
}) {
if (!node.deltaLink) {
throw new Error(
`Delta link not found for root node resource ${JSON.stringify(node.toJSON())}`
);
}

try {
return await getFullDeltaResults(client, node.internalId, node.deltaLink);
} catch (e) {
if (e instanceof GraphError && e.statusCode === 410) {
// API is answering 'resync required'
// we repopulate the delta from scratch
return await getFullDeltaResults(client, node.internalId);
}
throw e;
}
}

async function isFolderMovedInSameRoot({
connectorId,
folder,
internalId,
}: {
connectorId: ModelId;
folder: DriveItem;
internalId: string;
}) {
if (!folder.parentReference) {
throw new Error(`Unexpected: parent reference missing: ${folder}`);
}

const oldResource = await MicrosoftNodeResource.fetchByInternalId(
connectorId,
internalId
);

if (!oldResource) {
// the folder was not moved internally since we don't have it
return false;
}

const oldParentId = oldResource.parentInternalId;

if (!oldParentId) {
// this means it is a root
return false;
}

const newParentId = getParentReferenceInternalId(folder.parentReference);

return oldParentId !== newParentId;
}

async function updateDescendantsParentsInQdrant({
folder,
dataSourceConfig,
startSyncTs,
}: {
folder: MicrosoftNodeResource;
dataSourceConfig: DataSourceConfig;
startSyncTs: number;
}) {
const children = await folder.fetchChildren();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I added an optional argument where you can pass a list of nodeTypes to filter on. Not gonna change much but you could do:

const children = await folder.fetchChildren(["file", "folder"]);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that case, i'd rather fetch all children and have the code err if they're not files or folders --- since it should not happen (not sure i'm clear 😅 happy to clarify irl)

const files = children.filter((child) => child.nodeType === "file");
const folders = children.filter((child) => child.nodeType === "folder");
await concurrentExecutor(
files,
async (file) => updateParentsField({ file, dataSourceConfig, startSyncTs }),
{
concurrency: 10,
}
);
for (const childFolder of folders) {
await updateDescendantsParentsInQdrant({
dataSourceConfig,
folder: childFolder,
startSyncTs,
});
}
}

async function updateParentsField({
file,
dataSourceConfig,
startSyncTs,
}: {
file: MicrosoftNodeResource;
dataSourceConfig: DataSourceConfig;
startSyncTs: number;
}) {
const parents = await getParents({
connectorId: file.connectorId,
internalId: file.internalId,
parentInternalId: file.parentInternalId,
startSyncTs,
});

await updateDocumentParentsField({
dataSourceConfig,
documentId: file.internalId,
parents,
});
}
Loading