Skip to content

Commit

Permalink
rebase main
Browse files Browse the repository at this point in the history
  • Loading branch information
JulesBelveze committed Dec 16, 2024
2 parents 0f68600 + 143e777 commit ef1940d
Show file tree
Hide file tree
Showing 43 changed files with 1,086 additions and 1,155 deletions.
188 changes: 99 additions & 89 deletions connectors/migrations/20241211_fix_gdrive_parents.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { getGoogleSheetTableId } from "@dust-tt/types";
import { concurrentExecutor, getGoogleSheetTableId } from "@dust-tt/types";
import { makeScript } from "scripts/helpers";
import { Op } from "sequelize";

Expand All @@ -19,6 +19,8 @@ import logger from "@connectors/logger/logger";
import { ConnectorModel } from "@connectors/resources/storage/models/connector_model";

const QUERY_BATCH_SIZE = 1024;
const DOCUMENT_CONCURRENCY = 16;
const TABLE_CONCURRENCY = 16;

function getParents(
fileId: string | null,
Expand Down Expand Up @@ -88,67 +90,71 @@ async function migrate({
}
);

for (const file of googleDriveFiles) {
const internalId = file.dustFileId;
const driveFileId = file.driveFileId;
const parents = getParents(
file.parentId,
parentsMap,
childLogger.child({ nodeId: driveFileId })
);
if (!parents) {
continue;
}
parents.unshift(driveFileId);

if (file.mimeType === "application/vnd.google-apps.folder") {
const folder = await getFolderNode({
dataSourceConfig,
folderId: internalId,
});
const newParents = parents.map((id) => getDocumentId(id));
if (!folder || folder.parents.join("/") !== newParents.join("/")) {
childLogger.info(
{ folderId: file.driveFileId, parents: newParents },
"Upsert folder"
);

if (execute) {
// upsert repository as folder
await upsertFolderNode({
dataSourceConfig,
folderId: file.dustFileId,
parents: newParents,
parentId: file.parentId ? getDocumentId(file.parentId) : null,
title: file.name,
});
}
await concurrentExecutor(
googleDriveFiles,
async (file) => {
const internalId = file.dustFileId;
const driveFileId = file.driveFileId;
const parents = getParents(
file.parentId,
parentsMap,
childLogger.child({ nodeId: driveFileId })
);
if (!parents) {
return;
}
} else if (file.mimeType === "text/csv") {
const tableId = internalId;
parents.unshift(...parents.map((id) => getDocumentId(id)));
const table = await getTable({ dataSourceConfig, tableId });
if (table) {
if (table.parents.join("/") !== parents.join("/")) {
parents.unshift(driveFileId);

if (file.mimeType === "application/vnd.google-apps.folder") {
const folder = await getFolderNode({
dataSourceConfig,
folderId: internalId,
});
const newParents = parents.map((id) => getDocumentId(id));
if (!folder || folder.parents.join("/") !== newParents.join("/")) {
childLogger.info(
{
tableId,
parents,
previousParents: table.parents,
},
"Update parents for table"
{ folderId: file.driveFileId, parents: newParents },
"Upsert folder"
);

if (execute) {
await updateTableParentsField({
// upsert repository as folder
await upsertFolderNode({
dataSourceConfig,
tableId,
parents,
folderId: file.dustFileId,
parents: newParents,
parentId: file.parentId ? getDocumentId(file.parentId) : null,
title: file.name,
});
}
}
} else if (file.mimeType === "text/csv") {
const tableId = internalId;
parents.unshift(...parents.map((id) => getDocumentId(id)));
const table = await getTable({ dataSourceConfig, tableId });
if (table) {
if (table.parents.join("/") !== parents.join("/")) {
childLogger.info(
{
tableId,
parents,
previousParents: table.parents,
},
"Update parents for table"
);
if (execute) {
await updateTableParentsField({
dataSourceConfig,
tableId,
parents,
});
}
}
}
}
}
}
},
{ concurrency: DOCUMENT_CONCURRENCY }
);

nextId = googleDriveFiles[googleDriveFiles.length - 1]?.id;
} while (nextId);
Expand All @@ -167,45 +173,49 @@ async function migrate({
limit: QUERY_BATCH_SIZE,
});

for (const sheet of googleDriveSheets) {
const tableId = getGoogleSheetTableId(
sheet.driveFileId,
sheet.driveSheetId
);

const parents = getParents(
sheet.driveFileId,
parentsMap,
childLogger.child({ nodeId: tableId })
);
if (!parents) {
continue;
}
await concurrentExecutor(
googleDriveSheets,
async (sheet) => {
const tableId = getGoogleSheetTableId(
sheet.driveFileId,
sheet.driveSheetId
);

parents.unshift(...parents.map((id) => getDocumentId(id)));
parents.unshift(tableId);

const table = await getTable({ dataSourceConfig, tableId });
if (table) {
if (table.parents.join("/") !== parents.join("/")) {
childLogger.info(
{
tableId,
parents,
previousParents: table.parents,
},
"Update parents for table"
);
if (execute) {
await updateTableParentsField({
dataSourceConfig,
tableId,
parents,
});
const parents = getParents(
sheet.driveFileId,
parentsMap,
childLogger.child({ nodeId: tableId })
);
if (!parents) {
return;
}

parents.unshift(...parents.map((id) => getDocumentId(id)));
parents.unshift(tableId);

const table = await getTable({ dataSourceConfig, tableId });
if (table) {
if (table.parents.join("/") !== parents.join("/")) {
childLogger.info(
{
tableId,
parents,
previousParents: table.parents,
},
"Update parents for table"
);
if (execute) {
await updateTableParentsField({
dataSourceConfig,
tableId,
parents,
});
}
}
}
}
}
},
{ concurrency: TABLE_CONCURRENCY }
);

nextId = googleDriveSheets[googleDriveSheets.length - 1]?.id;
} while (nextId);
Expand Down
43 changes: 43 additions & 0 deletions connectors/migrations/20241216_backfill_confluence_folders.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { makeScript } from "scripts/helpers";

import { makeSpaceInternalId } from "@connectors/connectors/confluence/lib/internal_ids";
import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config";
import { concurrentExecutor } from "@connectors/lib/async_utils";
import { upsertFolderNode } from "@connectors/lib/data_sources";
import { ConfluenceSpace } from "@connectors/lib/models/confluence";
import { ConnectorResource } from "@connectors/resources/connector_resource";

const FOLDER_CONCURRENCY = 10;

makeScript({}, async ({ execute }, logger) => {
const connectors = await ConnectorResource.listByType("confluence", {});

for (const connector of connectors) {
const confluenceSpaces = await ConfluenceSpace.findAll({
attributes: ["spaceId", "name"],
where: { connectorId: connector.id },
});
const dataSourceConfig = dataSourceConfigFromConnector(connector);
if (execute) {
await concurrentExecutor(
confluenceSpaces,
async (space) => {
await upsertFolderNode({
dataSourceConfig,
folderId: makeSpaceInternalId(space.spaceId),
parents: [makeSpaceInternalId(space.spaceId)],
title: space.name,
});
},
{ concurrency: FOLDER_CONCURRENCY }
);
logger.info(
`Upserted ${confluenceSpaces.length} spaces for connector ${connector.id}`
);
} else {
logger.info(
`Found ${confluenceSpaces.length} spaces for connector ${connector.id}`
);
}
}
});
95 changes: 95 additions & 0 deletions connectors/migrations/20241216_backfill_zendesk_folders.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import { makeScript } from "scripts/helpers";

import { getBrandInternalId } from "@connectors/connectors/zendesk/lib/id_conversions";
import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config";
import { concurrentExecutor } from "@connectors/lib/async_utils";
import { upsertFolderNode } from "@connectors/lib/data_sources";
import { ConnectorResource } from "@connectors/resources/connector_resource";
import {
ZendeskBrandResource,
ZendeskCategoryResource,
} from "@connectors/resources/zendesk_resources";

const FOLDER_CONCURRENCY = 10;

// Backfill: for every Zendesk connector, upsert folder nodes for each brand
// (plus its Help Center and Tickets children) and for each category.
// Dry-run (no --execute) only reports the counts found.
makeScript({}, async ({ execute }, logger) => {
  const connectors = await ConnectorResource.listByType("zendesk", {});

  for (const connector of connectors) {
    const dataSourceConfig = dataSourceConfigFromConnector(connector);
    const connectorId = connector.id;

    const brands = await ZendeskBrandResource.fetchByConnector(connector);
    if (execute) {
      await concurrentExecutor(
        brands,
        async (brand) => {
          /// same code as in the connector
          // Brand is a top-level folder: it is its own sole parent.
          const brandInternalId = getBrandInternalId({
            connectorId,
            brandId: brand.brandId,
          });
          await upsertFolderNode({
            dataSourceConfig,
            folderId: brandInternalId,
            parents: [brandInternalId],
            title: brand.name,
          });

          // Help Center and Tickets are children of the brand node.
          const helpCenterNode = brand.getHelpCenterContentNode(connectorId);
          await upsertFolderNode({
            dataSourceConfig,
            folderId: helpCenterNode.internalId,
            parents: [
              helpCenterNode.internalId,
              helpCenterNode.parentInternalId,
            ],
            title: helpCenterNode.title,
          });

          const ticketsNode = brand.getTicketsContentNode(connectorId);
          await upsertFolderNode({
            dataSourceConfig,
            folderId: ticketsNode.internalId,
            parents: [ticketsNode.internalId, ticketsNode.parentInternalId],
            title: ticketsNode.title,
          });
        },
        { concurrency: FOLDER_CONCURRENCY }
      );
      // Fixed: these messages previously said "spaces" (copy-paste from the
      // Confluence backfill) although they count brands.
      logger.info(
        `Upserted ${brands.length} brands for connector ${connector.id}`
      );
    } else {
      logger.info(
        `Found ${brands.length} brands for connector ${connector.id}`
      );
    }

    const categories =
      await ZendeskCategoryResource.fetchByConnector(connector);
    if (execute) {
      await concurrentExecutor(
        categories,
        async (category) => {
          /// same code as in the connector
          // parents[0] is the category's own internal id.
          const parents = category.getParentInternalIds(connectorId);
          await upsertFolderNode({
            // Fixed: reuse the per-connector dataSourceConfig instead of
            // recomputing it for every category (consistent with the brands
            // loop above).
            dataSourceConfig,
            folderId: parents[0],
            parents,
            title: category.name,
          });
        },
        { concurrency: FOLDER_CONCURRENCY }
      );
      // Fixed: previously logged `brands.length` and said "spaces" although
      // this section processes categories.
      logger.info(
        `Upserted ${categories.length} categories for connector ${connector.id}`
      );
    } else {
      logger.info(
        `Found ${categories.length} categories for connector ${connector.id}`
      );
    }
  }
});
Loading

0 comments on commit ef1940d

Please sign in to comment.