Skip to content

Commit

Permalink
Flav/monitor active notion temporal workflows (#3152)
Browse files Browse the repository at this point in the history
* Check Notion connectors active temporal workflows

* ✂️

* Stop using environment variables everywhere

* 📚

* Make it run every hour.
  • Loading branch information
flvndvd authored Jan 11, 2024
1 parent 5eaa8dc commit 33573ae
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 35 deletions.
1 change: 1 addition & 0 deletions connectors/src/connectors/notion/temporal/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { ModelId } from "@dust-tt/types";

import { DataSourceInfo } from "@connectors/types/data_source_config";

// Changes made here should be reflected in the production environment checks.
export function getWorkflowId(dataSourceInfo: DataSourceInfo) {
return `workflow-notion-${dataSourceInfo.workspaceId}-${dataSourceInfo.dataSourceName}`;
}
Expand Down
84 changes: 84 additions & 0 deletions front/production_checks/checks/check_notion_active_workflows.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import { Client, WorkflowHandle } from "@temporalio/client";
import { QueryTypes } from "sequelize";

import { getTemporalClient } from "@app/lib/temporal";
import { getConnectorReplicaDbConnection } from "@app/production_checks/lib/utils";
import { CheckFunction } from "@app/production_checks/types/check";

interface NotionConnector {
id: number;
dataSourceName: string;
workspaceId: string;
}

export function getWorkflowId(dataSourceInfo: {
workspaceId: string;
dataSourceName: string;
}) {
return `workflow-notion-${dataSourceInfo.workspaceId}-${dataSourceInfo.dataSourceName}`;
}

async function listAllNotionConnectors() {
const connectorsReplica = getConnectorReplicaDbConnection();

const notionConnectors: NotionConnector[] = await connectorsReplica.query(
`SELECT id, "dataSourceName", "workspaceId" FROM connectors WHERE "type" = 'notion'`,
{
type: QueryTypes.SELECT,
}
);

return notionConnectors;
}

async function isTemporalWorkflowRunning(
client: Client,
notionConnector: NotionConnector
) {
try {
const handle: WorkflowHandle = client.workflow.getHandle(
getWorkflowId(notionConnector)
);

const description = await handle.describe();
const { status } = description;

return status.name === "RUNNING";
} catch (err) {
return false;
}
}

export const checkNotionActiveWorkflows: CheckFunction = async (
checkName,
logger,
reportSuccess,
reportFailure,
heartbeat
) => {
const notionConnectors = await listAllNotionConnectors();

const client = await getTemporalClient();

const missingActiveWorkflows: any[] = [];
for (const notionConnector of notionConnectors) {
heartbeat();

const isActive = isTemporalWorkflowRunning(client, notionConnector);
if (!isActive) {
missingActiveWorkflows.push({
connectorId: notionConnector.id,
workspaceId: notionConnector.workspaceId,
});
}
}

if (missingActiveWorkflows.length > 0) {
reportFailure(
{ missingActiveWorkflows },
"Missing Notion temporal workflows"
);
} else {
reportSuccess({});
}
};
25 changes: 7 additions & 18 deletions front/production_checks/checks/managed_data_source_gdrive_gc.ts
Original file line number Diff line number Diff line change
@@ -1,32 +1,21 @@
import { QueryTypes, Sequelize } from "sequelize";
import { QueryTypes } from "sequelize";

import { getCoreDocuments } from "@app/production_checks/lib/managed_ds";
import {
getConnectorReplicaDbConnection,
getFrontReplicaDbConnection,
} from "@app/production_checks/lib/utils";
import { CheckFunction } from "@app/production_checks/types/check";

const {
CONNECTORS_DATABASE_READ_REPLICA_URI,
FRONT_DATABASE_READ_REPLICA_URI,
} = process.env;

export const managedDataSourceGCGdriveCheck: CheckFunction = async (
checkName,
logger,
reportSuccess,
reportFailure,
heartbeat
) => {
const connectorsSequelize = new Sequelize(
CONNECTORS_DATABASE_READ_REPLICA_URI as string,
{
logging: false,
}
);
const frontSequelize = new Sequelize(
FRONT_DATABASE_READ_REPLICA_URI as string,
{
logging: false,
}
);
const connectorsSequelize = getConnectorReplicaDbConnection();
const frontSequelize = getFrontReplicaDbConnection();
const GdriveDataSources: { id: number; connectorId: string }[] =
await frontSequelize.query(
`SELECT id, "connectorId" FROM data_sources WHERE "connectorProvider" = 'google_drive'`,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
import { Nango } from "@nangohq/node";
import { QueryTypes, Sequelize } from "sequelize";
import { QueryTypes } from "sequelize";

import { getConnectorReplicaDbConnection } from "@app/production_checks/lib/utils";
import { CheckFunction } from "@app/production_checks/types/check";

const {
CONNECTORS_DATABASE_READ_REPLICA_URI,
NANGO_SECRET_KEY,
NANGO_SLACK_CONNECTOR_ID,
} = process.env;
const { NANGO_SECRET_KEY, NANGO_SLACK_CONNECTOR_ID } = process.env;

export const nangoConnectionIdCleanupSlack: CheckFunction = async (
checkName,
Expand All @@ -22,19 +19,9 @@ export const nangoConnectionIdCleanupSlack: CheckFunction = async (
if (!NANGO_SLACK_CONNECTOR_ID) {
throw new Error("Env var NANGO_SLACK_CONNECTOR_ID is not defined");
}
if (!CONNECTORS_DATABASE_READ_REPLICA_URI) {
throw new Error(
"Env var CONNECTORS_DATABASE_READ_REPLICA_URI is not defined"
);
}

// Get all the Slack configurations in the database
const connectorsSequelize = new Sequelize(
CONNECTORS_DATABASE_READ_REPLICA_URI,
{
logging: false,
}
);
const connectorsSequelize = getConnectorReplicaDbConnection();
const dbSlackConfigurationsData: { id: number; slackTeamId: string }[] =
await connectorsSequelize.query(
`SELECT id, "slackTeamId" FROM "slack_configurations"`,
Expand Down
12 changes: 12 additions & 0 deletions front/production_checks/lib/config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
const {
CONNECTORS_DATABASE_READ_REPLICA_URI,
FRONT_DATABASE_READ_REPLICA_URI,
} = process.env;

const config = {
getConnectorsDatabaseReadReplicaUri: () =>
CONNECTORS_DATABASE_READ_REPLICA_URI,
getFrontDatabaseReadReplicaUri: () => FRONT_DATABASE_READ_REPLICA_URI,
};

export default config;
15 changes: 15 additions & 0 deletions front/production_checks/lib/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { Sequelize } from "sequelize";

import config from "@app/production_checks/lib/config";

export function getConnectorReplicaDbConnection() {
return new Sequelize(config.getConnectorsDatabaseReadReplicaUri() as string, {
logging: false,
});
}

export function getFrontReplicaDbConnection() {
return new Sequelize(config.getConnectorsDatabaseReadReplicaUri() as string, {
logging: false,
});
}
6 changes: 6 additions & 0 deletions front/production_checks/temporal/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Context } from "@temporalio/activity";
import { v4 as uuidv4 } from "uuid";

import mainLogger from "@app/logger/logger";
import { checkNotionActiveWorkflows } from "@app/production_checks/checks/check_notion_active_workflows";
import { managedDataSourceGCGdriveCheck } from "@app/production_checks/checks/managed_data_source_gdrive_gc";
import { nangoConnectionIdCleanupSlack } from "@app/production_checks/checks/nango_connection_id_cleanup_slack";
import { scrubDeletedCoreDocumentVersionsCheck } from "@app/production_checks/checks/scrub_deleted_core_document_versions";
Expand All @@ -24,6 +25,11 @@ export async function runAllChecksActivity() {
check: scrubDeletedCoreDocumentVersionsCheck,
everyHour: 8,
},
{
name: "check_notion_active_workflows",
check: checkNotionActiveWorkflows,
everyHour: 1,
},
];
await runAllChecks(checks);
}
Expand Down

0 comments on commit 33573ae

Please sign in to comment.