diff --git a/connectors/src/admin/cli.ts b/connectors/src/admin/cli.ts index 0bac7a1e6f3a..84623e58c16e 100644 --- a/connectors/src/admin/cli.ts +++ b/connectors/src/admin/cli.ts @@ -24,6 +24,7 @@ import { launchGoogleDriveIncrementalSyncWorkflow, launchGoogleDriveRenewWebhooksWorkflow, } from "@connectors/connectors/google_drive/temporal/client"; +import { searchNotionPagesForQuery } from "@connectors/connectors/notion/lib/cli"; import { QUEUE_NAME } from "@connectors/connectors/notion/temporal/config"; import { upsertPageWorkflow } from "@connectors/connectors/notion/temporal/workflows"; import { uninstallSlack } from "@connectors/connectors/slack"; @@ -45,6 +46,31 @@ import logger from "@connectors/logger/logger"; const { NANGO_SLACK_CONNECTOR_ID } = process.env; +async function getConnectorOrThrow({ + connectorType, + workspaceId, +}: { + connectorType: string; + workspaceId: string; +}): Promise { + if (!workspaceId) { + throw new Error("Missing workspace ID (wId)"); + } + const connector = await Connector.findOne({ + where: { + type: connectorType, + workspaceId: workspaceId, + dataSourceName: "managed-" + connectorType, + }, + }); + if (!connector) { + throw new Error( + `No connector found for ${connectorType} workspace with ID ${workspaceId}` + ); + } + return connector; +} + const connectors = async (command: string, args: parseArgs.ParsedArgs) => { if (!args.wId) { throw new Error("Missing --wId argument"); @@ -436,6 +462,25 @@ const notion = async (command: string, args: parseArgs.ParsedArgs) => { break; } + case "search-pages": { + const { query, wId } = args; + + const connector = await getConnectorOrThrow({ + connectorType: "notion", + workspaceId: wId, + }); + + const pages = await searchNotionPagesForQuery({ + connectorId: connector.id, + connectionId: connector.connectionId, + query, + }); + + console.table(pages); + + break; + } + default: throw new Error("Unknown notion command: " + command); } diff --git a/connectors/src/connectors/notion/lib/cli.ts b/connectors/src/connectors/notion/lib/cli.ts new file mode 100644 index 000000000000..1785320295e0 --- /dev/null +++ b/connectors/src/connectors/notion/lib/cli.ts @@ -0,0 +1,52 @@ +import { ModelId } from "@dust-tt/types"; +import { Client, isFullDatabase, isFullPage } from "@notionhq/client"; +import { Op } from "sequelize"; + +import { getNotionAccessToken } from "@connectors/connectors/notion/temporal/activities"; +import { NotionDatabase } from "@connectors/lib/models/notion"; + +async function listSkippedDatabaseIdsForConnectorId(connectorId: ModelId) { + const skippedDatabases = await NotionDatabase.findAll({ + where: { + connectorId: connectorId, + skipReason: { + [Op.not]: null, + }, + }, + }); + + return new Set(skippedDatabases.map((db) => db.notionDatabaseId)); +} + +export async function searchNotionPagesForQuery({ + connectorId, + connectionId, + query, +}: { + connectorId: ModelId; + connectionId: string; + query: string; +}) { + const notionAccessToken = await getNotionAccessToken(connectionId); + + const notionClient = new Client({ + auth: notionAccessToken, + }); + + const pages = await notionClient.search({ + query, + page_size: 20, + }); + + const skippedDatabaseIds = await listSkippedDatabaseIdsForConnectorId( + connectorId + ); + + return pages.results.map((p) => ({ + id: p.id, + type: p.object, + title: "title" in p ? p.title[0]?.plain_text : "", + isSkipped: p.object === "database" && skippedDatabaseIds.has(p.id), + isFull: p.object === "database" ? isFullDatabase(p) : isFullPage(p), + })); +} diff --git a/connectors/src/connectors/notion/temporal/activities.ts b/connectors/src/connectors/notion/temporal/activities.ts index f7b49ee043e0..83b0610d25f3 100644 --- a/connectors/src/connectors/notion/temporal/activities.ts +++ b/connectors/src/connectors/notion/temporal/activities.ts @@ -1,5 +1,5 @@ import { ModelId } from "@dust-tt/types"; -import { APIResponseError, isFullBlock, isFullPage } from "@notionhq/client"; +import { isFullBlock, isFullPage, isNotionClientError } from "@notionhq/client"; import { Context } from "@temporalio/activity"; import { Op } from "sequelize"; @@ -263,7 +263,7 @@ export async function getPagesAndDatabasesToSync({ skippedDatabaseIds ); } catch (e) { - if (APIResponseError.isAPIResponseError(e)) { + if (isNotionClientError(e)) { // Sometimes a cursor will consistently fail with 500. // In this case, there is not much we can do, so we just give up and move on. // Notion workspaces are resynced daily so nothing is lost forever.