Skip to content

Commit

Permalink
Connectors stop workflow when a OAuth token is revoked (#2109)
Browse files Browse the repository at this point in the history
* Stop a connectors workflow when an OAuth token is revoked

Fix

Add support for error reporting.

* Display connectors error in the managed Data Source UI

* npm run format

* UI part of it

* Clean up

* Addressed comments from @Henry and @spolu

* Adjust error message
  • Loading branch information
lasryaric authored Oct 24, 2023
1 parent 9063566 commit 0bc82c5
Show file tree
Hide file tree
Showing 16 changed files with 243 additions and 58 deletions.
57 changes: 29 additions & 28 deletions connectors/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions connectors/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"@types/lodash.memoize": "^4.1.7",
"@types/minimist": "^1.2.2",
"@types/uuid": "^9.0.2",
"axios": "^1.5.1",
"body-parser": "^1.20.2",
"dd-trace": "^3.16.0",
"eventsource-parser": "^1.0.0",
Expand Down
1 change: 1 addition & 0 deletions connectors/src/api/get_connector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ const _getConnector = async (
firstSuccessfulSyncTime: connector.firstSuccessfulSyncTime?.getTime(),
firstSyncProgress,
defaultNewResourcePermission: connector.defaultNewResourcePermission,
errorType: connector.errorType || undefined,
});
};

Expand Down
14 changes: 14 additions & 0 deletions connectors/src/connectors/google_drive/temporal/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import { literal, Op } from "sequelize";

