From 33573ae26c70e7876bdf91e90dab6e58c371cac5 Mon Sep 17 00:00:00 2001 From: Flavien David Date: Thu, 11 Jan 2024 15:35:08 +0100 Subject: [PATCH] Flav/monitor active notion temporal workflows (#3152) * Check Notion connectors active temporal workflows * :scissors: * Stop using environment variables everywhere * :books: * Make it run every hour. --- .../src/connectors/notion/temporal/utils.ts | 1 + .../checks/check_notion_active_workflows.ts | 84 +++++++++++++++++++ .../checks/managed_data_source_gdrive_gc.ts | 25 ++---- .../nango_connection_id_cleanup_slack.ts | 21 +---- front/production_checks/lib/config.ts | 12 +++ front/production_checks/lib/utils.ts | 15 ++++ .../production_checks/temporal/activities.ts | 6 ++ 7 files changed, 129 insertions(+), 35 deletions(-) create mode 100644 front/production_checks/checks/check_notion_active_workflows.ts create mode 100644 front/production_checks/lib/config.ts create mode 100644 front/production_checks/lib/utils.ts diff --git a/connectors/src/connectors/notion/temporal/utils.ts b/connectors/src/connectors/notion/temporal/utils.ts index ccf1b70d3067..3d54bc5731cd 100644 --- a/connectors/src/connectors/notion/temporal/utils.ts +++ b/connectors/src/connectors/notion/temporal/utils.ts @@ -2,6 +2,7 @@ import { ModelId } from "@dust-tt/types"; import { DataSourceInfo } from "@connectors/types/data_source_config"; +// Changes made here should be reflected in the production environment checks. export function getWorkflowId(dataSourceInfo: DataSourceInfo) { return `workflow-notion-${dataSourceInfo.workspaceId}-${dataSourceInfo.dataSourceName}`; } diff --git a/front/production_checks/checks/check_notion_active_workflows.ts b/front/production_checks/checks/check_notion_active_workflows.ts new file mode 100644 index 000000000000..2e63dec482cc --- /dev/null +++ b/front/production_checks/checks/check_notion_active_workflows.ts @@ -0,0 +1,84 @@ +import { Client, WorkflowHandle } from "@temporalio/client"; +import { QueryTypes } from "sequelize"; + +import { getTemporalClient } from "@app/lib/temporal"; +import { getConnectorReplicaDbConnection } from "@app/production_checks/lib/utils"; +import { CheckFunction } from "@app/production_checks/types/check"; + +interface NotionConnector { + id: number; + dataSourceName: string; + workspaceId: string; +} + +export function getWorkflowId(dataSourceInfo: { + workspaceId: string; + dataSourceName: string; +}) { + return `workflow-notion-${dataSourceInfo.workspaceId}-${dataSourceInfo.dataSourceName}`; +} + +async function listAllNotionConnectors() { + const connectorsReplica = getConnectorReplicaDbConnection(); + + const notionConnectors: NotionConnector[] = await connectorsReplica.query( + `SELECT id, "dataSourceName", "workspaceId" FROM connectors WHERE "type" = 'notion'`, + { + type: QueryTypes.SELECT, + } + ); + + return notionConnectors; +} + +async function isTemporalWorkflowRunning( + client: Client, + notionConnector: NotionConnector +) { + try { + const handle: WorkflowHandle = client.workflow.getHandle( + getWorkflowId(notionConnector) + ); + + const description = await handle.describe(); + const { status } = description; + + return status.name === "RUNNING"; + } catch (err) { + return false; + } +} + +export const checkNotionActiveWorkflows: CheckFunction = async ( + checkName, + logger, + reportSuccess, + reportFailure, + heartbeat +) => { + const notionConnectors = await listAllNotionConnectors(); + + const client = await getTemporalClient(); + + const missingActiveWorkflows: any[] = []; + for (const notionConnector of notionConnectors) { + heartbeat(); + + const isActive = isTemporalWorkflowRunning(client, notionConnector); + if (!isActive) { + missingActiveWorkflows.push({ + connectorId: notionConnector.id, + workspaceId: notionConnector.workspaceId, + }); + } + } + + if (missingActiveWorkflows.length > 0) { + reportFailure( + { missingActiveWorkflows }, + "Missing Notion temporal workflows" + ); + } else { + reportSuccess({}); + } +}; diff --git a/front/production_checks/checks/managed_data_source_gdrive_gc.ts b/front/production_checks/checks/managed_data_source_gdrive_gc.ts index ea54aab5c83b..9cd904641df4 100644 --- a/front/production_checks/checks/managed_data_source_gdrive_gc.ts +++ b/front/production_checks/checks/managed_data_source_gdrive_gc.ts @@ -1,13 +1,12 @@ -import { QueryTypes, Sequelize } from "sequelize"; +import { QueryTypes } from "sequelize"; import { getCoreDocuments } from "@app/production_checks/lib/managed_ds"; +import { + getConnectorReplicaDbConnection, + getFrontReplicaDbConnection, +} from "@app/production_checks/lib/utils"; import { CheckFunction } from "@app/production_checks/types/check"; -const { - CONNECTORS_DATABASE_READ_REPLICA_URI, - FRONT_DATABASE_READ_REPLICA_URI, -} = process.env; - export const managedDataSourceGCGdriveCheck: CheckFunction = async ( checkName, logger, @@ -15,18 +14,8 @@ export const managedDataSourceGCGdriveCheck: CheckFunction = async ( reportFailure, heartbeat ) => { - const connectorsSequelize = new Sequelize( - CONNECTORS_DATABASE_READ_REPLICA_URI as string, - { - logging: false, - } - ); - const frontSequelize = new Sequelize( - FRONT_DATABASE_READ_REPLICA_URI as string, - { - logging: false, - } - ); + const connectorsSequelize = getConnectorReplicaDbConnection(); + const frontSequelize = getFrontReplicaDbConnection(); const GdriveDataSources: { id: number; connectorId: string }[] = await frontSequelize.query( `SELECT id, "connectorId" FROM data_sources WHERE "connectorProvider" = 'google_drive'`, diff --git a/front/production_checks/checks/nango_connection_id_cleanup_slack.ts b/front/production_checks/checks/nango_connection_id_cleanup_slack.ts index f125f1cf26fc..58ab8dd99241 100644 --- a/front/production_checks/checks/nango_connection_id_cleanup_slack.ts +++ b/front/production_checks/checks/nango_connection_id_cleanup_slack.ts @@ -1,13 +1,10 @@ import { Nango } from "@nangohq/node"; -import { QueryTypes, Sequelize } from "sequelize"; +import { QueryTypes } from "sequelize"; +import { getConnectorReplicaDbConnection } from "@app/production_checks/lib/utils"; import { CheckFunction } from "@app/production_checks/types/check"; -const { - CONNECTORS_DATABASE_READ_REPLICA_URI, - NANGO_SECRET_KEY, - NANGO_SLACK_CONNECTOR_ID, -} = process.env; +const { NANGO_SECRET_KEY, NANGO_SLACK_CONNECTOR_ID } = process.env; export const nangoConnectionIdCleanupSlack: CheckFunction = async ( checkName, @@ -22,19 +19,9 @@ export const nangoConnectionIdCleanupSlack: CheckFunction = async ( if (!NANGO_SLACK_CONNECTOR_ID) { throw new Error("Env var NANGO_SLACK_CONNECTOR_ID is not defined"); } - if (!CONNECTORS_DATABASE_READ_REPLICA_URI) { - throw new Error( - "Env var CONNECTORS_DATABASE_READ_REPLICA_URI is not defined" - ); - } // Get all the Slack configurations in the database - const connectorsSequelize = new Sequelize( - CONNECTORS_DATABASE_READ_REPLICA_URI, - { - logging: false, - } - ); + const connectorsSequelize = getConnectorReplicaDbConnection(); const dbSlackConfigurationsData: { id: number; slackTeamId: string }[] = await connectorsSequelize.query( `SELECT id, "slackTeamId" FROM "slack_configurations"`, diff --git a/front/production_checks/lib/config.ts b/front/production_checks/lib/config.ts new file mode 100644 index 000000000000..95acaf1cc9a7 --- /dev/null +++ b/front/production_checks/lib/config.ts @@ -0,0 +1,12 @@ +const { + CONNECTORS_DATABASE_READ_REPLICA_URI, + FRONT_DATABASE_READ_REPLICA_URI, +} = process.env; + +const config = { + getConnectorsDatabaseReadReplicaUri: () => + CONNECTORS_DATABASE_READ_REPLICA_URI, + getFrontDatabaseReadReplicaUri: () => FRONT_DATABASE_READ_REPLICA_URI, +}; + +export default config; diff --git a/front/production_checks/lib/utils.ts b/front/production_checks/lib/utils.ts new file mode 100644 index 000000000000..913078f55875 --- /dev/null +++ b/front/production_checks/lib/utils.ts @@ -0,0 +1,15 @@ +import { Sequelize } from "sequelize"; + +import config from "@app/production_checks/lib/config"; + +export function getConnectorReplicaDbConnection() { + return new Sequelize(config.getConnectorsDatabaseReadReplicaUri() as string, { + logging: false, + }); +} + +export function getFrontReplicaDbConnection() { + return new Sequelize(config.getConnectorsDatabaseReadReplicaUri() as string, { + logging: false, + }); +} diff --git a/front/production_checks/temporal/activities.ts b/front/production_checks/temporal/activities.ts index 0f825d12061f..e24d7a7ea295 100644 --- a/front/production_checks/temporal/activities.ts +++ b/front/production_checks/temporal/activities.ts @@ -2,6 +2,7 @@ import { Context } from "@temporalio/activity"; import { v4 as uuidv4 } from "uuid"; import mainLogger from "@app/logger/logger"; +import { checkNotionActiveWorkflows } from "@app/production_checks/checks/check_notion_active_workflows"; import { managedDataSourceGCGdriveCheck } from "@app/production_checks/checks/managed_data_source_gdrive_gc"; import { nangoConnectionIdCleanupSlack } from "@app/production_checks/checks/nango_connection_id_cleanup_slack"; import { scrubDeletedCoreDocumentVersionsCheck } from "@app/production_checks/checks/scrub_deleted_core_document_versions"; @@ -24,6 +25,11 @@ export async function runAllChecksActivity() { check: scrubDeletedCoreDocumentVersionsCheck, everyHour: 8, }, + { + name: "check_notion_active_workflows", + check: checkNotionActiveWorkflows, + everyHour: 1, + }, ]; await runAllChecks(checks); }