[connectors] feature: Upsert table parents from managed ds (#6627)
tdraier authored Aug 6, 2024
1 parent 49cefef · commit 697d387
Showing 21 changed files with 729 additions and 127 deletions.
184 changes: 184 additions & 0 deletions connectors/migrations/20240802_table_parents.ts
@@ -0,0 +1,184 @@
import { getGoogleSheetTableId } from "@dust-tt/types";
import { makeScript } from "scripts/helpers";
import { Op } from "sequelize";
import { v4 as uuidv4 } from "uuid";

import { getLocalParents as getGoogleParents } from "@connectors/connectors/google_drive/lib";
import { getParents as getMicrosoftParents } from "@connectors/connectors/microsoft/temporal/file";
import { getParents as getNotionParents } from "@connectors/connectors/notion/lib/parents";
import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config";
import {
getTable,
updateTableParentsField,
} from "@connectors/lib/data_sources";
import { GoogleDriveSheet } from "@connectors/lib/models/google_drive";
import { MicrosoftNodeModel } from "@connectors/lib/models/microsoft";
import { NotionDatabase } from "@connectors/lib/models/notion";
import type { Logger } from "@connectors/logger/logger";
import { ConnectorResource } from "@connectors/resources/connector_resource";

export async function googleTables(
connector: ConnectorResource,
execute: boolean,
logger: Logger
): Promise<void> {
logger.info(`Processing Google Drive connector ${connector.id}`);
const memo = uuidv4();
const csvGoogleSheets = await GoogleDriveSheet.findAll({
where: { connectorId: connector.id },
});
for (const sheet of csvGoogleSheets) {
const { driveFileId, driveSheetId, connectorId } = sheet;

const dataSourceConfig = dataSourceConfigFromConnector(connector);

const tableId = getGoogleSheetTableId(driveFileId, driveSheetId);

const parents = await getGoogleParents(connectorId, tableId, memo);

const table = await getTable({
dataSourceConfig,
tableId,
});
if (table && JSON.stringify(table.parents) !== JSON.stringify(parents)) {
logger.info(`Parents for ${tableId}: ${parents}`);
if (execute) {
await updateTableParentsField({ tableId, parents, dataSourceConfig });
}
}
}
}

export async function microsoftTables(
connector: ConnectorResource,
execute: boolean,
logger: Logger
): Promise<void> {
logger.info(`Processing Microsoft connector ${connector.id}`);
const microsoftSheets = await MicrosoftNodeModel.findAll({
where: {
nodeType: "worksheet",
connectorId: connector.id,
},
});
for (const sheet of microsoftSheets) {
const { internalId, connectorId } = sheet;

const dataSourceConfig = dataSourceConfigFromConnector(connector);

const parents = await getMicrosoftParents({
connectorId,
internalId,
startSyncTs: 0,
});

const table = await getTable({
dataSourceConfig,
tableId: internalId,
});

if (table && JSON.stringify(table.parents) !== JSON.stringify(parents)) {
logger.info(`Parents for ${internalId}: ${parents}`);
if (execute) {
await updateTableParentsField({
tableId: internalId,
parents,
dataSourceConfig,
});
}
}
}
}

export async function notionTables(
connector: ConnectorResource,
execute: boolean,
logger: Logger
): Promise<void> {
logger.info(`Processing Notion connector ${connector.id}`);
const notionDatabases = await NotionDatabase.findAll({
where: {
connectorId: connector.id,
structuredDataUpsertedTs: {
[Op.not]: null,
},
},
});

const memo = uuidv4();

for (const database of notionDatabases) {
const { notionDatabaseId, connectorId } = database;
if (!connectorId) {
continue;
}

const dataSourceConfig = dataSourceConfigFromConnector(connector);
const parents = await getNotionParents(
connectorId as number,
notionDatabaseId as string,
new Set<string>(),
memo
);
const table = await getTable({
dataSourceConfig,
tableId: "notion-" + notionDatabaseId,
});
if (table && JSON.stringify(table.parents) !== JSON.stringify(parents)) {
logger.info(`Parents for notion-${notionDatabaseId}: ${parents}`);
if (execute) {
await updateTableParentsField({
tableId: "notion-" + notionDatabaseId,
parents,
dataSourceConfig,
});
}
}
}
}

export async function handleConnector(
connector: ConnectorResource,
execute: boolean,
logger: Logger
): Promise<void> {
switch (connector.type) {
case "google_drive":
return googleTables(connector, execute, logger);
case "microsoft":
return microsoftTables(connector, execute, logger);
case "notion":
return notionTables(connector, execute, logger);
}
}

makeScript(
{
connectorId: { type: "number", demandOption: false },
},
async ({ connectorId, execute }, logger) => {
if (connectorId) {
const connector = await ConnectorResource.fetchById(connectorId);
if (!connector) {
throw new Error(
`Could not find connector for connectorId ${connectorId}`
);
}
await handleConnector(connector, execute, logger);
} else {
for (const connectorType of [
"google_drive",
"microsoft",
"notion",
] as const) {
const connectors = await ConnectorResource.listByType(
connectorType,
{}
);
for (const connector of connectors) {
await handleConnector(connector, execute, logger);
}
}
}
}
);
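
For context, the three per-connector functions in this migration share one backfill step: recompute the parents chain for each table, fetch the table from the data source, and rewrite the parents field only when it has drifted. Below is a minimal sketch of that step using the helpers the migration already imports; the function name backfillTableParents is illustrative and not part of the commit.

import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config";
import {
  getTable,
  updateTableParentsField,
} from "@connectors/lib/data_sources";
import type { ConnectorResource } from "@connectors/resources/connector_resource";

// Illustrative helper: returns true when an update was (or would be) issued.
async function backfillTableParents(
  connector: ConnectorResource,
  tableId: string,
  parents: string[],
  execute: boolean
): Promise<boolean> {
  const dataSourceConfig = dataSourceConfigFromConnector(connector);

  // Fetch the table as currently known by the data source.
  const table = await getTable({ dataSourceConfig, tableId });
  if (!table) {
    // The table was never upserted; nothing to backfill.
    return false;
  }

  // Only write when the stored chain differs from the recomputed one.
  if (JSON.stringify(table.parents) === JSON.stringify(parents)) {
    return false;
  }

  if (execute) {
    await updateTableParentsField({ tableId, parents, dataSourceConfig });
  }
  return true;
}
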
4 changes: 2 additions & 2 deletions connectors/src/connectors/google_drive/lib/cli.ts
@@ -1,6 +1,6 @@
import type {
AdminSuccessResponseType,
- GoogleDriveCheckFileResponseType,
+ CheckFileGenericResponseType,
GoogleDriveCommandType,
} from "@dust-tt/types";
import { googleDriveIncrementalSyncWorkflowId } from "@dust-tt/types";
@@ -24,7 +24,7 @@ export const google_drive = async ({
command,
args,
}: GoogleDriveCommandType): Promise<
- AdminSuccessResponseType | GoogleDriveCheckFileResponseType
+ AdminSuccessResponseType | CheckFileGenericResponseType
> => {
const logger = topLogger.child({
majorCommand: "google_drive",
27 changes: 21 additions & 6 deletions connectors/src/connectors/google_drive/temporal/file.ts
@@ -111,7 +111,8 @@ async function handleFileExport(
maxDocumentLen: number,
localLogger: Logger,
dataSourceConfig: DataSourceConfig,
- connectorId: ModelId
+ connectorId: ModelId,
+ startSyncTs: number
): Promise<CoreAPIDataSourceDocumentSection | null> {
const drive = await getDriveClient(oauth2client);
let res;
@@ -157,13 +158,18 @@ if (file.mimeType === "text/plain") {
if (file.mimeType === "text/plain") {
result = handleTextFile(res.data, maxDocumentLen);
} else if (file.mimeType === "text/csv") {
+ const parents = (
+ await getFileParentsMemoized(connectorId, oauth2client, file, startSyncTs)
+ ).map((f) => f.id);
+
result = await handleCsvFile({
data: res.data,
file,
maxDocumentLen,
localLogger,
dataSourceConfig,
connectorId,
+ parents,
});
} else {
result = await handleTextExtraction(res.data, localLogger, file.mimeType);
@@ -259,7 +265,8 @@ export async function syncOneFile(
file,
localLogger,
dataSourceConfig,
- maxDocumentLen
+ maxDocumentLen,
+ startSyncTs
);
} else {
return syncOneFileTextDocument(
@@ -284,15 +291,21 @@ async function syncOneFileTable(
file: GoogleDriveObjectType,
localLogger: Logger,
dataSourceConfig: DataSourceConfig,
- maxDocumentLen: number
+ maxDocumentLen: number,
+ startSyncTs: number
) {
let skipReason: string | undefined;
const upsertTimestampMs = undefined;

const documentId = getDocumentId(file.id);

if (isGoogleDriveSpreadSheetFile(file)) {
- const res = await syncSpreadSheet(oauth2client, connectorId, file);
+ const res = await syncSpreadSheet(
+ oauth2client,
+ connectorId,
+ file,
+ startSyncTs
+ );
if (!res.isSupported) {
return false;
}
@@ -310,7 +323,8 @@ maxDocumentLen,
maxDocumentLen,
localLogger,
dataSourceConfig,
- connectorId
+ connectorId,
+ startSyncTs
);
}
await updateGoogleDriveFiles(
@@ -357,7 +371,8 @@ async function syncOneFileTextDocument(
maxDocumentLen,
localLogger,
dataSourceConfig,
- connectorId
+ connectorId,
+ startSyncTs
);
}
if (documentContent) {
25 changes: 21 additions & 4 deletions connectors/src/connectors/google_drive/temporal/spreadsheets.ts
@@ -12,6 +12,7 @@ import type { sheets_v4 } from "googleapis";
import { google } from "googleapis";
import type { OAuth2Client } from "googleapis-common";

+ import { getFileParentsMemoized } from "@connectors/connectors/google_drive/lib/hierarchy";
import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config";
import { concurrentExecutor } from "@connectors/lib/async_utils";
import { MAX_FILE_SIZE_TO_DOWNLOAD } from "@connectors/lib/data_sources";
@@ -47,6 +48,7 @@ async function upsertSheetInDb(connector: ConnectorResource, sheet: Sheet) {
async function upsertTable(
connector: ConnectorResource,
sheet: Sheet,
+ parents: string[],
rows: string[][],
loggerArgs: object
) {
@@ -77,6 +79,7 @@ spreadsheetId: spreadsheet.id,
spreadsheetId: spreadsheet.id,
},
truncate: true,
+ parents: [tableId, ...parents],
});

logger.info(loggerArgs, "[Spreadsheet] Table upserted.");
@@ -171,7 +174,8 @@ function getValidRows(allRows: string[][], loggerArgs: object): string[][] {

async function processSheet(
connector: ConnectorResource,
- sheet: Sheet
+ sheet: Sheet,
+ parents: string[]
): Promise<boolean> {
if (!sheet.values) {
return false;
@@ -196,7 +200,7 @@ const rows = await getValidRows(sheet.values, loggerArgs);
const rows = await getValidRows(sheet.values, loggerArgs);
// Assuming the first line as headers, at least one additional data line is required.
if (rows.length > 1) {
- await upsertTable(connector, sheet, rows, loggerArgs);
+ await upsertTable(connector, sheet, parents, rows, loggerArgs);

await upsertSheetInDb(connector, sheet);

@@ -365,7 +369,8 @@ async function getAllSheetsFromSpreadSheet(
export async function syncSpreadSheet(
oauth2client: OAuth2Client,
connectorId: ModelId,
- file: GoogleDriveObjectType
+ file: GoogleDriveObjectType,
+ startSyncTs: number
): Promise<
| {
isSupported: false;
@@ -474,9 +479,21 @@ },
},
});

+ const parents = [
+ file.id,
+ ...(
+ await getFileParentsMemoized(
+ connectorId,
+ oauth2client,
+ file,
+ startSyncTs
+ )
+ ).map((f) => f.id),
+ ];
+
const successfulSheetIdImports: number[] = [];
for (const sheet of sheets) {
- const isImported = await processSheet(connector, sheet);
+ const isImported = await processSheet(connector, sheet, parents);
if (isImported) {
successfulSheetIdImports.push(sheet.id);
}
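
The resulting chain is ordered child-to-root: syncSpreadSheet passes [file.id, ...ancestorIds] into processSheet, and upsertTable prepends the table id itself. A hypothetical illustration follows; all ids are invented for this sketch and do not come from the commit.

// Hypothetical ids, for illustration only.
const tableId = "sheet-table-id";
const spreadsheetFileId = "drive-file-id";
const ancestorFolderIds = ["parent-folder-id", "grandparent-folder-id"];

// Mirrors parents: [tableId, ...parents] in upsertTable above.
const parents = [tableId, spreadsheetFileId, ...ancestorFolderIds];
// => ["sheet-table-id", "drive-file-id", "parent-folder-id", "grandparent-folder-id"]
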
1 change: 0 additions & 1 deletion connectors/src/connectors/microsoft/index.ts
@@ -315,7 +315,6 @@ export class MicrosoftConnectorManager extends BaseConnectorManager<null> {
nodesWithPermissions.filter((n) => n.permission === filterPermission)
);
}

return new Ok(nodesWithPermissions);
}

4 changes: 2 additions & 2 deletions connectors/src/connectors/microsoft/lib/cli.ts
@@ -1,6 +1,6 @@
import type {
AdminSuccessResponseType,
- MicrosoftCheckFileResponseType,
+ CheckFileGenericResponseType,
MicrosoftCommandType,
} from "@dust-tt/types";
import { googleDriveIncrementalSyncWorkflowId } from "@dust-tt/types";
@@ -52,7 +52,7 @@ export const microsoft = async ({
command,
args,
}: MicrosoftCommandType): Promise<
- AdminSuccessResponseType | MicrosoftCheckFileResponseType
+ AdminSuccessResponseType | CheckFileGenericResponseType
> => {
switch (command) {
case "garbage-collect-all": {
1 change: 0 additions & 1 deletion connectors/src/connectors/microsoft/temporal/activities.ts
@@ -790,7 +790,6 @@ async function updateParentsField({
const parents = await getParents({
connectorId: file.connectorId,
internalId: file.internalId,
- parentInternalId: file.parentInternalId,
startSyncTs,
});

