diff --git a/connectors/src/connectors/microsoft/index.ts b/connectors/src/connectors/microsoft/index.ts index 79d03a70762e..e63fffccde86 100644 --- a/connectors/src/connectors/microsoft/index.ts +++ b/connectors/src/connectors/microsoft/index.ts @@ -171,7 +171,7 @@ export async function fullResyncMicrosoftConnector( fromTs: number | null ) { console.log("fullResyncMicrosoftConnector", connectorId, fromTs); - return launchMicrosoftFullSyncWorkflow(connectorId); + return launchMicrosoftFullSyncWorkflow(connectorId, null); } function getIdFromResource(r: MicrosoftRootResource) { diff --git a/connectors/src/connectors/microsoft/temporal/activities.ts b/connectors/src/connectors/microsoft/temporal/activities.ts new file mode 100644 index 000000000000..9bd4cfc75a4a --- /dev/null +++ b/connectors/src/connectors/microsoft/temporal/activities.ts @@ -0,0 +1,18 @@ +import type { ModelId } from "@dust-tt/types"; + +import logger from "@connectors/logger/logger"; +import type { DataSourceConfig } from "@connectors/types/data_source_config"; + +export async function fullSyncActivity({ + connectorId, + dataSourceConfig, +}: { + connectorId: ModelId; + dataSourceConfig: DataSourceConfig; +}): Promise { + logger.info( + `To implement: full sync for connector ${connectorId} with config ${JSON.stringify( + dataSourceConfig + )}` + ); +} diff --git a/connectors/src/connectors/microsoft/temporal/cast_known_errors.ts b/connectors/src/connectors/microsoft/temporal/cast_known_errors.ts new file mode 100644 index 000000000000..5cad32befaf0 --- /dev/null +++ b/connectors/src/connectors/microsoft/temporal/cast_known_errors.ts @@ -0,0 +1,25 @@ +import type { + ActivityExecuteInput, + ActivityInboundCallsInterceptor, + Next, +} from "@temporalio/worker"; + +export class MicrosoftCastKnownErrorsInterceptor + implements ActivityInboundCallsInterceptor +{ + async execute( + input: ActivityExecuteInput, + next: Next + ): Promise { + try { + return await next(input); + } catch (err: unknown) { + // no error to intercept and cast as of yet + // but it will come very soon (below is to pleas the linter) + if (err instanceof Error) { + throw err; + } + throw err; + } + } +} diff --git a/connectors/src/connectors/microsoft/temporal/client.ts b/connectors/src/connectors/microsoft/temporal/client.ts index a98d0fc1e9d4..edc2277d4215 100644 --- a/connectors/src/connectors/microsoft/temporal/client.ts +++ b/connectors/src/connectors/microsoft/temporal/client.ts @@ -1,15 +1,66 @@ import type { ModelId, Result } from "@dust-tt/types"; -import { Err } from "@dust-tt/types"; +import { Err, Ok } from "@dust-tt/types"; +import { QUEUE_NAME } from "@connectors/connectors/microsoft/temporal/config"; +import { + fullSyncWorkflow, + microsoftFullSyncWorkflowId, +} from "@connectors/connectors/microsoft/temporal/workflow"; +import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config"; +import { getTemporalClient, terminateWorkflow } from "@connectors/lib/temporal"; +import logger from "@connectors/logger/logger"; import { ConnectorResource } from "@connectors/resources/connector_resource"; export async function launchMicrosoftFullSyncWorkflow( - connectorId: ModelId + connectorId: ModelId, + fromTs: number | null ): Promise> { const connector = await ConnectorResource.fetchById(connectorId); if (!connector) { return new Err(new Error(`Connector ${connectorId} not found`)); } + if (fromTs) { + return new Err( + new Error("Google Drive connector does not support partial resync") + ); + } + + const client = await getTemporalClient(); + + const dataSourceConfig = dataSourceConfigFromConnector(connector); - throw Error("Not implemented"); + const workflowId = microsoftFullSyncWorkflowId(connectorId); + + try { + await terminateWorkflow(workflowId); + await client.workflow.start(fullSyncWorkflow, { + args: [{ connectorId, dataSourceConfig }], + taskQueue: QUEUE_NAME, + workflowId: workflowId, + searchAttributes: { + connectorId: [connectorId], + }, + memo: { + connectorId: connectorId, + }, + }); + logger.info( + { + workspaceId: dataSourceConfig.workspaceId, + workflowId, + }, + `Started workflow.` + ); + return new Ok(workflowId); + } catch (e) { + logger.error( + { + workspaceId: dataSourceConfig.workspaceId, + workflowId, + error: e, + }, + `Failed starting workflow.` + ); + return new Err(e as Error); + } } diff --git a/connectors/src/connectors/microsoft/temporal/worker.ts b/connectors/src/connectors/microsoft/temporal/worker.ts index c0133101e53f..9e4b96ab8606 100644 --- a/connectors/src/connectors/microsoft/temporal/worker.ts +++ b/connectors/src/connectors/microsoft/temporal/worker.ts @@ -1 +1,38 @@ -export async function runMicrosoftWorker() {} +import type { Context } from "@temporalio/activity"; +import { Worker } from "@temporalio/worker"; + +import { MicrosoftCastKnownErrorsInterceptor } from "@connectors/connectors/microsoft/temporal/cast_known_errors"; +import * as sync_status from "@connectors/lib/sync_status"; +import { + getTemporalWorkerConnection, + TEMPORAL_MAXED_CACHED_WORKFLOWS, +} from "@connectors/lib/temporal"; +import { ActivityInboundLogInterceptor } from "@connectors/lib/temporal_monitoring"; +import logger from "@connectors/logger/logger"; + +import * as activities from "./activities"; +import { QUEUE_NAME } from "./config"; + +export async function runMicrosoftWorker() { + const { connection, namespace } = await getTemporalWorkerConnection(); + const workerFullSync = await Worker.create({ + workflowsPath: require.resolve("./workflows"), + activities: { ...activities, ...sync_status }, + taskQueue: QUEUE_NAME, + maxConcurrentActivityTaskExecutions: 15, + connection, + maxCachedWorkflows: TEMPORAL_MAXED_CACHED_WORKFLOWS, + reuseV8Context: true, + namespace, + interceptors: { + activityInbound: [ + (ctx: Context) => { + return new ActivityInboundLogInterceptor(ctx, logger); + }, + () => new MicrosoftCastKnownErrorsInterceptor(), + ], + }, + }); + + await workerFullSync.run(); +} diff --git a/connectors/src/connectors/microsoft/temporal/workflow.ts b/connectors/src/connectors/microsoft/temporal/workflow.ts index fd207de78d9f..13bfe2b964a0 100644 --- a/connectors/src/connectors/microsoft/temporal/workflow.ts +++ b/connectors/src/connectors/microsoft/temporal/workflow.ts @@ -1,6 +1,28 @@ import type { ModelId } from "@dust-tt/types"; +import { proxyActivities } from "@temporalio/workflow"; -export async function microsoftFullSync() {} +import type * as activities from "@connectors/connectors/microsoft/temporal/activities"; +import type * as sync_status from "@connectors/lib/sync_status"; +import type { DataSourceConfig } from "@connectors/types/data_source_config"; + +const { fullSyncActivity } = proxyActivities({ + startToCloseTimeout: "20 minutes", +}); + +const { syncSucceeded } = proxyActivities({ + startToCloseTimeout: "10 minutes", +}); + +export async function fullSyncWorkflow({ + connectorId, + dataSourceConfig, +}: { + connectorId: ModelId; + dataSourceConfig: DataSourceConfig; +}) { + await fullSyncActivity({ connectorId, dataSourceConfig }); + await syncSucceeded(connectorId); +} export function microsoftFullSyncWorkflowId(connectorId: ModelId) { return `microsoft-fullSync-${connectorId}`;