-
Notifications
You must be signed in to change notification settings - Fork 114
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[MS Connector] Compute parents array for document upsertion #6233
Changes from 4 commits
2a10253
f2eda44
cef2e5f
560cc45
04d09b3
fc04f8a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,6 @@ | ||
import type { CoreAPIDataSourceDocumentSection, ModelId } from "@dust-tt/types"; | ||
import { cacheWithRedis } from "@dust-tt/types"; | ||
import type { Client } from "@microsoft/microsoft-graph-client"; | ||
import axios from "axios"; | ||
import mammoth from "mammoth"; | ||
import type { Logger } from "pino"; | ||
|
@@ -7,14 +9,17 @@ import turndown from "turndown"; | |
import { getClient } from "@connectors/connectors/microsoft"; | ||
import { | ||
getAllPaginatedEntities, | ||
getDriveAPIPathFromItem, | ||
getDriveItemAPIPath, | ||
getDriveItemAPIPathFromReference, | ||
getDrives, | ||
getFilesAndFolders, | ||
getItem, | ||
getSiteAPIPath, | ||
getSites, | ||
internalIdFromTypeAndPath, | ||
itemToMicrosoftNode, | ||
typeAndPathFromInternalId, | ||
} from "@connectors/connectors/microsoft/lib/graph_api"; | ||
import type { MicrosoftNode } from "@connectors/connectors/microsoft/lib/types"; | ||
import { getMimeTypesToSync } from "@connectors/connectors/microsoft/temporal/mime_types"; | ||
|
@@ -117,14 +122,84 @@ export async function getSiteNodesToSync( | |
[] as MicrosoftNode[] | ||
); | ||
|
||
// for all folders, check if a parent folder or drive is already in the list, | ||
// in which case remove it this can happen because when a user selects a | ||
// folder to sync, then a parent folder, both are storeed in Microsoft Roots | ||
// table | ||
|
||
// Keeping them both in the sync list can result in various kinds of issues, | ||
// e.g. if a child folder is synced before the parent, then the child folder's | ||
// files' parents array will be incomplete, thus the need to prune the list | ||
const nodesToSync = allNodes.filter( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems to be a O(N^2) loop, are we sure it's properly bounded? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No prob here; max max a hundred of items (it's the user selection of folders), and the costly part is only run O(n) 👍 |
||
async (node) => | ||
!( | ||
node.nodeType === "folder" && | ||
(await isParentAlreadyInNodes({ | ||
client, | ||
nodes: allNodes, | ||
folder: node, | ||
})) | ||
) | ||
); | ||
|
||
const nodeResources = await MicrosoftNodeResource.batchUpdateOrCreate( | ||
connectorId, | ||
allNodes | ||
nodesToSync | ||
); | ||
|
||
return nodeResources.map((r) => r.internalId); | ||
} | ||
|
||
async function isParentAlreadyInNodes({ | ||
client, | ||
nodes, | ||
folder, | ||
}: { | ||
client: Client; | ||
nodes: MicrosoftNode[]; | ||
folder: MicrosoftNode; | ||
}) { | ||
const { itemAPIPath } = typeAndPathFromInternalId(folder.internalId); | ||
let driveItem: microsoftgraph.DriveItem = await getItem(client, itemAPIPath); | ||
|
||
// check if the list already contains the drive of this folder | ||
if ( | ||
nodes.some( | ||
(node) => | ||
node.internalId === | ||
internalIdFromTypeAndPath({ | ||
nodeType: "drive", | ||
itemAPIPath: getDriveAPIPathFromItem(driveItem), | ||
}) | ||
) | ||
) { | ||
return true; | ||
} | ||
|
||
// check if the list already contains any parent of this folder | ||
while (!driveItem.root) { | ||
if (!driveItem.parentReference) { | ||
return false; | ||
} | ||
|
||
const parentAPIPath = getDriveItemAPIPathFromReference( | ||
driveItem.parentReference | ||
); | ||
|
||
const parentInternalId = internalIdFromTypeAndPath({ | ||
nodeType: "folder", | ||
itemAPIPath: parentAPIPath, | ||
}); | ||
|
||
if (nodes.some((node) => node.internalId === parentInternalId)) { | ||
return true; | ||
} | ||
|
||
driveItem = await getItem(client, parentAPIPath); | ||
} | ||
return false; | ||
} | ||
|
||
export async function markNodeAsVisited( | ||
connectorId: ModelId, | ||
internalId: string | ||
|
@@ -457,9 +532,12 @@ export async function syncOneFile({ | |
|
||
const isInSizeRange = documentLength > 0 && documentLength < maxDocumentLen; | ||
if (isInSizeRange) { | ||
// TODO(pr): add getParents implementation | ||
const parents = []; | ||
parents.push(documentId); | ||
const parents = await getParents({ | ||
connectorId, | ||
internalId: documentId, | ||
parentInternalId, | ||
startSyncTs, | ||
}); | ||
parents.reverse(); | ||
|
||
await upsertToDatasource({ | ||
|
@@ -505,6 +583,59 @@ export async function syncOneFile({ | |
return isInSizeRange; | ||
} | ||
|
||
async function getParents({ | ||
connectorId, | ||
internalId, | ||
parentInternalId, | ||
startSyncTs, | ||
}: { | ||
connectorId: ModelId; | ||
internalId: string; | ||
parentInternalId: string | null; | ||
startSyncTs: number; | ||
}): Promise<string[]> { | ||
if (!parentInternalId) { | ||
return [internalId]; | ||
} | ||
|
||
const parentParentInternalId = await getParentParentId( | ||
connectorId, | ||
parentInternalId, | ||
startSyncTs | ||
); | ||
|
||
return [ | ||
internalId, | ||
...(await getParents({ | ||
connectorId, | ||
internalId: parentInternalId, | ||
parentInternalId: parentParentInternalId, | ||
startSyncTs, | ||
})), | ||
]; | ||
} | ||
|
||
/* Fetching parent's parent id queries the db for a resource; since those | ||
* fetches can be made a lot of times during a sync, cache for 10mins in a | ||
* per-sync basis (given by startSyncTs) */ | ||
const getParentParentId = cacheWithRedis( | ||
// eslint-disable-next-line @typescript-eslint/no-unused-vars | ||
async (connectorId, parentInternalId, startSyncTs) => { | ||
const parent = await MicrosoftNodeResource.fetchByInternalId( | ||
connectorId, | ||
parentInternalId | ||
); | ||
if (!parent) { | ||
throw new Error(`Parent node not found: ${parentInternalId}`); | ||
} | ||
|
||
return parent.parentInternalId; | ||
}, | ||
(connectorId, parentInternalId, startSyncTs) => | ||
`microsoft-${connectorId}-parent-${parentInternalId}-syncms-${startSyncTs}`, | ||
10 * 60 * 1000 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
); | ||
|
||
async function handlePptxFile( | ||
data: ArrayBuffer, | ||
fileId: string | undefined, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missing a
.
after "remove it".