From 5d533967cf304a0b5bcc987a4445d54e5811ffae Mon Sep 17 00:00:00 2001 From: Stanislas Polu Date: Wed, 10 Jan 2024 12:07:20 +0100 Subject: [PATCH] Github code: activity concurrency (#3124) * Github code: activity concurrency * use p-queue onIdle --- .../connectors/github/temporal/activities.ts | 293 +++++++++--------- 1 file changed, 152 insertions(+), 141 deletions(-) diff --git a/connectors/src/connectors/github/temporal/activities.ts b/connectors/src/connectors/github/temporal/activities.ts index d6c2a07d37bd..ab5e532eeaad 100644 --- a/connectors/src/connectors/github/temporal/activities.ts +++ b/connectors/src/connectors/github/temporal/activities.ts @@ -771,12 +771,16 @@ async function garbageCollectCodeSync( "GarbageCollectCodeSync: deleting files" ); - for (const f of filesToDelete) { - await deleteFromDataSource(dataSourceConfig, f.documentId, loggerArgs); - // Only destroy once we succesfully removed from the data source. This is idempotent and will - // work as expected when retried. - await f.destroy(); - } + const fq = new PQueue({ concurrency: 8 }); + filesToDelete.forEach((f) => + fq.add(async () => { + await deleteFromDataSource(dataSourceConfig, f.documentId, loggerArgs); + // Only destroy once we succesfully removed from the data source. This is idempotent and will + // work as expected when retried. + await f.destroy(); + }) + ); + await fq.onIdle(); } const directoriesToDelete = await GithubCodeDirectory.findAll({ @@ -888,156 +892,163 @@ export async function githubCodeSyncActivity({ // given that syncing stallness is already considered an incident. const rootInternalId = `github-code-${repoId}`; - const updatedDirectories: { [key: string]: boolean } = {}; - for (const f of files) { - // Read file (files are 1MB at most). - const content = await fs.readFile(f.localFilePath); - const contentHash = blake3(content).toString("hex"); - const parentInternalId = f.parentInternalId || rootInternalId; - - // Find file or create it with an empty contentHash. - let githubCodeFile = await GithubCodeFile.findOne({ - where: { - connectorId: connector.id, - repoId: repoId.toString(), - documentId: f.documentId, - }, - }); - - if (!githubCodeFile) { - githubCodeFile = await GithubCodeFile.create({ - connectorId: connector.id, - repoId: repoId.toString(), - documentId: f.documentId, - parentInternalId, - fileName: f.fileName, - sourceUrl: f.sourceUrl, - contentHash: "", - createdAt: codeSyncStartedAt, - updatedAt: codeSyncStartedAt, - lastSeenAt: codeSyncStartedAt, + const fq = new PQueue({ concurrency: 4 }); + files.forEach((f) => + fq.add(async () => { + // Read file (files are 1MB at most). + const content = await fs.readFile(f.localFilePath); + const contentHash = blake3(content).toString("hex"); + const parentInternalId = f.parentInternalId || rootInternalId; + + // Find file or create it with an empty contentHash. + let githubCodeFile = await GithubCodeFile.findOne({ + where: { + connectorId: connector.id, + repoId: repoId.toString(), + documentId: f.documentId, + }, }); - } - - // If the parents have updated then the documentId gets updated as well so we should never - // have an udpate to parentInternalId. We check that this is always the case. If the file is - // moved (the parents change) then it will trigger the creation of a new file with a new - // docuemntId and the existing GithubCodeFile (with old documentId) will be cleaned up at the - // end of the activity. - if (parentInternalId !== githubCodeFile.parentInternalId) { - throw new Error( - `File parentInternalId mismatch for ${connector.id}/${f.documentId}` + - ` (expected ${parentInternalId}, got ${githubCodeFile.parentInternalId})` - ); - } - // We update the if the file name, source url or content has changed. - const needsUpdate = - f.fileName !== githubCodeFile.fileName || - f.sourceUrl !== githubCodeFile.sourceUrl || - contentHash !== githubCodeFile.contentHash; + if (!githubCodeFile) { + githubCodeFile = await GithubCodeFile.create({ + connectorId: connector.id, + repoId: repoId.toString(), + documentId: f.documentId, + parentInternalId, + fileName: f.fileName, + sourceUrl: f.sourceUrl, + contentHash: "", + createdAt: codeSyncStartedAt, + updatedAt: codeSyncStartedAt, + lastSeenAt: codeSyncStartedAt, + }); + } - if (needsUpdate) { - // Record the parent directories to update their updatedAt. - for (const parentInternalId of f.parents) { - updatedDirectories[parentInternalId] = true; + // If the parents have updated then the documentId gets updated as well so we should never + // have an udpate to parentInternalId. We check that this is always the case. If the file + // is moved (the parents change) then it will trigger the creation of a new file with a + // new docuemntId and the existing GithubCodeFile (with old documentId) will be cleaned up + // at the end of the activity. + if (parentInternalId !== githubCodeFile.parentInternalId) { + throw new Error( + `File parentInternalId mismatch for ${connector.id}/${f.documentId}` + + ` (expected ${parentInternalId}, got ${githubCodeFile.parentInternalId})` + ); } - const tags = [ - `title:${f.fileName}`, - `lasUpdatedAt:${codeSyncStartedAt.getTime()}`, - ]; + // We update the if the file name, source url or content has changed. + const needsUpdate = + f.fileName !== githubCodeFile.fileName || + f.sourceUrl !== githubCodeFile.sourceUrl || + contentHash !== githubCodeFile.contentHash; + + if (needsUpdate) { + // Record the parent directories to update their updatedAt. + for (const parentInternalId of f.parents) { + updatedDirectories[parentInternalId] = true; + } + + const tags = [ + `title:${f.fileName}`, + `lasUpdatedAt:${codeSyncStartedAt.getTime()}`, + ]; + + // Time to upload the file to the data source. + await upsertToDatasource({ + dataSourceConfig, + documentId: f.documentId, + documentContent: formatCodeContentForUpsert(f.sourceUrl, content), + documentUrl: f.sourceUrl, + timestampMs: codeSyncStartedAt.getTime(), + tags, + parents: [...f.parents, rootInternalId, repoId.toString()], + retries: 3, + delayBetweenRetriesMs: 1000, + loggerArgs: { ...loggerArgs, provider: "github" }, + upsertContext: { + sync_type: isBatchSync ? "batch" : "incremental", + }, + }); + + // Finally update the file. + githubCodeFile.fileName = f.fileName; + githubCodeFile.sourceUrl = f.sourceUrl; + githubCodeFile.contentHash = contentHash; + githubCodeFile.updatedAt = codeSyncStartedAt; + } else { + localLogger.info( + { + repoId, + fileName: f.fileName, + documentId: f.documentId, + }, + "Skipping update of unchanged GithubCodeFile" + ); + } - // Time to upload the file to the data source. - await upsertToDatasource({ - dataSourceConfig, - documentId: f.documentId, - documentContent: formatCodeContentForUpsert(f.sourceUrl, content), - documentUrl: f.sourceUrl, - timestampMs: codeSyncStartedAt.getTime(), - tags, - parents: [...f.parents, rootInternalId, repoId.toString()], - retries: 3, - delayBetweenRetriesMs: 1000, - loggerArgs: { ...loggerArgs, provider: "github" }, - upsertContext: { - sync_type: isBatchSync ? "batch" : "incremental", + // Finally we update the lastSeenAt for all files we've seen, and save. + githubCodeFile.lastSeenAt = codeSyncStartedAt; + await githubCodeFile.save(); + }) + ); + await fq.onIdle(); + + const dq = new PQueue({ concurrency: 8 }); + directories.forEach((d) => + dq.add(async () => { + const parentInternalId = d.parentInternalId || rootInternalId; + + // Find directory or create it. + let githubCodeDirectory = await GithubCodeDirectory.findOne({ + where: { + connectorId: connector.id, + repoId: repoId.toString(), + internalId: d.internalId, }, }); - // Finally update the file. - githubCodeFile.fileName = f.fileName; - githubCodeFile.sourceUrl = f.sourceUrl; - githubCodeFile.contentHash = contentHash; - githubCodeFile.updatedAt = codeSyncStartedAt; - } else { - localLogger.info( - { - repoId, - fileName: f.fileName, - documentId: f.documentId, - }, - "Skipping update of unchanged GithubCodeFile" - ); - } - - // Finally we update the lastSeenAt for all files we've seen, and save. - githubCodeFile.lastSeenAt = codeSyncStartedAt; - await githubCodeFile.save(); - } - - for (const d of directories) { - const parentInternalId = d.parentInternalId || rootInternalId; - - // Find directory or create it. - let githubCodeDirectory = await GithubCodeDirectory.findOne({ - where: { - connectorId: connector.id, - repoId: repoId.toString(), - internalId: d.internalId, - }, - }); - - if (!githubCodeDirectory) { - githubCodeDirectory = await GithubCodeDirectory.create({ - connectorId: connector.id, - repoId: repoId.toString(), - internalId: d.internalId, - parentInternalId, - dirName: d.dirName, - sourceUrl: d.sourceUrl, - createdAt: codeSyncStartedAt, - updatedAt: codeSyncStartedAt, - lastSeenAt: codeSyncStartedAt, - }); - } + if (!githubCodeDirectory) { + githubCodeDirectory = await GithubCodeDirectory.create({ + connectorId: connector.id, + repoId: repoId.toString(), + internalId: d.internalId, + parentInternalId, + dirName: d.dirName, + sourceUrl: d.sourceUrl, + createdAt: codeSyncStartedAt, + updatedAt: codeSyncStartedAt, + lastSeenAt: codeSyncStartedAt, + }); + } - // If the parents have updated then the internalId gets updated as well so we should never - // have an udpate to parentInternalId. We check that this is always the case. If the directory - // is moved (the parents change) then it will trigger the creation of a new directory with a - // new internalId and the existing GithubCodeDirectory (with old internalId) will be cleaned - // up at the end of the activity. - if (parentInternalId !== githubCodeDirectory.parentInternalId) { - throw new Error( - `Directory parentInternalId mismatch for ${connector.id}/${d.internalId}` + - ` (expected ${parentInternalId}, got ${githubCodeDirectory.parentInternalId})` - ); - } + // If the parents have updated then the internalId gets updated as well so we should never + // have an udpate to parentInternalId. We check that this is always the case. If the + // directory is moved (the parents change) then it will trigger the creation of a new + // directory with a new internalId and the existing GithubCodeDirectory (with old + // internalId) will be cleaned up at the end of the activity. + if (parentInternalId !== githubCodeDirectory.parentInternalId) { + throw new Error( + `Directory parentInternalId mismatch for ${connector.id}/${d.internalId}` + + ` (expected ${parentInternalId}, got ${githubCodeDirectory.parentInternalId})` + ); + } - // If some files were updated as part of the sync, refresh the directory updatedAt. - if (updatedDirectories[d.internalId]) { - githubCodeDirectory.updatedAt = codeSyncStartedAt; - } + // If some files were updated as part of the sync, refresh the directory updatedAt. + if (updatedDirectories[d.internalId]) { + githubCodeDirectory.updatedAt = codeSyncStartedAt; + } - // Update everything else. - githubCodeDirectory.dirName = d.dirName; - githubCodeDirectory.sourceUrl = d.sourceUrl; - githubCodeDirectory.lastSeenAt = codeSyncStartedAt; + // Update everything else. + githubCodeDirectory.dirName = d.dirName; + githubCodeDirectory.sourceUrl = d.sourceUrl; + githubCodeDirectory.lastSeenAt = codeSyncStartedAt; - await githubCodeDirectory.save(); - } + await githubCodeDirectory.save(); + }) + ); + await dq.onIdle(); // Final part of the sync, we delete all files and directories that were not seen during the // sync.