Skip to content

Commit

Permalink
[Microsoft] Fullsync workflow skeleton (#5940)
Browse files Browse the repository at this point in the history
Description
---
This PR adds boilerplate code for the full sync workflow, which is started when
the MS connector is created. For now, its single activity just emits a log.

Risk
---
na (gated)

Deploy
---
connectors
  • Loading branch information
philipperolet authored Jun 28, 2024
1 parent f33f80d commit cab4670
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 6 deletions.
2 changes: 1 addition & 1 deletion connectors/src/connectors/microsoft/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ export async function fullResyncMicrosoftConnector(
fromTs: number | null
) {
console.log("fullResyncMicrosoftConnector", connectorId, fromTs);
return launchMicrosoftFullSyncWorkflow(connectorId);
return launchMicrosoftFullSyncWorkflow(connectorId, null);
}

function getIdFromResource(r: MicrosoftRootResource) {
Expand Down
18 changes: 18 additions & 0 deletions connectors/src/connectors/microsoft/temporal/activities.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import type { ModelId } from "@dust-tt/types";

import logger from "@connectors/logger/logger";
import type { DataSourceConfig } from "@connectors/types/data_source_config";

/**
 * Placeholder activity for the Microsoft full sync.
 *
 * No synchronization is performed yet: the activity only emits one info-level
 * log line containing the connector id and the JSON-serialized data source
 * config.
 *
 * NOTE(review): the whole config object is serialized into the log line. If
 * DataSourceConfig carries credentials they would end up in the logs — confirm.
 *
 * @param params - Object holding `connectorId` (the connector to sync) and
 *   `dataSourceConfig` (the data source configuration, logged as JSON).
 */
export async function fullSyncActivity(params: {
  connectorId: ModelId;
  dataSourceConfig: DataSourceConfig;
}): Promise<void> {
  const { connectorId, dataSourceConfig } = params;
  const serializedConfig = JSON.stringify(dataSourceConfig);

  logger.info(
    `To implement: full sync for connector ${connectorId} with config ${serializedConfig}`
  );
}
25 changes: 25 additions & 0 deletions connectors/src/connectors/microsoft/temporal/cast_known_errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import type {
ActivityExecuteInput,
ActivityInboundCallsInterceptor,
Next,
} from "@temporalio/worker";

/**
 * Activity inbound interceptor intended to cast known Microsoft errors.
 *
 * For now it is a pass-through: the result of the wrapped call is returned
 * unchanged, and anything it throws is rethrown unchanged.
 */
export class MicrosoftCastKnownErrorsInterceptor
  implements ActivityInboundCallsInterceptor
{
  /**
   * Runs the next handler in the chain with the given input.
   *
   * @param input - Activity execution input, forwarded as-is to `next`.
   * @param next - Next handler in the interceptor chain.
   * @returns Whatever `next(input)` resolves to.
   * @throws Rethrows, unchanged, whatever `next(input)` throws.
   */
  async execute(
    input: ActivityExecuteInput,
    next: Next<ActivityInboundCallsInterceptor, "execute">
  ): Promise<unknown> {
    let outcome: unknown;
    try {
      outcome = await next(input);
    } catch (err: unknown) {
      // No error is intercepted and cast as of yet, but it will come very
      // soon. The instanceof branch below only exists to please the linter.
      if (!(err instanceof Error)) {
        throw err;
      }
      throw err;
    }
    return outcome;
  }
}
57 changes: 54 additions & 3 deletions connectors/src/connectors/microsoft/temporal/client.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,66 @@
import type { ModelId, Result } from "@dust-tt/types";
import { Err } from "@dust-tt/types";
import { Err, Ok } from "@dust-tt/types";

import { QUEUE_NAME } from "@connectors/connectors/microsoft/temporal/config";
import {
fullSyncWorkflow,
microsoftFullSyncWorkflowId,
} from "@connectors/connectors/microsoft/temporal/workflow";
import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config";
import { getTemporalClient, terminateWorkflow } from "@connectors/lib/temporal";
import logger from "@connectors/logger/logger";
import { ConnectorResource } from "@connectors/resources/connector_resource";

/**
 * Launches the full sync Temporal workflow for a Microsoft connector.
 *
 * Any workflow already registered under the same workflow id is terminated
 * via `terminateWorkflow` before the new one is started.
 *
 * @param connectorId - Id of the connector to sync.
 * @param fromTs - Must be null (or otherwise falsy): partial resync is not
 *   supported, and a truthy value yields an `Err`.
 * @returns `Ok(workflowId)` once the workflow has been started, or an `Err`
 *   when the connector is not found, a partial resync is requested, or
 *   terminating/starting the workflow fails.
 */
export async function launchMicrosoftFullSyncWorkflow(
  connectorId: ModelId,
  fromTs: number | null
): Promise<Result<string, Error>> {
  const connector = await ConnectorResource.fetchById(connectorId);
  if (!connector) {
    return new Err(new Error(`Connector ${connectorId} not found`));
  }
  if (fromTs) {
    return new Err(
      new Error("Microsoft connector does not support partial resync")
    );
  }

  const client = await getTemporalClient();

  const dataSourceConfig = dataSourceConfigFromConnector(connector);

  const workflowId = microsoftFullSyncWorkflowId(connectorId);

  try {
    await terminateWorkflow(workflowId);
    await client.workflow.start(fullSyncWorkflow, {
      args: [{ connectorId, dataSourceConfig }],
      taskQueue: QUEUE_NAME,
      workflowId: workflowId,
      searchAttributes: {
        connectorId: [connectorId],
      },
      memo: {
        connectorId: connectorId,
      },
    });
    logger.info(
      {
        workspaceId: dataSourceConfig.workspaceId,
        workflowId,
      },
      `Started workflow.`
    );
    return new Ok(workflowId);
  } catch (e) {
    logger.error(
      {
        workspaceId: dataSourceConfig.workspaceId,
        workflowId,
        error: e,
      },
      `Failed starting workflow.`
    );
    // Narrow instead of asserting, so callers always receive a real Error
    // even if something that is not an Error was thrown.
    return new Err(e instanceof Error ? e : new Error(String(e)));
  }
}
39 changes: 38 additions & 1 deletion connectors/src/connectors/microsoft/temporal/worker.ts
Original file line number Diff line number Diff line change
@@ -1 +1,38 @@
/** Empty placeholder: the body is empty, so calling it does nothing. */
export async function runMicrosoftWorker() {}
import type { Context } from "@temporalio/activity";
import { Worker } from "@temporalio/worker";

import { MicrosoftCastKnownErrorsInterceptor } from "@connectors/connectors/microsoft/temporal/cast_known_errors";
import * as sync_status from "@connectors/lib/sync_status";
import {
getTemporalWorkerConnection,
TEMPORAL_MAXED_CACHED_WORKFLOWS,
} from "@connectors/lib/temporal";
import { ActivityInboundLogInterceptor } from "@connectors/lib/temporal_monitoring";
import logger from "@connectors/logger/logger";

import * as activities from "./activities";
import { QUEUE_NAME } from "./config";

/**
 * Creates and runs the Temporal worker for the Microsoft connector.
 *
 * The worker polls `QUEUE_NAME`, registers the Microsoft activities together
 * with the shared `sync_status` activities, and installs two activity inbound
 * interceptors: the logging interceptor and the Microsoft known-errors
 * interceptor.
 *
 * @returns A promise that settles when `workerFullSync.run()` settles.
 */
export async function runMicrosoftWorker() {
  const { connection, namespace } = await getTemporalWorkerConnection();
  const workerFullSync = await Worker.create({
    // The workflow definitions live in the sibling `workflow.ts` module, so
    // the path has to be "./workflow" ("./workflows" does not resolve).
    workflowsPath: require.resolve("./workflow"),
    activities: { ...activities, ...sync_status },
    taskQueue: QUEUE_NAME,
    maxConcurrentActivityTaskExecutions: 15,
    connection,
    maxCachedWorkflows: TEMPORAL_MAXED_CACHED_WORKFLOWS,
    reuseV8Context: true,
    namespace,
    interceptors: {
      activityInbound: [
        (ctx: Context) => {
          return new ActivityInboundLogInterceptor(ctx, logger);
        },
        () => new MicrosoftCastKnownErrorsInterceptor(),
      ],
    },
  });

  await workerFullSync.run();
}
24 changes: 23 additions & 1 deletion connectors/src/connectors/microsoft/temporal/workflow.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,28 @@
import type { ModelId } from "@dust-tt/types";
import { proxyActivities } from "@temporalio/workflow";

/** Empty placeholder: the body is empty, so calling it does nothing. */
export async function microsoftFullSync() {}
import type * as activities from "@connectors/connectors/microsoft/temporal/activities";
import type * as sync_status from "@connectors/lib/sync_status";
import type { DataSourceConfig } from "@connectors/types/data_source_config";

// Workflow-side proxy for the Microsoft activities module; only
// fullSyncActivity is pulled out here. Calls made through this proxy use a
// start-to-close timeout of 20 minutes.
const { fullSyncActivity } = proxyActivities<typeof activities>({
startToCloseTimeout: "20 minutes",
});

// Workflow-side proxy for the shared sync_status activities; only
// syncSucceeded is pulled out here. Calls made through this proxy use a
// start-to-close timeout of 10 minutes.
const { syncSucceeded } = proxyActivities<typeof sync_status>({
startToCloseTimeout: "10 minutes",
});

/**
 * Full sync workflow for a Microsoft connector.
 *
 * Runs the `fullSyncActivity` activity for the connector and, once it has
 * completed, calls the `syncSucceeded` activity with the connector id
 * (presumably recording the successful sync — see `sync_status`).
 *
 * @param params - Object holding `connectorId` (the connector being synced)
 *   and `dataSourceConfig` (forwarded to the full sync activity).
 */
export async function fullSyncWorkflow(params: {
  connectorId: ModelId;
  dataSourceConfig: DataSourceConfig;
}): Promise<void> {
  const { connectorId, dataSourceConfig } = params;

  await fullSyncActivity({ connectorId, dataSourceConfig });
  await syncSucceeded(connectorId);
}

export function microsoftFullSyncWorkflowId(connectorId: ModelId) {
return `microsoft-fullSync-${connectorId}`;
Expand Down

0 comments on commit cab4670

Please sign in to comment.