Skip to content

Commit

Permalink
[MS Connector] Incremental sync (#6317)
Browse files Browse the repository at this point in the history
* added getDelta function

* add delta to root resource

* scaffolding of launchworkflow, workflow & activity

* query delta using timestamp

* update root resource with delta

* wip - loop start, without handling

* refactor getDriveItemAPIPath -> getDriveItemInternalId

* wip: fill delta sync loop + refactor apipath->internalid

* refactor + spreadsheet deletion + populate deltas

* fix sql relationship

* account for long links

* logging

* fix delta <-> node association

* remove log

* try to grab downloadUrl if not present

* fix: do not delete if file was internally moved

* perform deletion for drive

* mark as visited

* update getDriveItemInternalId

* order before syncing

* jules review
  • Loading branch information
philipperolet authored Jul 19, 2024
1 parent 4f7176a commit 218a091
Show file tree
Hide file tree
Showing 8 changed files with 686 additions and 102 deletions.
14 changes: 4 additions & 10 deletions connectors/src/connectors/microsoft/lib/content_nodes.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import type { ContentNode } from "@dust-tt/types";

import {
getDriveAPIPath,
getDriveItemAPIPath,
getDriveInternalId,
getDriveItemInternalId,
getSiteAPIPath,
internalIdFromTypeAndPath,
typeAndPathFromInternalId,
Expand Down Expand Up @@ -127,10 +127,7 @@ export function getDriveAsContentNode(
}
return {
provider: "microsoft",
internalId: internalIdFromTypeAndPath({
itemAPIPath: getDriveAPIPath(drive),
nodeType: "drive",
}),
internalId: getDriveInternalId(drive),
parentInternalId,
type: "folder",
title: drive.name || "unnamed",
Expand All @@ -147,10 +144,7 @@ export function getFolderAsContentNode(
): ContentNode {
return {
provider: "microsoft",
internalId: internalIdFromTypeAndPath({
itemAPIPath: getDriveItemAPIPath(folder),
nodeType: "folder",
}),
internalId: getDriveItemInternalId(folder),
parentInternalId,
type: "folder",
title: folder.name || "unnamed",
Expand Down
186 changes: 152 additions & 34 deletions connectors/src/connectors/microsoft/lib/graph_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ export async function getFilesAndFolders(
`Invalid node type: ${nodeType} for getFilesAndFolders, expected drive or folder`
);
}

const endpoint =
nodeType === "drive"
? `${parentResourcePath}/root/children`
Expand All @@ -82,6 +83,90 @@ export async function getFilesAndFolders(
return { results: res.value };
}

/**
 * Fetches a single page of delta (change-tracking) results for a drive or a
 * folder.
 *
 * Exactly one of `nextLink` or `token` is expected:
 * - `nextLink`: a full link returned by a previous call; it is requested
 *   as-is (no extra headers are added).
 * - `token`: a raw (not yet percent-encoded) delta token, appended to the
 *   delta endpoint as the `token` query parameter.
 *
 * @param client Graph client used to issue the request.
 * @param parentInternalId Internal id of the drive or folder to query.
 * @param nextLink Link to the next page, as returned by a previous call.
 * @param token Delta token to start from.
 * @returns The page items in `results`, plus `nextLink` when the response has
 *   an `@odata.nextLink`, or `deltaLink` when it has an `@odata.deltaLink`.
 * @throws Error if the internal id is neither a drive nor a folder, or if
 *   both `nextLink` and `token` are provided.
 */
export async function getDeltaResults({
  client,
  parentInternalId,
  nextLink,
  token,
}: {
  client: Client;
  parentInternalId: string;
} & (
  | { nextLink: string; token?: never }
  | { nextLink?: never; token: string }
)) {
  const { nodeType, itemAPIPath } = typeAndPathFromInternalId(parentInternalId);

  if (nodeType !== "drive" && nodeType !== "folder") {
    throw new Error(
      `Invalid node type: ${nodeType} for delta, expected drive or folder`
    );
  }

  // The type already forbids this combination; the runtime check protects
  // untyped callers.
  if (nextLink && token) {
    throw new Error("nextLink and token cannot be used together");
  }

  // Percent-encode the token: timestamps contain ":" (and possibly "+"), and
  // opaque tokens may contain "+", "/" or "=", which would otherwise be
  // misread by the server when placed raw in a query string.
  const deltaPath =
    (nodeType === "folder"
      ? itemAPIPath + "/delta"
      : itemAPIPath + "/root/delta") +
    (token ? `?token=${encodeURIComponent(token)}` : "");

  const res = nextLink
    ? await client.api(nextLink).get()
    : await client
        .api(deltaPath)
        .header("Prefer", "odata.track-changes, deltaExcludeParent=true")
        .get();

  if ("@odata.nextLink" in res) {
    return {
      results: res.value,
      nextLink: res["@odata.nextLink"],
    };
  }

  if ("@odata.deltaLink" in res) {
    return {
      results: res.value,
      deltaLink: res["@odata.deltaLink"],
    };
  }

  return { results: res.value };
}

/**
 * Paginated variant of getDeltaResults: starting from `initialDeltaLink`,
 * keeps following the returned next links until a page comes back without
 * one, accumulating every item along the way.
 *
 * @param client Graph client used to issue the requests.
 * @param parentInternalId Internal id of the drive or folder being queried.
 * @param initialDeltaLink Link used for the first request.
 * @returns All accumulated items and the delta link found on the last page.
 * @throws Error ("Delta link not found") if the last page carries no delta
 *   link.
 */
export async function getFullDeltaResults(
  client: Client,
  parentInternalId: string,
  initialDeltaLink: string
): Promise<{ results: microsoftgraph.DriveItem[]; deltaLink: string }> {
  let collected: microsoftgraph.DriveItem[] = [];
  let cursor: string = initialDeltaLink;

  for (;;) {
    const page = await getDeltaResults({
      client,
      parentInternalId,
      nextLink: cursor,
    });
    collected = collected.concat(page.results);

    if (page.nextLink) {
      // More pages to fetch: move the cursor forward and go again.
      cursor = page.nextLink;
      continue;
    }

    // Last page: it is the only one whose delta link matters.
    if (!page.deltaLink) {
      throw new Error("Delta link not found");
    }

    return { results: collected, deltaLink: page.deltaLink };
  }
}

export async function getWorksheets(
client: Client,
internalId: string,
Expand Down Expand Up @@ -202,6 +287,7 @@ export async function getMessages(

return { results: res.value };
}

/**
* Given a getter function with a single nextLink optional parameter, this function
* fetches all items by following nextLinks
Expand All @@ -227,6 +313,18 @@ export async function getItem(client: Client, itemApiPath: string) {
return client.api(itemApiPath).get();
}

/**
 * Fetches the item designated by `internalId` and returns the value of its
 * `@microsoft.graph.downloadUrl` property.
 *
 * @param client Graph client used to issue the request.
 * @param internalId Internal id of the item; must resolve to a "file" node.
 * @returns The `@microsoft.graph.downloadUrl` value of the fetched item
 *   (undefined if the response does not carry that property).
 * @throws Error if the internal id does not designate a file.
 */
export async function getFileDownloadURL(client: Client, internalId: string) {
  const parsed = typeAndPathFromInternalId(internalId);

  if (parsed.nodeType !== "file") {
    throw new Error(
      `Invalid node type: ${parsed.nodeType} for getFileDownloadURL`
    );
  }

  const driveItem = await client.api(parsed.itemAPIPath).get();
  return driveItem["@microsoft.graph.downloadUrl"];
}

type MicrosoftEntity = {
folder: MicrosoftGraph.DriveItem;
drive: MicrosoftGraph.Drive;
Expand Down Expand Up @@ -260,11 +358,10 @@ export function itemToMicrosoftNode<T extends keyof MicrosoftEntityMapping>(
return {
nodeType,
name: item.name ?? null,
internalId: internalIdFromTypeAndPath({
nodeType,
itemAPIPath: getDriveItemAPIPath(item),
}),
parentInternalId: null,
internalId: getDriveItemInternalId(item),
parentInternalId: item.parentReference
? getParentReferenceInternalId(item.parentReference)
: null,
mimeType: null,
};
}
Expand All @@ -273,11 +370,10 @@ export function itemToMicrosoftNode<T extends keyof MicrosoftEntityMapping>(
return {
nodeType,
name: item.name ?? null,
internalId: internalIdFromTypeAndPath({
nodeType,
itemAPIPath: getDriveItemAPIPath(item),
}),
parentInternalId: null,
internalId: getDriveItemInternalId(item),
parentInternalId: item.parentReference
? getParentReferenceInternalId(item.parentReference)
: null,
mimeType: item.file?.mimeType ?? null,
};
}
Expand All @@ -286,10 +382,7 @@ export function itemToMicrosoftNode<T extends keyof MicrosoftEntityMapping>(
return {
nodeType,
name: item.name ?? null,
internalId: internalIdFromTypeAndPath({
nodeType,
itemAPIPath: getDriveAPIPath(item),
}),
internalId: getDriveInternalId(item),
parentInternalId: null,
mimeType: null,
};
Expand Down Expand Up @@ -363,27 +456,53 @@ export function typeAndPathFromInternalId(internalId: string): {
return { nodeType, itemAPIPath: resourcePathArr.join("/") };
}

export function getDriveItemAPIPath(item: MicrosoftGraph.DriveItem) {
export function getDriveItemInternalId(item: MicrosoftGraph.DriveItem) {
const { parentReference } = item;

if (!parentReference?.driveId) {
throw new Error("Unexpected: no drive id for item");
}

return `/drives/${parentReference.driveId}/items/${item.id}`;
const nodeType = item.folder ? "folder" : item.file ? "file" : null;

if (!nodeType) {
throw new Error("Unexpected: item is neither folder nor file");
}

if (item.root) {
return internalIdFromTypeAndPath({
nodeType: "drive",
itemAPIPath: `/drives/${parentReference.driveId}`,
});
}

return internalIdFromTypeAndPath({
nodeType,
itemAPIPath: `/drives/${parentReference.driveId}/items/${item.id}`,
});
}

export function getParentReferenceAPIPath(
export function getParentReferenceInternalId(
parentReference: MicrosoftGraph.ItemReference
) {
if (!parentReference.driveId) {
throw new Error("Unexpected: no drive id for item");
}

return `/drives/${parentReference.driveId}/items/${parentReference.id}`;
if (parentReference.path && !parentReference.path.endsWith("root:")) {
return internalIdFromTypeAndPath({
nodeType: "folder",
itemAPIPath: `/drives/${parentReference.driveId}/items/${parentReference.id}`,
});
}

return internalIdFromTypeAndPath({
nodeType: "drive",
itemAPIPath: `/drives/${parentReference.driveId}`,
});
}

export function getWorksheetAPIPath(
export function getWorksheetInternalId(
item: MicrosoftGraph.WorkbookWorksheet,
parentInternalId: string
) {
Expand All @@ -394,29 +513,28 @@ export function getWorksheetAPIPath(
throw new Error(`Invalid parent nodeType: ${nodeType}`);
}

return `${parentItemApiPath}/workbook/worksheets/${item.id}`;
return internalIdFromTypeAndPath({
itemAPIPath: `${parentItemApiPath}/workbook/worksheets/${item.id}`,
nodeType: "worksheet",
});
}

export function getDriveAPIPath(drive: MicrosoftGraph.Drive) {
return `/drives/${drive.id}`;
export function getDriveInternalId(drive: MicrosoftGraph.Drive) {
return internalIdFromTypeAndPath({
nodeType: "drive",
itemAPIPath: `/drives/${drive.id}`,
});
}

export function getDriveAPIPathFromItem(item: MicrosoftGraph.DriveItem) {
export function getDriveInternalIdFromItem(item: MicrosoftGraph.DriveItem) {
if (!item.parentReference?.driveId) {
throw new Error("Unexpected: no drive id for item");
}

return `/drives/${item.parentReference.driveId}`;
}

export function getDriveItemAPIPathFromReference(
parentReference: MicrosoftGraph.ItemReference
) {
if (!parentReference.driveId) {
throw new Error("Unexpected: no drive id for item");
}

return `/drives/${parentReference.driveId}/items/${parentReference.id}`;
return internalIdFromTypeAndPath({
nodeType: "drive",
itemAPIPath: `/drives/${item.parentReference.driveId}`,
});
}

export function getSiteAPIPath(site: MicrosoftGraph.Site) {
Expand Down
Loading

0 comments on commit 218a091

Please sign in to comment.