From 1c279ef7b5dd3cbaef0d58e5eead84596aa7a76b Mon Sep 17 00:00:00 2001 From: Henry Fontanier Date: Wed, 24 Jan 2024 17:58:44 +0100 Subject: [PATCH] implement perpetual GC for notion --- .../connectors/notion/temporal/activities.ts | 31 ++++++++++++- .../connectors/notion/temporal/workflows.ts | 45 ++++++++++++++++--- connectors/src/lib/models/notion.ts | 28 ++++++++++-- 3 files changed, 94 insertions(+), 10 deletions(-) diff --git a/connectors/src/connectors/notion/temporal/activities.ts b/connectors/src/connectors/notion/temporal/activities.ts index f7a9ebd7b55ed..ef170eeb48408 100644 --- a/connectors/src/connectors/notion/temporal/activities.ts +++ b/connectors/src/connectors/notion/temporal/activities.ts @@ -970,11 +970,13 @@ export async function cachePage({ connectorId, pageId, runTimestamp, + topLevelWorkflowId, loggerArgs, }: { connectorId: ModelId; pageId: string; runTimestamp: number; + topLevelWorkflowId: string; loggerArgs: Record; }): Promise<{ skipped: boolean; @@ -1031,6 +1033,7 @@ export async function cachePage({ where: { notionPageId: pageId, connectorId: connector.id, + workflowId: topLevelWorkflowId, }, }); if (pageInCache) { @@ -1078,6 +1081,7 @@ export async function cachePage({ createdTime: notionPage.created_time, lastEditedTime: notionPage.last_edited_time, url: notionPage.url, + workflowId: topLevelWorkflowId, }); return { @@ -1092,6 +1096,7 @@ export async function cacheBlockChildren({ cursor, currentIndexInParent, loggerArgs, + topLevelWorkflowId, }: { connectorId: ModelId; pageId: string; @@ -1099,6 +1104,7 @@ export async function cacheBlockChildren({ cursor: string | null; currentIndexInParent: number; loggerArgs: Record; + topLevelWorkflowId: string; }): Promise<{ nextCursor: string | null; blocksWithChildren: string[]; @@ -1181,6 +1187,7 @@ export async function cacheBlockChildren({ notionPageId: pageId, notionBlockId: parsedBlocks.map((b) => b.id), connectorId: connector.id, + workflowId: topLevelWorkflowId, }, attributes: ["notionBlockId"], }); @@ -1229,6 +1236,7 @@ export async function cacheBlockChildren({ indexInParent: currentIndexInParent + i, childDatabaseTitle: block.childDatabaseTitle, connectorId: connector.id, + workflowId: topLevelWorkflowId, }) ) ); @@ -1245,11 +1253,13 @@ export async function cacheDatabaseChildren({ connectorId, databaseId, cursor, + topLevelWorkflowId, loggerArgs, }: { connectorId: ModelId; databaseId: string; cursor: string | null; + topLevelWorkflowId: string; loggerArgs: Record; }): Promise<{ nextCursor: string | null; @@ -1321,6 +1331,7 @@ export async function cacheDatabaseChildren({ where: { notionPageId: resultPage.results.map((r) => r.id), connectorId: connector.id, + workflowId: topLevelWorkflowId, }, attributes: ["notionPageId"], }); @@ -1356,6 +1367,7 @@ export async function cacheDatabaseChildren({ createdTime: page.created_time, lastEditedTime: page.last_edited_time, url: page.url, + workflowId: topLevelWorkflowId, }) ) ); @@ -1501,12 +1513,14 @@ export async function renderAndUpsertPageFromCache({ loggerArgs, runTimestamp, isFullSync, + topLevelWorkflowId, }: { connectorId: ModelId; pageId: string; loggerArgs: Record; runTimestamp: number; isFullSync: boolean; + topLevelWorkflowId: string; }) { const connector = await Connector.findOne({ where: { @@ -1550,6 +1564,7 @@ export async function renderAndUpsertPageFromCache({ where: { notionPageId: pageId, connectorId: connector.id, + workflowId: topLevelWorkflowId, }, }); if (!pageCacheEntry) { @@ -1563,6 +1578,7 @@ export async function renderAndUpsertPageFromCache({ where: { notionPageId: pageId, connectorId: connector.id, + workflowId: topLevelWorkflowId, }, }); @@ -1594,6 +1610,7 @@ export async function renderAndUpsertPageFromCache({ where: { parentId: Object.keys(blocksByParentId), connectorId: connector.id, + workflowId: topLevelWorkflowId, }, }); const childDatabases: Record = {}; @@ -1744,6 +1761,7 @@ export async function renderAndUpsertPageFromCache({ notionId, connectorId: connector.id, resourceType: parentType, + workflowId: topLevelWorkflowId, }); } } @@ -1891,6 +1909,7 @@ export async function renderAndUpsertPageFromCache({ notionId: id, resourceType: "database", connectorId: connector.id, + workflowId: topLevelWorkflowId, }) ), ...pageEntriesToCreate.map((id) => @@ -1898,12 +1917,19 @@ export async function renderAndUpsertPageFromCache({ notionId: id, resourceType: "page", connectorId: connector.id, + workflowId: topLevelWorkflowId, }) ), ]); } -export async function clearConnectorCache(connectorId: ModelId) { +export async function clearConnectorCache({ + connectorId, + topLevelWorkflowId, +}: { + connectorId: ModelId; + topLevelWorkflowId: string; +}) { const connector = await Connector.findOne({ where: { type: "notion", @@ -1923,16 +1949,19 @@ export async function clearConnectorCache(connectorId: ModelId) { await NotionConnectorPageCacheEntry.destroy({ where: { connectorId: connector.id, + workflowId: topLevelWorkflowId, }, }); await NotionConnectorBlockCacheEntry.destroy({ where: { connectorId: connector.id, + workflowId: topLevelWorkflowId, }, }); await NotionConnectorResourcesToCheckCacheEntry.destroy({ where: { connectorId: connector.id, + workflowId: topLevelWorkflowId, }, }); } diff --git a/connectors/src/connectors/notion/temporal/workflows.ts b/connectors/src/connectors/notion/temporal/workflows.ts index a218f39125376..d8e50cd417f9f 100644 --- a/connectors/src/connectors/notion/temporal/workflows.ts +++ b/connectors/src/connectors/notion/temporal/workflows.ts @@ -78,6 +78,8 @@ export async function notionSyncWorkflow({ }) { let iterations = 0; + const topLevelWorkflowId = workflowInfo().workflowId; + let lastSyncedPeriodTs: number | null = startFromTs ? preProcessTimestampForNotion(startFromTs) : null; @@ -94,7 +96,7 @@ export async function notionSyncWorkflow({ } // clear the connector cache before each sync - await clearConnectorCache(connectorId); + await clearConnectorCache({ connectorId, topLevelWorkflowId }); const runTimestamp = Date.now(); @@ -145,6 +147,7 @@ export async function notionSyncWorkflow({ pageIndex, isBatchSync: isInitialSync, queue: childWorkflowQueue, + topLevelWorkflowId, }) ); } while (cursor); @@ -170,6 +173,7 @@ export async function notionSyncWorkflow({ isBatchSync: isInitialSync, queue: childWorkflowQueue, childWorkflowsNameSuffix: "discovered", + topLevelWorkflowId, }); if (!isGarbageCollectionRun) { @@ -211,12 +215,14 @@ export async function upsertPageWorkflow({ runTimestamp, isBatchSync, pageIndex, + topLevelWorkflowId, }: { connectorId: ModelId; pageId: string; runTimestamp: number; isBatchSync: boolean; pageIndex: number; + topLevelWorkflowId: string; }): Promise<{ skipped: boolean; }> { @@ -230,6 +236,7 @@ export async function upsertPageWorkflow({ pageId, loggerArgs, runTimestamp, + topLevelWorkflowId, }); if (skipped) { @@ -249,6 +256,7 @@ export async function upsertPageWorkflow({ cursor, currentIndexInParent: blockIndexInPage, loggerArgs, + topLevelWorkflowId, }); cursor = nextCursor; blockIndexInPage += blocksCount; @@ -261,7 +269,7 @@ export async function upsertPageWorkflow({ searchAttributes: { connectorId: [connectorId], }, - args: [{ connectorId, pageId, blockId: block }], + args: [{ connectorId, pageId, blockId: block, topLevelWorkflowId }], parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE, memo: workflowInfo().memo, }); @@ -274,7 +282,7 @@ export async function upsertPageWorkflow({ searchAttributes: { connectorId: [connectorId], }, - args: [{ connectorId, databaseId }], + args: [{ connectorId, databaseId, topLevelWorkflowId }], parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE, memo: workflowInfo().memo, }); @@ -287,6 +295,7 @@ export async function upsertPageWorkflow({ loggerArgs, runTimestamp, isFullSync: isBatchSync, + topLevelWorkflowId, }); return { @@ -298,10 +307,12 @@ export async function notionProcessBlockChildrenWorkflow({ connectorId, pageId, blockId, + topLevelWorkflowId, }: { connectorId: ModelId; pageId: string; blockId: string; + topLevelWorkflowId: string; }): Promise { const loggerArgs = { connectorId, @@ -318,6 +329,7 @@ export async function notionProcessBlockChildrenWorkflow({ blockId, cursor, currentIndexInParent: blockIndexInParent, + topLevelWorkflowId, loggerArgs, }); cursor = nextCursor; @@ -331,7 +343,7 @@ export async function notionProcessBlockChildrenWorkflow({ searchAttributes: { connectorId: [connectorId], }, - args: [{ connectorId, pageId, blockId: block }], + args: [{ connectorId, pageId, blockId: block, topLevelWorkflowId }], parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE, memo: workflowInfo().memo, }); @@ -344,7 +356,7 @@ export async function notionProcessBlockChildrenWorkflow({ searchAttributes: { connectorId: [connectorId], }, - args: [{ connectorId, databaseId }], + args: [{ connectorId, databaseId, topLevelWorkflowId }], parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE, memo: workflowInfo().memo, }); @@ -355,9 +367,11 @@ export async function notionProcessBlockChildrenWorkflow({ export async function processChildDatabaseWorkflow({ connectorId, databaseId, + topLevelWorkflowId, }: { connectorId: ModelId; databaseId: string; + topLevelWorkflowId: string; }): Promise { const loggerArgs = { connectorId, @@ -370,6 +384,7 @@ export async function processChildDatabaseWorkflow({ databaseId, cursor, loggerArgs, + topLevelWorkflowId, }); cursor = nextCursor; } while (cursor); @@ -380,11 +395,13 @@ export async function syncResultPageWorkflow({ pageIds, runTimestamp, isBatchSync, + topLevelWorkflowId, }: { connectorId: ModelId; pageIds: string[]; runTimestamp: number; isBatchSync: boolean; + topLevelWorkflowId: string; }): Promise { const upsertQueue = new PQueue({ concurrency: MAX_PENDING_UPSERT_ACTIVITIES, @@ -400,7 +417,16 @@ export async function syncResultPageWorkflow({ searchAttributes: { connectorId: [connectorId], }, - args: [{ connectorId, pageId, runTimestamp, isBatchSync, pageIndex }], + args: [ + { + connectorId, + pageId, + runTimestamp, + isBatchSync, + pageIndex, + topLevelWorkflowId, + }, + ], parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE, memo: workflowInfo().memo, }) @@ -417,12 +443,14 @@ export async function syncResultPageDatabaseWorkflow({ runTimestamp, isGarbageCollectionRun, isBatchSync, + topLevelWorkflowId, }: { connectorId: ModelId; databaseIds: string[]; runTimestamp: number; isGarbageCollectionRun: boolean; isBatchSync: boolean; + topLevelWorkflowId: string; }): Promise { const upsertQueue = new PQueue({ concurrency: MAX_PENDING_UPSERT_ACTIVITIES, @@ -483,6 +511,7 @@ export async function syncResultPageDatabaseWorkflow({ isBatchSync, queue: workflowQueue, childWorkflowsNameSuffix: `database-children-${databaseId}`, + topLevelWorkflowId, }); promises.push(upsertsPromise); @@ -502,6 +531,7 @@ async function performUpserts({ isBatchSync, queue, childWorkflowsNameSuffix = "", + topLevelWorkflowId, }: { connectorId: ModelId; pageIds: string[]; @@ -512,6 +542,7 @@ async function performUpserts({ isBatchSync: boolean; queue: PQueue; childWorkflowsNameSuffix?: string; + topLevelWorkflowId: string; }): Promise { let pagesToSync: string[] = []; let databasesToSync: string[] = []; @@ -566,6 +597,7 @@ async function performUpserts({ runTimestamp, isBatchSync, pageIds: batch, + topLevelWorkflowId, }, ], searchAttributes: { @@ -611,6 +643,7 @@ async function performUpserts({ isGarbageCollectionRun, isBatchSync, databaseIds: batch, + topLevelWorkflowId, }, ], searchAttributes: { diff --git a/connectors/src/lib/models/notion.ts b/connectors/src/lib/models/notion.ts index 5fca840032348..679626c9b0198 100644 --- a/connectors/src/lib/models/notion.ts +++ b/connectors/src/lib/models/notion.ts @@ -273,6 +273,8 @@ export class NotionConnectorPageCacheEntry extends Model< declare lastEditedTime: string; declare url: string; + declare workflowId: string; + declare connectorId: ForeignKey; } NotionConnectorPageCacheEntry.init( @@ -333,14 +335,19 @@ NotionConnectorPageCacheEntry.init( type: DataTypes.STRING, allowNull: false, }, + workflowId: { + type: DataTypes.STRING, + allowNull: true, + }, }, { sequelize: sequelize_conn, modelName: "notion_connector_page_cache_entries", indexes: [ - { fields: ["notionPageId", "connectorId"], unique: true }, + { fields: ["notionPageId", "connectorId", "workflowId"], unique: true }, { fields: ["connectorId"] }, { fields: ["parentId"] }, + { fields: ["workflowId"] }, ], } ); @@ -364,6 +371,8 @@ export class NotionConnectorBlockCacheEntry extends Model< // special case for child DBs declare childDatabaseTitle?: string | null; + declare workflowId: string; + declare connectorId: ForeignKey; } NotionConnectorBlockCacheEntry.init( @@ -411,19 +420,24 @@ NotionConnectorBlockCacheEntry.init( type: DataTypes.STRING, allowNull: true, }, + workflowId: { + type: DataTypes.STRING, + allowNull: true, + }, }, { sequelize: sequelize_conn, modelName: "notion_connector_block_cache_entries", indexes: [ { - fields: ["notionBlockId", "connectorId", "notionPageId"], + fields: ["notionBlockId", "connectorId", "notionPageId", "workflowId"], unique: true, name: "uq_notion_block_id_conn_id_page_id", }, { fields: ["connectorId"] }, { fields: ["parentBlockId"] }, { fields: ["notionPageId"] }, + { fields: ["workflowId"] }, ], } ); @@ -439,6 +453,9 @@ export class NotionConnectorResourcesToCheckCacheEntry extends Model< declare notionId: string; declare resourceType: "page" | "database"; + + declare workflowId: string; + declare connectorId: ForeignKey; } NotionConnectorResourcesToCheckCacheEntry.init( @@ -466,17 +483,22 @@ NotionConnectorResourcesToCheckCacheEntry.init( type: DataTypes.STRING, allowNull: false, }, + workflowId: { + type: DataTypes.STRING, + allowNull: true, + }, }, { sequelize: sequelize_conn, modelName: "notion_connector_resources_to_check_cache_entries", indexes: [ { - fields: ["notionId", "connectorId"], + fields: ["notionId", "connectorId", "workflowId"], unique: true, name: "uq_notion_to_check_notion_id_conn_id", }, { fields: ["connectorId"] }, + { fields: ["workflowId"] }, ], } );