Skip to content

Commit

Permalink
do not update already seen folders
Browse files Browse the repository at this point in the history
  • Loading branch information
philipperolet committed Jul 21, 2024
1 parent 22a78a8 commit c914300
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 18 deletions.
82 changes: 66 additions & 16 deletions connectors/src/connectors/microsoft/temporal/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import {
deleteFile,
deleteFolder,
getParents,
isAlreadySeenItem,
syncOneFile,
} from "@connectors/connectors/microsoft/temporal/file";
import { getMimeTypesToSync } from "@connectors/connectors/microsoft/temporal/mime_types";
Expand Down Expand Up @@ -220,10 +221,7 @@ async function isParentAlreadyInNodes({
return false;
}

export async function markNodeAsVisited(
connectorId: ModelId,
internalId: string
) {
export async function markNodeAsSeen(connectorId: ModelId, internalId: string) {
const connector = await ConnectorResource.fetchById(connectorId);
if (!connector) {
throw new Error(`Connector ${connectorId} not found`);
Expand All @@ -238,7 +236,10 @@ export async function markNodeAsVisited(
throw new Error(`Node ${internalId} not found`);
}

await node.update({ lastSeenTs: new Date() });
// if node was updated more recently than this sync, we don't need to mark it
if (node.lastSeenTs && node.lastSeenTs < new Date()) {
await node.update({ lastSeenTs: new Date() });
}
}

/**
Expand Down Expand Up @@ -306,14 +307,14 @@ export async function syncFiles({
pdfEnabled: providerConfig.pdfEnabled || false,
connector,
});
const childrenToSync = children.filter(
const filesToSync = children.filter(
(item) =>
item.file?.mimeType && mimeTypesToSync.includes(item.file.mimeType)
);

// sync files
const results = await concurrentExecutor(
childrenToSync,
filesToSync,
async (child) =>
syncOneFile({
connectorId,
Expand All @@ -338,21 +339,67 @@ export async function syncFiles({
`[SyncFiles] Successful sync.`
);

const childResources = await MicrosoftNodeResource.batchUpdateOrCreate(
// do not update folders that were already seen
const folderResources = await MicrosoftNodeResource.fetchByInternalIds(
connectorId,
children
.filter((item) => item.folder)
.map(
(item): MicrosoftNode => ({
...itemToMicrosoftNode("folder", item),
// add parent information to new node resources
parentInternalId,
})
)
.map((item) => getDriveItemInternalId(item))
);

// compute in O(n) folders that were already seen
const alreadySeenResourcesById: Record<string, MicrosoftNodeResource> = {};
folderResources.forEach((f) => (alreadySeenResourcesById[f.internalId] = f));
children.forEach((c) => {
if (!c.folder) {
return;
}

const cResource = alreadySeenResourcesById[getDriveItemInternalId(c)];

if (!cResource) {
return;
}

if (
!isAlreadySeenItem({
driveItem: c,
driveItemResource: cResource,
startSyncTs,
})
) {
delete alreadySeenResourcesById[cResource.internalId];
}
});

const alreadySeenResources = Object.values(alreadySeenResourcesById);

const createdOrUpdatedResources =
await MicrosoftNodeResource.batchUpdateOrCreate(
connectorId,
children
.filter(
(item) =>
item.folder &&
// only create/update if resource unseen
!alreadySeenResourcesById[getDriveInternalIdFromItem(item)]
)
.map(
(item): MicrosoftNode => ({
...itemToMicrosoftNode("folder", item),
// add parent information to new node resources
parentInternalId,
})
)
);

return {
count,
childNodes: childResources.map((r) => r.internalId),
// still visit children of already seen nodes; an already seen node does not
// mean all its children are already seen too
childNodes: [...createdOrUpdatedResources, ...alreadySeenResources].map(
(r) => r.internalId
),
nextLink: childrenResult.nextLink,
};
}
Expand Down Expand Up @@ -721,3 +768,6 @@ async function updateParentsField({
parents,
});
}
function isDriveItemAlreadySeen(arg0: {}): unknown {
throw new Error("Function not implemented.");
}
4 changes: 2 additions & 2 deletions connectors/src/connectors/microsoft/temporal/workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
import type * as activities from "@connectors/connectors/microsoft/temporal/activities";
import type * as sync_status from "@connectors/lib/sync_status";

const { getSiteNodesToSync, syncFiles, markNodeAsVisited, populateDeltas } =
const { getSiteNodesToSync, syncFiles, markNodeAsSeen, populateDeltas } =
proxyActivities<typeof activities>({
startToCloseTimeout: "30 minutes",
});
Expand Down Expand Up @@ -89,7 +89,7 @@ export async function fullSyncSitesWorkflow({
);
} while (nextPageLink);

await markNodeAsVisited(connectorId, nodeId);
await markNodeAsSeen(connectorId, nodeId);

if (workflowInfo().historyLength > 4000) {
await continueAsNew<typeof fullSyncSitesWorkflow>({
Expand Down

0 comments on commit c914300

Please sign in to comment.