Skip to content

Commit

Permalink
Handle large Confluence workflows
Browse files Browse the repository at this point in the history
  • Loading branch information
flvndvd committed Mar 8, 2024
1 parent 1510ce9 commit 370306b
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 10 deletions.
18 changes: 12 additions & 6 deletions connectors/src/connectors/confluence/temporal/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -412,9 +412,12 @@ export async function confluenceGetTopLevelPageIdsActivity({
pageCursor
);

localLogger.info("Found Confluence top-level pages in space.", {
topLevelPagesCount: childPageIds.length,
});
localLogger.info(
{
topLevelPagesCount: childPageIds.length,
},
"Found Confluence top-level pages in space."
);

return { topLevelPageIds: childPageIds, nextPageCursor };
}
Expand Down Expand Up @@ -554,9 +557,12 @@ export async function confluenceRemoveSpaceActivity(
},
});

localLogger.info("Delete Confluence space", {
numberOfPages: allPages.length,
});
localLogger.info(
{
numberOfPages: allPages.length,
},
"Delete Confluence space"
);

for (const page of allPages) {
await deletePage(connectorId, page.pageId, dataSourceConfig);
Expand Down
19 changes: 15 additions & 4 deletions connectors/src/connectors/confluence/temporal/workflows.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { ModelId } from "@dust-tt/types";
import type { WorkflowInfo } from "@temporalio/workflow";
import {
continueAsNew,
executeChild,
proxyActivities,
setHandler,
Expand Down Expand Up @@ -39,6 +40,8 @@ const {
startToCloseTimeout: "20 minutes",
});

// Workflow history length threshold: once `workflowInfo().historyLength` exceeds this value and
// pages are still pending, the top-level child pages sync continues as a new workflow run.
// NOTE(review): presumably chosen to stay below Temporal's event-history limit — confirm.
const MAX_HISTORY_LENGTH = 30_000;

export async function confluenceSyncWorkflow({
connectorId,
spaceIdsToBrowse,
Expand Down Expand Up @@ -205,7 +208,7 @@ export async function confluenceSpaceSyncWorkflow(
spaceName,
confluenceCloudId,
visitedAtMs,
topLevelPageId: pageId,
topLevelPageIds: [pageId],
},
],
memo,
Expand All @@ -232,7 +235,7 @@ interface confluenceSyncTopLevelChildPagesWorkflowInput {
isBatchSync: boolean;
spaceId: string;
spaceName: string;
topLevelPageId: string;
topLevelPageIds: string[];
visitedAtMs: number;
}

Expand All @@ -245,8 +248,8 @@ interface confluenceSyncTopLevelChildPagesWorkflowInput {
export async function confluenceSyncTopLevelChildPagesWorkflow(
params: confluenceSyncTopLevelChildPagesWorkflowInput
) {
const { spaceName, topLevelPageId, visitedAtMs } = params;
const stack = [topLevelPageId];
const { spaceName, topLevelPageIds, visitedAtMs } = params;
const stack = [...topLevelPageIds];

while (stack.length > 0) {
const currentPageId = stack.pop();
Expand Down Expand Up @@ -278,6 +281,14 @@ export async function confluenceSyncTopLevelChildPagesWorkflow(

stack.push(...childPageIds);
} while (nextPageCursor !== null);

// If pages are still pending on the stack and this run's history length exceeds MAX_HISTORY_LENGTH,
// continue as a new workflow run, passing the remaining stack as `topLevelPageIds`.
if (stack.length > 0 && workflowInfo().historyLength > MAX_HISTORY_LENGTH) {
await continueAsNew<typeof confluenceSyncTopLevelChildPagesWorkflow>({
...params,
topLevelPageIds: stack,
});
}
}
}

Expand Down

0 comments on commit 370306b

Please sign in to comment.