import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config";
import { dpdf2text } from "@connectors/lib/dpdf2text";
import { ExternalOauthTokenError } from "@connectors/lib/error";
import {
Connector,
GoogleDriveFiles,
Expand Down Expand Up @@ -861,6 +862,19 @@ export async function renewOneWebhook(webhookId: ModelId) {
});
}
} catch (e) {
if (e instanceof ExternalOauthTokenError) {
logger.info(
{
error: e,
connectorId: wh.connectorId,
workspaceId: connector.workspaceId,
id: wh.id,
},
`Deleting webhook because the oauth token was revoked.`
);
await wh.destroy();
return;
}
logger.error({ error: e }, `Failed to renew webhook`);
const tags = [
`connector_id:${wh.connectorId}`,
Expand Down
4 changes: 4 additions & 0 deletions connectors/src/connectors/google_drive/temporal/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ export async function launchGoogleDriveFullSyncWorkflow(
args: [connectorIdModelId, nangoConnectionId, dataSourceConfig],
taskQueue: "google-queue",
workflowId: workflowId,

memo: {
connectorId: connectorId,
},
});
logger.info(
{
Expand Down
2 changes: 1 addition & 1 deletion connectors/src/lib/data_sources.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ async function _upsertToDatasource({
);
} catch (e) {
const elapsed = new Date().getTime() - now.getTime();
if (axios.isAxiosError(e) && e.config.data) {
if (axios.isAxiosError(e) && e.config?.data) {
e.config.data = "[REDACTED]";
}
statsDClient.increment("data_source_upserts_error.count", 1, statsDTags);
Expand Down
3 changes: 3 additions & 0 deletions connectors/src/lib/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,6 @@ export class HTTPError extends Error {
this.statusCode = statusCode;
}
}

// This error is thrown when we are dealing with a revoked OAuth token.
export class ExternalOauthTokenError extends Error {}
6 changes: 6 additions & 0 deletions connectors/src/lib/models.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
PageObjectProperties,
} from "@connectors/connectors/notion/lib/types";
import {
ConnectorErrorType,
type ConnectorProvider,
ConnectorSyncStatus,
} from "@connectors/types/connector";
Expand Down Expand Up @@ -44,6 +45,7 @@ export class Connector extends Model<
declare dataSourceName: string;

declare lastSyncStatus?: ConnectorSyncStatus;
declare errorType: ConnectorErrorType | null;
declare lastSyncStartTime?: Date;
declare lastSyncFinishTime?: Date;
declare lastSyncSuccessfulTime?: Date;
Expand Down Expand Up @@ -95,6 +97,10 @@ Connector.init(
type: DataTypes.STRING,
allowNull: true,
},
errorType: {
type: DataTypes.STRING,
allowNull: true,
},
lastSyncStartTime: {
type: DataTypes.DATE,
allowNull: true,
Expand Down
43 changes: 41 additions & 2 deletions connectors/src/lib/nango_client.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,53 @@
import { Nango } from "@nangohq/node";
import axios from "axios";

import { ExternalOauthTokenError } from "@connectors/lib/error";

import { Err, Ok, Result } from "./result";

const { NANGO_SECRET_KEY } = process.env;

export function nango_client(): Nango {
class CustomNango extends Nango {
async getConnection(
providerConfigKey: string,
connectionId: string,
forceRefresh?: boolean,
refreshToken?: boolean
) {
try {
return await super.getConnection(
providerConfigKey,
connectionId,
true,
refreshToken
);
} catch (e) {
if (axios.isAxiosError(e)) {
if (e.response?.status === 400) {
if (typeof e?.response?.data?.error === "string") {
const errorText = e.response.data.error;
if (
errorText.includes(
"The external API returned an error when trying to refresh the access token"
) &&
errorText.includes("invalid_grant")
) {
throw new ExternalOauthTokenError();
}
}
}
}

throw e;
}
}
}

export function nango_client() {
if (!NANGO_SECRET_KEY) {
throw new Error("Env var NANGO_SECRET_KEY is not defined");
}
const nango = new Nango({ secretKey: NANGO_SECRET_KEY });
const nango = new CustomNango({ secretKey: NANGO_SECRET_KEY });

return nango;
}
Expand Down
43 changes: 34 additions & 9 deletions connectors/src/lib/sync_status.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,28 @@
import { Connector, ModelId } from "@connectors/lib/models";
import { Err, Ok, Result } from "@connectors/lib/result";
import { ConnectorSyncStatus } from "@connectors/types/connector";
import {
ConnectorErrorType,
ConnectorSyncStatus,
} from "@connectors/types/connector";

async function syncFinished(
connectorId: ModelId,
status: ConnectorSyncStatus,
finishedAt: Date
): Promise<Result<void, Error>> {
async function syncFinished({
connectorId,
status,
finishedAt,
errorType,
}: {
connectorId: ModelId;
status: ConnectorSyncStatus;
finishedAt: Date;
errorType: ConnectorErrorType | null;
}): Promise<Result<void, Error>> {
const connector = await Connector.findByPk(connectorId);
if (!connector) {
return new Err(new Error("Connector not found"));
}
connector.lastSyncStatus = status;
connector.lastSyncFinishTime = finishedAt;
connector.errorType = errorType;
if (status === "succeeded") {
if (!connector.firstSuccessfulSyncTime) {
connector.firstSuccessfulSyncTime = finishedAt;
Expand Down Expand Up @@ -48,18 +58,33 @@ export async function syncSucceeded(connectorId: ModelId, at?: Date) {
at = new Date();
}

return syncFinished(connectorId, "succeeded", at);
return syncFinished({
connectorId: connectorId,
status: "succeeded",
finishedAt: at,
errorType: null,
});
}

/**
* Signal that a sync has failed.
* This function can be used by the sync worker itself or by the supervisor.
*/
export async function syncFailed(connectorId: ModelId, at?: Date) {
export async function syncFailed(
connectorId: ModelId,
errorMessage: string,
errorType: ConnectorErrorType,
at?: Date
) {
if (!at) {
at = new Date();
}
return syncFinished(connectorId, "failed", new Date());
return syncFinished({
connectorId,
status: "failed",
finishedAt: new Date(),
errorType,
});
}

/**
Expand Down
51 changes: 50 additions & 1 deletion connectors/src/lib/temporal.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
import { Client, Connection, ConnectionOptions } from "@temporalio/client";
import {
Client,
Connection,
ConnectionOptions,
WorkflowNotFoundError,
} from "@temporalio/client";
import { NativeConnection } from "@temporalio/worker";
import fs from "fs-extra";

import { ModelId } from "./models";

// This is a singleton connection to the Temporal server.
let TEMPORAL_CLIENT: Client | undefined;
const WORKFLOW_ID2CONNECTOR_ID_CACHE: Record<string, ModelId> = {};

export async function getTemporalClient(): Promise<Client> {
if (TEMPORAL_CLIENT) {
Expand Down Expand Up @@ -65,3 +73,44 @@ export async function getTemporalWorkerConnection(): Promise<{
const connection = await NativeConnection.connect(connectionOptions);
return { connection, namespace: process.env.TEMPORAL_NAMESPACE };
}

export async function getConnectorId(
workflowRunId: string
): Promise<ModelId | null> {
if (!WORKFLOW_ID2CONNECTOR_ID_CACHE[workflowRunId]) {
const client = await getTemporalClient();
const workflowHandle = await client.workflow.getHandle(workflowRunId);
const described = await workflowHandle.describe();
if (described.memo && described.memo.connectorId) {
if (typeof described.memo.connectorId === "number") {
WORKFLOW_ID2CONNECTOR_ID_CACHE[workflowRunId] =
described.memo.connectorId;
} else if (typeof described.memo.connectorId === "string") {
WORKFLOW_ID2CONNECTOR_ID_CACHE[workflowRunId] = parseInt(
described.memo.connectorId,
10
);
}
}
if (!WORKFLOW_ID2CONNECTOR_ID_CACHE[workflowRunId]) {
return null;
}
}
return WORKFLOW_ID2CONNECTOR_ID_CACHE[workflowRunId] || null;
}

export async function cancelWorkflow(workflowId: string) {
const client = await getTemporalClient();
try {
const workflowHandle = await client.workflow.getHandle(workflowId);
await workflowHandle.cancel();

return true;
} catch (e) {
if (!(e instanceof WorkflowNotFoundError)) {
throw e;
}
}

return false;
}
Loading

0 comments on commit 0bc82c5

Please sign in to comment.