From 4394a50c42d5c97c6802f9e0cb409ebbdd0d5606 Mon Sep 17 00:00:00 2001 From: Henry Fontanier Date: Thu, 4 Apr 2024 17:24:07 +0200 Subject: [PATCH] scrub workflow v2: pause all connectors --- front/scrub_workspace/temporal/activities.ts | 18 +++++++++++- front/scrub_workspace/temporal/client.ts | 4 +-- front/scrub_workspace/temporal/workflows.ts | 31 +++++++++++++++++++- 3 files changed, 49 insertions(+), 4 deletions(-) diff --git a/front/scrub_workspace/temporal/activities.ts b/front/scrub_workspace/temporal/activities.ts index 95d2905fcc53..04504aa20f9e 100644 --- a/front/scrub_workspace/temporal/activities.ts +++ b/front/scrub_workspace/temporal/activities.ts @@ -1,4 +1,4 @@ -import { removeNulls } from "@dust-tt/types"; +import { ConnectorsAPI, removeNulls } from "@dust-tt/types"; import { chunk } from "lodash"; import { @@ -72,6 +72,22 @@ export async function scrubWorkspaceData({ await deleteDatasources(auth); } +export async function pauseAllConnectors({ + workspaceId, +}: { + workspaceId: string; +}) { + const auth = await Authenticator.internalAdminForWorkspace(workspaceId); + const dataSources = await getDataSources(auth); + const connectorsApi = new ConnectorsAPI(logger); + for (const ds of dataSources) { + if (!ds.connectorId) { + continue; + } + await connectorsApi.pauseConnector(ds.connectorId); + } +} + async function deleteAllConversations(auth: Authenticator) { const workspace = auth.workspace(); if (!workspace) { diff --git a/front/scrub_workspace/temporal/client.ts b/front/scrub_workspace/temporal/client.ts index 58365ac83faf..e73cf90de696 100644 --- a/front/scrub_workspace/temporal/client.ts +++ b/front/scrub_workspace/temporal/client.ts @@ -3,7 +3,7 @@ import { Err, Ok } from "@dust-tt/types"; import { getTemporalClient } from "@app/lib/temporal"; import logger from "@app/logger/logger"; -import { scheduleWorkspaceScrubWorkflow } from "@app/scrub_workspace/temporal/workflows"; +import { scheduleWorkspaceScrubWorkflowV2 } from "@app/scrub_workspace/temporal/workflows"; import { QUEUE_NAME } from "./config"; @@ -17,7 +17,7 @@ export async function launchScheduleWorkspaceScrubWorkflow({ const workflowId = `schedule-workspace-scrub-${workspaceId}`; try { - await client.workflow.start(scheduleWorkspaceScrubWorkflow, { + await client.workflow.start(scheduleWorkspaceScrubWorkflowV2, { args: [{ workspaceId }], taskQueue: QUEUE_NAME, workflowId: workflowId, diff --git a/front/scrub_workspace/temporal/workflows.ts b/front/scrub_workspace/temporal/workflows.ts index e5f849988333..8775bb02d2b7 100644 --- a/front/scrub_workspace/temporal/workflows.ts +++ b/front/scrub_workspace/temporal/workflows.ts @@ -13,10 +13,14 @@ const { sendDataDeletionEmail } = proxyActivities({ }, }); -const { scrubWorkspaceData } = proxyActivities({ +const { scrubWorkspaceData, pauseAllConnectors } = proxyActivities< + typeof activities +>({ startToCloseTimeout: "60 minutes", }); +// DEPRECATED +// TODO(@fontanierh): remove this workflow once the new one is deployed and no instances are still running. export async function scheduleWorkspaceScrubWorkflow({ workspaceId, }: { @@ -40,3 +44,28 @@ export async function scheduleWorkspaceScrubWorkflow({ await scrubWorkspaceData({ workspaceId }); return true; } + +export async function scheduleWorkspaceScrubWorkflowV2({ + workspaceId, +}: { + workspaceId: string; +}): Promise { + await pauseAllConnectors({ workspaceId }); + await sendDataDeletionEmail({ + remainingDays: 15, + workspaceId, + isLast: false, + }); + await sleep("12 days"); + if (!(await shouldStillScrubData({ workspaceId }))) { + return false; + } + await sendDataDeletionEmail({ remainingDays: 3, workspaceId, isLast: true }); + await sleep("3 days"); + if (!(await shouldStillScrubData({ workspaceId }))) { + return false; + } + + await scrubWorkspaceData({ workspaceId }); + return true; +}