diff --git a/connectors/src/connectors/notion/index.ts b/connectors/src/connectors/notion/index.ts index 65231d733dcb..6a1332a96a1b 100644 --- a/connectors/src/connectors/notion/index.ts +++ b/connectors/src/connectors/notion/index.ts @@ -410,8 +410,8 @@ export async function retrieveNotionConnectorPermissions({ sourceUrl: db.notionUrl || null, expandable: true, permission: "read", - dustDocumentId: null, - lastUpdatedAt: null, + dustDocumentId: `notion-database-${db.notionDatabaseId}`, + lastUpdatedAt: db.structuredDataUpsertedTs?.getTime() ?? null, }; }; diff --git a/connectors/src/connectors/notion/lib/notion_api.ts b/connectors/src/connectors/notion/lib/notion_api.ts index b75ad1e3aa27..40a49c920812 100644 --- a/connectors/src/connectors/notion/lib/notion_api.ts +++ b/connectors/src/connectors/notion/lib/notion_api.ts @@ -1043,7 +1043,9 @@ export function parsePageBlock(block: BlockObjectResponse): ParsedNotionBlock { case "child_database": return { ...commonFields, - text: null, + text: `Child Database: ${ + block.child_database.title ?? "Untitled Database" + }`, childDatabaseTitle: block.child_database.title, }; diff --git a/connectors/src/connectors/notion/temporal/activities.ts b/connectors/src/connectors/notion/temporal/activities.ts index 18d76c7deedb..8cd630229dfb 100644 --- a/connectors/src/connectors/notion/temporal/activities.ts +++ b/connectors/src/connectors/notion/temporal/activities.ts @@ -50,6 +50,8 @@ import { deleteTable, deleteTableRow, MAX_DOCUMENT_TXT_LEN, + MAX_PREFIX_CHARS, + MAX_PREFIX_TOKENS, renderDocumentTitleAndContent, renderPrefixSection, sectionLength, @@ -1245,7 +1247,6 @@ export async function cacheBlockChildren({ }): Promise<{ nextCursor: string | null; blocksWithChildren: string[]; - childDatabases: string[]; blocksCount: number; }> { const connector = await ConnectorResource.fetchById(connectorId); @@ -1269,7 +1270,6 @@ export async function cacheBlockChildren({ nextCursor: null, blocksWithChildren: [], blocksCount: 0, - childDatabases: [], }; } @@ -1300,7 +1300,6 @@ export async function cacheBlockChildren({ nextCursor: null, blocksWithChildren: [], blocksCount: 0, - childDatabases: [], }; } @@ -1342,16 +1341,11 @@ export async function cacheBlockChildren({ .filter((b) => b.hasChildren) .map((b) => b.id); - const childDatabases = parsedBlocks - .filter((b) => b.type === "child_database") - .map((b) => b.id); - localLogger.info( { blocksWithChildrenCount: blocksWithChildren.length, - childDatabasesCount: childDatabases.length, }, - "Found blocks with children and child databases." + "Found blocks with children." ); localLogger.info( @@ -1376,7 +1370,6 @@ export async function cacheBlockChildren({ ); return { - childDatabases, blocksWithChildren, blocksCount: parsedBlocks.length, nextCursor: resultPage.next_cursor, @@ -1730,46 +1723,6 @@ export async function renderAndUpsertPageFromCache({ ]; } - const childDatabaseTitleById = blockCacheEntries - .filter((b) => b.blockType === "child_database") - .map((b) => ({ - id: b.notionBlockId, - title: - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - b.childDatabaseTitle!, - })) - .reduce((acc, { id, title }) => { - acc[id] = title; - return acc; - }, {} as Record); - - localLogger.info( - "notionRenderAndUpsertPageFromCache: Retrieving child database pages from cache." - ); - const childDbPagesCacheEntries = await NotionConnectorPageCacheEntry.findAll({ - where: { - parentId: Object.keys(blocksByParentId), - connectorId: connector.id, - workflowId: topLevelWorkflowId, - }, - }); - const childDatabases: Record = {}; - for (const childDbPageCacheEntry of childDbPagesCacheEntries) { - childDatabases[childDbPageCacheEntry.parentId] = [ - ...(childDatabases[childDbPageCacheEntry.parentId] ?? []), - childDbPageCacheEntry, - ]; - } - const renderedChildDatabases: Record = {}; - for (const [databaseId, pages] of Object.entries(childDatabases)) { - renderedChildDatabases[databaseId] = await renderDatabaseFromPages({ - databaseTitle: childDatabaseTitleById[databaseId] ?? null, - pagesProperties: pages.map( - (p) => JSON.parse(p.pagePropertiesText) as PageObjectProperties - ), - }); - } - localLogger.info("notionRenderAndUpsertPageFromCache: Rendering page."); const renderedPageSection = await renderPageSection({ dsConfig, @@ -1782,11 +1735,16 @@ export async function renderAndUpsertPageFromCache({ // Adding notion properties to the page rendering // We skip the title as it is added separately as prefix to the top-level document section. + let maxPropertyLength = 0; const parsedProperties = parsePageProperties( JSON.parse(pageCacheEntry.pagePropertiesText) as PageObjectProperties ); - for (const p of parsedProperties.filter((p) => p.key !== "title" && p.text)) { + for (const p of parsedProperties.filter((p) => p.key !== "title")) { + if (!p.text) { + continue; + } const propertyContent = `$${p.key}: ${p.text}\n`; + maxPropertyLength = Math.max(maxPropertyLength, p.text.length); renderedPageSection.sections.unshift({ prefix: null, content: propertyContent, @@ -1893,9 +1851,10 @@ export async function renderAndUpsertPageFromCache({ const createdAt = new Date(pageCacheEntry.createdTime); const updatedAt = new Date(pageCacheEntry.lastEditedTime); - if (documentLength === 0) { + if (documentLength === 0 && maxPropertyLength < 256) { localLogger.info( - "notionRenderAndUpsertPageFromCache: Not upserting page without body." + { maxPropertyLength }, + "notionRenderAndUpsertPageFromCache: Not upserting page without body and free text properties." ); } else if (!skipReason) { upsertTs = new Date().getTime(); @@ -1937,7 +1896,6 @@ export async function renderAndUpsertPageFromCache({ updatedTime: updatedAt.getTime(), parsedProperties, }), - parents, loggerArgs, upsertContext: { @@ -2265,7 +2223,10 @@ async function renderPageSection({ // Prefix for depths 0 and 1, and only if children const blockSection = depth < 2 && adaptedBlocksByParentId[b.notionBlockId]?.length - ? await renderPrefixSection(dsConfig, renderedBlock) + ? await renderPrefixSection({ + dataSourceConfig: dsConfig, + prefix: renderedBlock, + }) : { prefix: null, content: renderedBlock, @@ -2307,11 +2268,13 @@ export async function upsertDatabaseStructuredDataFromCache({ connectorId, topLevelWorkflowId, loggerArgs, + runTimestamp, }: { databaseId: string; connectorId: number; topLevelWorkflowId: string; loggerArgs: Record; + runTimestamp: number; }): Promise { const connector = await ConnectorResource.fetchById(connectorId); if (!connector) { @@ -2350,21 +2313,28 @@ export async function upsertDatabaseStructuredDataFromCache({ return; } + const pagesProperties = pageCacheEntries.map( + (p) => JSON.parse(p.pagePropertiesText) as PageObjectProperties + ); + const csv = await renderDatabaseFromPages({ databaseTitle: null, - pagesProperties: pageCacheEntries.map( - (p) => JSON.parse(p.pagePropertiesText) as PageObjectProperties - ), + pagesProperties, dustIdColumn: pageCacheEntries.map((p) => `notion-${p.notionPageId}`), cellSeparator: ",", rowBoundary: "", }); - const { tableId, tableName, tableDescription } = + const { databaseName, tableId, tableName, tableDescription } = getTableInfoFromDatabase(dbModel); + const dataSourceConfig = dataSourceConfigFromConnector(connector); + + const upsertAt = new Date(); + + localLogger.info("Upserting Notion Database as Table."); await upsertTableFromCsv({ - dataSourceConfig: dataSourceConfigFromConnector(connector), + dataSourceConfig, tableId, tableName, tableDescription, @@ -2373,10 +2343,67 @@ export async function upsertDatabaseStructuredDataFromCache({ // We overwrite the whole table since we just fetched all child pages. truncate: true, }); - await dbModel.update({ structuredDataUpsertedTs: new Date() }); + // Same as above, but without the `dustId` column + const csvForDocument = await renderDatabaseFromPages({ + databaseTitle: null, + pagesProperties, + cellSeparator: ",", + rowBoundary: "", + }); + const csvHeader = csvForDocument.split("\n")[0]; + const csvRows = csvForDocument.split("\n").slice(1).join("\n"); + if (csvHeader && csvRows.length) { + const parents = await getParents( + connector.id, + databaseId, + new Set(), + runTimestamp.toString() + ); + localLogger.info("Upserting Notion Database as Document."); + const prefix = `${databaseName}\n${csvHeader}`; + const prefixSection = await renderPrefixSection({ + dataSourceConfig, + prefix, + maxPrefixTokens: MAX_PREFIX_TOKENS * 2, + maxPrefixChars: MAX_PREFIX_CHARS * 2, + }); + if (!prefixSection.content) { + await upsertToDatasource({ + dataSourceConfig, + documentId: `notion-database-${databaseId}`, + documentContent: { + prefix: prefixSection.prefix, + content: csvRows, + sections: [], + }, + documentUrl: dbModel.notionUrl ?? undefined, + // TODO: see if we actually want to use the Notion last edited time of the database + // we currently don't have it because we don't fetch the DB object from notion. + timestampMs: upsertAt.getTime(), + tags: [`title:${databaseName}`, "is_database:true"], + parents: parents, + loggerArgs, + upsertContext: { + sync_type: "batch", + }, + async: true, + }); + } else { + localLogger.info( + { + prefix: prefixSection.prefix, + content: csvRows, + }, + "Skipping document upsert as prefix is too long." + ); + } + } + + await dbModel.update({ structuredDataUpsertedTs: upsertAt }); } function getTableInfoFromDatabase(database: NotionDatabase): { + databaseName: string; tableId: string; tableName: string; tableDescription: string; @@ -2390,5 +2417,5 @@ function getTableInfoFromDatabase(database: NotionDatabase): { ); const tableDescription = `Structured data from Notion Database ${tableName}`; - return { tableId, tableName, tableDescription }; + return { databaseName: name, tableId, tableName, tableDescription }; } diff --git a/connectors/src/connectors/notion/temporal/config.ts b/connectors/src/connectors/notion/temporal/config.ts index 58cde7662d93..06305fc1083d 100644 --- a/connectors/src/connectors/notion/temporal/config.ts +++ b/connectors/src/connectors/notion/temporal/config.ts @@ -1,2 +1,2 @@ -export const WORKFLOW_VERSION = 33; +export const WORKFLOW_VERSION = 34; export const QUEUE_NAME = `notion-queue-v${WORKFLOW_VERSION}`; diff --git a/connectors/src/connectors/notion/temporal/workflows.ts b/connectors/src/connectors/notion/temporal/workflows.ts index 44d530b81a6a..9f9a08396e4a 100644 --- a/connectors/src/connectors/notion/temporal/workflows.ts +++ b/connectors/src/connectors/notion/temporal/workflows.ts @@ -434,7 +434,7 @@ export async function upsertPageChildWorkflow({ let cursor: string | null = null; let blockIndexInPage = 0; do { - const { nextCursor, blocksWithChildren, childDatabases, blocksCount } = + const { nextCursor, blocksWithChildren, blocksCount } = await cacheBlockChildren({ connectorId, pageId, @@ -458,17 +458,6 @@ export async function upsertPageChildWorkflow({ memo: workflowInfo().memo, }); } - for (const databaseId of childDatabases) { - await executeChild(processChildDatabaseChildWorkflow, { - workflowId: `${topLevelWorkflowId}-page-${pageId}-child-database-${databaseId}`, - searchAttributes: { - connectorId: [connectorId], - }, - args: [{ connectorId, databaseId, topLevelWorkflowId }], - parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE, - memo: workflowInfo().memo, - }); - } } while (cursor); await renderAndUpsertPageFromCache({ @@ -504,7 +493,7 @@ export async function notionProcessBlockChildrenChildWorkflow({ let blockIndexInParent = 0; do { - const { nextCursor, blocksWithChildren, childDatabases, blocksCount } = + const { nextCursor, blocksWithChildren, blocksCount } = await cacheBlockChildren({ connectorId, pageId, @@ -528,45 +517,6 @@ export async function notionProcessBlockChildrenChildWorkflow({ memo: workflowInfo().memo, }); } - for (const databaseId of childDatabases) { - await executeChild(processChildDatabaseChildWorkflow, { - workflowId: `${topLevelWorkflowId}-page-${pageId}-child-database-${databaseId}`, - searchAttributes: { - connectorId: [connectorId], - }, - args: [{ connectorId, databaseId, topLevelWorkflowId }], - parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE, - memo: workflowInfo().memo, - }); - } - } while (cursor); -} - -export async function processChildDatabaseChildWorkflow({ - connectorId, - databaseId, - topLevelWorkflowId, -}: { - connectorId: ModelId; - databaseId: string; - topLevelWorkflowId: string; -}): Promise { - const loggerArgs = { - connectorId, - }; - - let cursor: string | null = null; - do { - const { nextCursor } = await fetchDatabaseChildPages({ - connectorId, - databaseId, - cursor, - loggerArgs, - topLevelWorkflowId, - storeInCache: true, - returnUpToDatePageIdsForExistingDatabase: true, - }); - cursor = nextCursor; } while (cursor); } @@ -753,6 +703,7 @@ async function upsertDatabase({ connectorId, topLevelWorkflowId, loggerArgs, + runTimestamp, }) ); diff --git a/connectors/src/lib/data_sources.ts b/connectors/src/lib/data_sources.ts index 28377dace41c..0c7828e023d6 100644 --- a/connectors/src/lib/data_sources.ts +++ b/connectors/src/lib/data_sources.ts @@ -244,20 +244,27 @@ async function _updateDocumentParentsField({ // allows for 4 full prefixes before hitting half of the max chunk size (approx. // 256 chars for 512 token chunks) -const MAX_PREFIX_TOKENS = EMBEDDING_CONFIG.max_chunk_size / 8; +export const MAX_PREFIX_TOKENS = EMBEDDING_CONFIG.max_chunk_size / 8; // Limit on chars to avoid tokenizing too much text uselessly on documents with // large prefixes. The final truncating will rely on MAX_PREFIX_TOKENS so this // limit can be large and should be large to avoid underusing prexfixes -const MAX_PREFIX_CHARS = MAX_PREFIX_TOKENS * 8; +export const MAX_PREFIX_CHARS = MAX_PREFIX_TOKENS * 8; // The role of this function is to create a prefix from an arbitrary long string. The prefix // provided will not be augmented with `\n`, so it should include appropriate carriage return. If // the prefix is too long (> MAX_PREFIX_TOKENS), it will be truncated. The remained will be returned as // content of the resulting section. -export async function renderPrefixSection( - dataSourceConfig: DataSourceConfig, - prefix: string | null -): Promise { +export async function renderPrefixSection({ + dataSourceConfig, + prefix, + maxPrefixTokens = MAX_PREFIX_TOKENS, + maxPrefixChars = MAX_PREFIX_CHARS, +}: { + dataSourceConfig: DataSourceConfig; + prefix: string | null; + maxPrefixTokens?: number; + maxPrefixChars?: number; +}): Promise { if (!prefix || !prefix.trim()) { return { prefix: null, @@ -265,21 +272,19 @@ export async function renderPrefixSection( sections: [], }; } - let targetPrefix = safeSubstring(prefix, 0, MAX_PREFIX_CHARS); + let targetPrefix = safeSubstring(prefix, 0, maxPrefixChars); let targetContent = - prefix.length > MAX_PREFIX_CHARS - ? safeSubstring(prefix, MAX_PREFIX_CHARS) - : ""; + prefix.length > maxPrefixChars ? safeSubstring(prefix, maxPrefixChars) : ""; const tokens = await tokenize(targetPrefix, dataSourceConfig); targetPrefix = tokens - .slice(0, MAX_PREFIX_TOKENS) + .slice(0, maxPrefixTokens) .map((t) => t[1]) .join(""); targetContent = tokens - .slice(MAX_PREFIX_TOKENS) + .slice(maxPrefixTokens) .map((t) => t[1]) .join("") + targetContent; @@ -345,10 +350,10 @@ export async function renderMarkdownSection( throw new Error("Unreachable"); } - const c = await renderPrefixSection( - dsConfig, - toMarkdown(child, { extensions: [gfmToMarkdown()] }) - ); + const c = await renderPrefixSection({ + dataSourceConfig: dsConfig, + prefix: toMarkdown(child, { extensions: [gfmToMarkdown()] }), + }); last.content.sections.push(c); path.push({ depth: child.depth, @@ -405,7 +410,7 @@ export async function renderDocumentTitleAndContent({ } else { title = null; } - const c = await renderPrefixSection(dataSourceConfig, title); + const c = await renderPrefixSection({ dataSourceConfig, prefix: title }); let metaPrefix: string | null = ""; if (createdAt && isValidDate(createdAt)) { metaPrefix += `$createdAt: ${createdAt.toISOString()}\n`;