Skip to content

Commit

Permalink
Github code: activity concurrency (#3124)
Browse files Browse the repository at this point in the history
* Github code: activity concurrency

* use p-queue onIdle
  • Loading branch information
spolu authored Jan 10, 2024
1 parent c2d0895 commit 5d53396
Showing 1 changed file with 152 additions and 141 deletions.
293 changes: 152 additions & 141 deletions connectors/src/connectors/github/temporal/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -771,12 +771,16 @@ async function garbageCollectCodeSync(
"GarbageCollectCodeSync: deleting files"
);

for (const f of filesToDelete) {
await deleteFromDataSource(dataSourceConfig, f.documentId, loggerArgs);
// Only destroy once we succesfully removed from the data source. This is idempotent and will
// work as expected when retried.
await f.destroy();
}
const fq = new PQueue({ concurrency: 8 });
filesToDelete.forEach((f) =>
fq.add(async () => {
await deleteFromDataSource(dataSourceConfig, f.documentId, loggerArgs);
// Only destroy once we succesfully removed from the data source. This is idempotent and will
// work as expected when retried.
await f.destroy();
})
);
await fq.onIdle();
}

const directoriesToDelete = await GithubCodeDirectory.findAll({
Expand Down Expand Up @@ -888,156 +892,163 @@ export async function githubCodeSyncActivity({
// given that syncing stallness is already considered an incident.

const rootInternalId = `github-code-${repoId}`;

const updatedDirectories: { [key: string]: boolean } = {};

for (const f of files) {
// Read file (files are 1MB at most).
const content = await fs.readFile(f.localFilePath);
const contentHash = blake3(content).toString("hex");
const parentInternalId = f.parentInternalId || rootInternalId;

// Find file or create it with an empty contentHash.
let githubCodeFile = await GithubCodeFile.findOne({
where: {
connectorId: connector.id,
repoId: repoId.toString(),
documentId: f.documentId,
},
});

if (!githubCodeFile) {
githubCodeFile = await GithubCodeFile.create({
connectorId: connector.id,
repoId: repoId.toString(),
documentId: f.documentId,
parentInternalId,
fileName: f.fileName,
sourceUrl: f.sourceUrl,
contentHash: "",
createdAt: codeSyncStartedAt,
updatedAt: codeSyncStartedAt,
lastSeenAt: codeSyncStartedAt,
const fq = new PQueue({ concurrency: 4 });
files.forEach((f) =>
fq.add(async () => {
// Read file (files are 1MB at most).
const content = await fs.readFile(f.localFilePath);
const contentHash = blake3(content).toString("hex");
const parentInternalId = f.parentInternalId || rootInternalId;

// Find file or create it with an empty contentHash.
let githubCodeFile = await GithubCodeFile.findOne({
where: {
connectorId: connector.id,
repoId: repoId.toString(),
documentId: f.documentId,
},
});
}

// If the parents have updated then the documentId gets updated as well so we should never
// have an udpate to parentInternalId. We check that this is always the case. If the file is
// moved (the parents change) then it will trigger the creation of a new file with a new
// docuemntId and the existing GithubCodeFile (with old documentId) will be cleaned up at the
// end of the activity.
if (parentInternalId !== githubCodeFile.parentInternalId) {
throw new Error(
`File parentInternalId mismatch for ${connector.id}/${f.documentId}` +
` (expected ${parentInternalId}, got ${githubCodeFile.parentInternalId})`
);
}

// We update the if the file name, source url or content has changed.
const needsUpdate =
f.fileName !== githubCodeFile.fileName ||
f.sourceUrl !== githubCodeFile.sourceUrl ||
contentHash !== githubCodeFile.contentHash;
if (!githubCodeFile) {
githubCodeFile = await GithubCodeFile.create({
connectorId: connector.id,
repoId: repoId.toString(),
documentId: f.documentId,
parentInternalId,
fileName: f.fileName,
sourceUrl: f.sourceUrl,
contentHash: "",
createdAt: codeSyncStartedAt,
updatedAt: codeSyncStartedAt,
lastSeenAt: codeSyncStartedAt,
});
}

if (needsUpdate) {
// Record the parent directories to update their updatedAt.
for (const parentInternalId of f.parents) {
updatedDirectories[parentInternalId] = true;
// If the parents have updated then the documentId gets updated as well so we should never
// have an udpate to parentInternalId. We check that this is always the case. If the file
// is moved (the parents change) then it will trigger the creation of a new file with a
// new docuemntId and the existing GithubCodeFile (with old documentId) will be cleaned up
// at the end of the activity.
if (parentInternalId !== githubCodeFile.parentInternalId) {
throw new Error(
`File parentInternalId mismatch for ${connector.id}/${f.documentId}` +
` (expected ${parentInternalId}, got ${githubCodeFile.parentInternalId})`
);
}

const tags = [
`title:${f.fileName}`,
`lasUpdatedAt:${codeSyncStartedAt.getTime()}`,
];
// We update the if the file name, source url or content has changed.
const needsUpdate =
f.fileName !== githubCodeFile.fileName ||
f.sourceUrl !== githubCodeFile.sourceUrl ||
contentHash !== githubCodeFile.contentHash;

if (needsUpdate) {
// Record the parent directories to update their updatedAt.
for (const parentInternalId of f.parents) {
updatedDirectories[parentInternalId] = true;
}

const tags = [
`title:${f.fileName}`,
`lasUpdatedAt:${codeSyncStartedAt.getTime()}`,
];

// Time to upload the file to the data source.
await upsertToDatasource({
dataSourceConfig,
documentId: f.documentId,
documentContent: formatCodeContentForUpsert(f.sourceUrl, content),
documentUrl: f.sourceUrl,
timestampMs: codeSyncStartedAt.getTime(),
tags,
parents: [...f.parents, rootInternalId, repoId.toString()],
retries: 3,
delayBetweenRetriesMs: 1000,
loggerArgs: { ...loggerArgs, provider: "github" },
upsertContext: {
sync_type: isBatchSync ? "batch" : "incremental",
},
});

// Finally update the file.
githubCodeFile.fileName = f.fileName;
githubCodeFile.sourceUrl = f.sourceUrl;
githubCodeFile.contentHash = contentHash;
githubCodeFile.updatedAt = codeSyncStartedAt;
} else {
localLogger.info(
{
repoId,
fileName: f.fileName,
documentId: f.documentId,
},
"Skipping update of unchanged GithubCodeFile"
);
}

// Time to upload the file to the data source.
await upsertToDatasource({
dataSourceConfig,
documentId: f.documentId,
documentContent: formatCodeContentForUpsert(f.sourceUrl, content),
documentUrl: f.sourceUrl,
timestampMs: codeSyncStartedAt.getTime(),
tags,
parents: [...f.parents, rootInternalId, repoId.toString()],
retries: 3,
delayBetweenRetriesMs: 1000,
loggerArgs: { ...loggerArgs, provider: "github" },
upsertContext: {
sync_type: isBatchSync ? "batch" : "incremental",
// Finally we update the lastSeenAt for all files we've seen, and save.
githubCodeFile.lastSeenAt = codeSyncStartedAt;
await githubCodeFile.save();
})
);
await fq.onIdle();

const dq = new PQueue({ concurrency: 8 });
directories.forEach((d) =>
dq.add(async () => {
const parentInternalId = d.parentInternalId || rootInternalId;

// Find directory or create it.
let githubCodeDirectory = await GithubCodeDirectory.findOne({
where: {
connectorId: connector.id,
repoId: repoId.toString(),
internalId: d.internalId,
},
});

// Finally update the file.
githubCodeFile.fileName = f.fileName;
githubCodeFile.sourceUrl = f.sourceUrl;
githubCodeFile.contentHash = contentHash;
githubCodeFile.updatedAt = codeSyncStartedAt;
} else {
localLogger.info(
{
repoId,
fileName: f.fileName,
documentId: f.documentId,
},
"Skipping update of unchanged GithubCodeFile"
);
}

// Finally we update the lastSeenAt for all files we've seen, and save.
githubCodeFile.lastSeenAt = codeSyncStartedAt;
await githubCodeFile.save();
}

for (const d of directories) {
const parentInternalId = d.parentInternalId || rootInternalId;

// Find directory or create it.
let githubCodeDirectory = await GithubCodeDirectory.findOne({
where: {
connectorId: connector.id,
repoId: repoId.toString(),
internalId: d.internalId,
},
});

if (!githubCodeDirectory) {
githubCodeDirectory = await GithubCodeDirectory.create({
connectorId: connector.id,
repoId: repoId.toString(),
internalId: d.internalId,
parentInternalId,
dirName: d.dirName,
sourceUrl: d.sourceUrl,
createdAt: codeSyncStartedAt,
updatedAt: codeSyncStartedAt,
lastSeenAt: codeSyncStartedAt,
});
}
if (!githubCodeDirectory) {
githubCodeDirectory = await GithubCodeDirectory.create({
connectorId: connector.id,
repoId: repoId.toString(),
internalId: d.internalId,
parentInternalId,
dirName: d.dirName,
sourceUrl: d.sourceUrl,
createdAt: codeSyncStartedAt,
updatedAt: codeSyncStartedAt,
lastSeenAt: codeSyncStartedAt,
});
}

// If the parents have updated then the internalId gets updated as well so we should never
// have an udpate to parentInternalId. We check that this is always the case. If the directory
// is moved (the parents change) then it will trigger the creation of a new directory with a
// new internalId and the existing GithubCodeDirectory (with old internalId) will be cleaned
// up at the end of the activity.
if (parentInternalId !== githubCodeDirectory.parentInternalId) {
throw new Error(
`Directory parentInternalId mismatch for ${connector.id}/${d.internalId}` +
` (expected ${parentInternalId}, got ${githubCodeDirectory.parentInternalId})`
);
}
// If the parents have updated then the internalId gets updated as well so we should never
// have an udpate to parentInternalId. We check that this is always the case. If the
// directory is moved (the parents change) then it will trigger the creation of a new
// directory with a new internalId and the existing GithubCodeDirectory (with old
// internalId) will be cleaned up at the end of the activity.
if (parentInternalId !== githubCodeDirectory.parentInternalId) {
throw new Error(
`Directory parentInternalId mismatch for ${connector.id}/${d.internalId}` +
` (expected ${parentInternalId}, got ${githubCodeDirectory.parentInternalId})`
);
}

// If some files were updated as part of the sync, refresh the directory updatedAt.
if (updatedDirectories[d.internalId]) {
githubCodeDirectory.updatedAt = codeSyncStartedAt;
}
// If some files were updated as part of the sync, refresh the directory updatedAt.
if (updatedDirectories[d.internalId]) {
githubCodeDirectory.updatedAt = codeSyncStartedAt;
}

// Update everything else.
githubCodeDirectory.dirName = d.dirName;
githubCodeDirectory.sourceUrl = d.sourceUrl;
githubCodeDirectory.lastSeenAt = codeSyncStartedAt;
// Update everything else.
githubCodeDirectory.dirName = d.dirName;
githubCodeDirectory.sourceUrl = d.sourceUrl;
githubCodeDirectory.lastSeenAt = codeSyncStartedAt;

await githubCodeDirectory.save();
}
await githubCodeDirectory.save();
})
);
await dq.onIdle();

// Final part of the sync, we delete all files and directories that were not seen during the
// sync.
Expand Down

0 comments on commit 5d53396

Please sign in to comment.