Skip to content

Commit

Permalink
Use ModelId for connectorId (#3609)
Browse files Browse the repository at this point in the history
* Use ModelId for stop connectors

* Fix delete connector

* Fix resume connector

* 👕

* ✨

* 👕
  • Loading branch information
flvndvd authored Feb 7, 2024
1 parent 5ab96ae commit ff9cb34
Show file tree
Hide file tree
Showing 21 changed files with 96 additions and 120 deletions.
28 changes: 10 additions & 18 deletions connectors/src/admin/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,22 +102,18 @@ const connectors = async (command: string, args: parseArgs.ParsedArgs) => {

switch (command) {
case "stop": {
await throwOnError(
STOP_CONNECTOR_BY_TYPE[provider](connector.id.toString())
);
await throwOnError(STOP_CONNECTOR_BY_TYPE[provider](connector.id));
return;
}
case "delete": {
await throwOnError(
DELETE_CONNECTOR_BY_TYPE[provider](connector.id.toString(), true)
DELETE_CONNECTOR_BY_TYPE[provider](connector.id, true)
);
await terminateAllWorkflowsForConnectorId(connector.id);
return;
}
case "resume": {
await throwOnError(
RESUME_CONNECTOR_BY_TYPE[provider](connector.id.toString())
);
await throwOnError(RESUME_CONNECTOR_BY_TYPE[provider](connector.id));
return;
}
case "full-resync": {
Expand All @@ -144,12 +140,8 @@ const connectors = async (command: string, args: parseArgs.ParsedArgs) => {
}

case "restart": {
await throwOnError(
STOP_CONNECTOR_BY_TYPE[provider](connector.id.toString())
);
await throwOnError(
RESUME_CONNECTOR_BY_TYPE[provider](connector.id.toString())
);
await throwOnError(STOP_CONNECTOR_BY_TYPE[provider](connector.id));
await throwOnError(RESUME_CONNECTOR_BY_TYPE[provider](connector.id));
return;
}
default:
Expand Down Expand Up @@ -201,7 +193,7 @@ const github = async (command: string, args: parseArgs.ParsedArgs) => {
const repoId = data.id;

await launchGithubCodeSyncWorkflow(
connector.id.toString(),
connector.id,
args.owner,
args.repo,
repoId
Expand Down Expand Up @@ -283,10 +275,10 @@ const notion = async (command: string, args: parseArgs.ParsedArgs) => {
promises.push(
queue.add(async () => {
await throwOnError(
STOP_CONNECTOR_BY_TYPE[connector.type](connector.id.toString())
STOP_CONNECTOR_BY_TYPE[connector.type](connector.id)
);
await throwOnError(
RESUME_CONNECTOR_BY_TYPE[connector.type](connector.id.toString())
RESUME_CONNECTOR_BY_TYPE[connector.type](connector.id)
);
})
);
Expand Down Expand Up @@ -601,7 +593,7 @@ const notion = async (command: string, args: parseArgs.ParsedArgs) => {
{ connectorId: connector.id },
"Stopping notion garbage collector"
);
await stopNotionGarbageCollectorWorkflow(connector.id.toString());
await stopNotionGarbageCollectorWorkflow(connector.id);
}
return;
}
Expand Down Expand Up @@ -689,7 +681,7 @@ const google_drive = async (command: string, args: parseArgs.ParsedArgs) => {
);
}
await throwOnError(
launchGoogleDriveIncrementalSyncWorkflow(connector.id.toString())
launchGoogleDriveIncrementalSyncWorkflow(connector.id)
);
return;
}
Expand Down
4 changes: 2 additions & 2 deletions connectors/src/api/delete_connector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const _deleteConnectorAPIHandler = async (

const connectorStopper = STOP_CONNECTOR_BY_TYPE[connector.type];

const stopRes = await connectorStopper(connector.id.toString());
const stopRes = await connectorStopper(connector.id);

if (stopRes.isErr()) {
return apiError(req, res, {
Expand All @@ -61,7 +61,7 @@ const _deleteConnectorAPIHandler = async (
}

const connectorDeleter = DELETE_CONNECTOR_BY_TYPE[connector.type];
const cleanRes = await connectorDeleter(connector.id.toString(), force);
const cleanRes = await connectorDeleter(connector.id, force);
if (cleanRes.isErr()) {
return apiError(req, res, {
status_code: 500,
Expand Down
6 changes: 2 additions & 4 deletions connectors/src/api/resume_connector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const _resumeConnectorAPIHandler = async (
}
const connectorResumer = RESUME_CONNECTOR_BY_TYPE[connector.type];

const resumeRes = await connectorResumer(connector.id.toString());
const resumeRes = await connectorResumer(connector.id);

if (resumeRes.isErr()) {
return apiError(req, res, {
Expand All @@ -40,9 +40,7 @@ const _resumeConnectorAPIHandler = async (
});
}

return res.status(200).json({
connectorId: resumeRes.value,
});
return res.sendStatus(204);
} catch (e) {
logger.error(errorFromAny(e), "Failed to resume the connector");
return apiError(req, res, {
Expand Down
6 changes: 2 additions & 4 deletions connectors/src/api/stop_connector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const _stopConnectorAPIHandler = async (
}
const connectorStopper = STOP_CONNECTOR_BY_TYPE[connector.type];

const stopRes = await connectorStopper(connector.id.toString());
const stopRes = await connectorStopper(connector.id);

if (stopRes.isErr()) {
return apiError(req, res, {
Expand All @@ -40,9 +40,7 @@ const _stopConnectorAPIHandler = async (
});
}

return res.status(200).json({
connectorId: stopRes.value,
});
return res.sendStatus(204);
} catch (e) {
logger.error(errorFromAny(e), "Failed to stop the connector");
return apiError(req, res, {
Expand Down
7 changes: 1 addition & 6 deletions connectors/src/api/webhooks/webhook_github.ts
Original file line number Diff line number Diff line change
Expand Up @@ -462,12 +462,7 @@ async function syncCode(
"githubCodeSync: Starting workflow"
);
// We signal the workflow to start the sync of the repo.
await launchGithubCodeSyncWorkflow(
c.id.toString(),
orgLogin,
repoName,
repoId
);
await launchGithubCodeSyncWorkflow(c.id, orgLogin, repoName, repoId);

// And finally update the lastSeenAt. Multiple PR merge can race through that logic but
// since we debounce the code sync workflow 10s this will result in only one actual workflow
Expand Down
2 changes: 1 addition & 1 deletion connectors/src/api/webhooks/webhook_google_drive.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const _webhookGoogleDriveAPIHandler = async (
}

const workflowRes = await launchGoogleDriveIncrementalSyncWorkflow(
webhook.connectorId.toString()
webhook.connectorId
);

if (workflowRes.isErr()) {
Expand Down
20 changes: 9 additions & 11 deletions connectors/src/connectors/confluence/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -188,21 +188,19 @@ export async function updateConfluenceConnector(
}

export async function stopConfluenceConnector(
connectorId: string
): Promise<Result<string, Error>> {
// TODO(2024-01-23 flav) Change the prototype to take a ModelId.
const connectorIdAsNumber = parseInt(connectorId, 10);
const res = await stopConfluenceSyncWorkflow(connectorIdAsNumber);
connectorId: ModelId
): Promise<Result<undefined, Error>> {
const res = await stopConfluenceSyncWorkflow(connectorId);
if (res.isErr()) {
return res;
}

return new Ok(connectorId.toString());
return new Ok(undefined);
}

export async function resumeConfluenceConnector(
connectorId: string
): Promise<Result<string, Error>> {
connectorId: ModelId
): Promise<Result<undefined, Error>> {
try {
const connector = await Connector.findOne({
where: {
Expand All @@ -227,15 +225,15 @@ export async function resumeConfluenceConnector(

await launchConfluenceSyncWorkflow(connector.id, null);

return new Ok(connector.id.toString());
return new Ok(undefined);
} catch (err) {
return new Err(err as Error);
}
}

export async function cleanupConfluenceConnector(
connectorId: string
): Promise<Result<void, Error>> {
connectorId: ModelId
): Promise<Result<undefined, Error>> {
const connector = await Connector.findOne({
where: { type: "confluence", id: connectorId },
});
Expand Down
16 changes: 8 additions & 8 deletions connectors/src/connectors/github/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ export async function updateGithubConnector(
}

export async function stopGithubConnector(
connectorId: string
): Promise<Result<string, Error>> {
connectorId: ModelId
): Promise<Result<undefined, Error>> {
try {
const connector = await Connector.findOne({
where: {
Expand Down Expand Up @@ -147,15 +147,15 @@ export async function stopGithubConnector(
webhooksEnabledAt: null,
});

return new Ok(connector.id.toString());
return new Ok(undefined);
} catch (err) {
return new Err(err as Error);
}
}

export async function resumeGithubConnector(
connectorId: string
): Promise<Result<string, Error>> {
connectorId: ModelId
): Promise<Result<undefined, Error>> {
try {
const connector = await Connector.findOne({
where: {
Expand Down Expand Up @@ -190,7 +190,7 @@ export async function resumeGithubConnector(
syncCodeOnly: false,
});

return new Ok(connector.id.toString());
return new Ok(undefined);
} catch (err) {
return new Err(err as Error);
}
Expand Down Expand Up @@ -218,8 +218,8 @@ export async function fullResyncGithubConnector(
}

export async function cleanupGithubConnector(
connectorId: string
): Promise<Result<void, Error>> {
connectorId: ModelId
): Promise<Result<undefined, Error>> {
return sequelize_conn.transaction(async (transaction) => {
try {
const connector = await Connector.findOne({
Expand Down
4 changes: 2 additions & 2 deletions connectors/src/connectors/github/temporal/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ export async function launchGithubReposSyncWorkflow(
}

export async function launchGithubCodeSyncWorkflow(
connectorId: string,
connectorId: ModelId,
repoLogin: string,
repoName: string,
repoId: number
Expand All @@ -158,7 +158,7 @@ export async function launchGithubCodeSyncWorkflow(
taskQueue: QUEUE_NAME,
workflowId: getCodeSyncWorkflowId(dataSourceConfig, repoId),
searchAttributes: {
connectorId: [parseInt(connectorId)],
connectorId: [connectorId],
},
signal: newWebhookSignal,
signalArgs: undefined,
Expand Down
4 changes: 2 additions & 2 deletions connectors/src/connectors/google_drive/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -240,9 +240,9 @@ export async function updateGoogleDriveConnector(
}

export async function cleanupGoogleDriveConnector(
connectorId: string,
connectorId: ModelId,
force = false
): Promise<Result<void, Error>> {
): Promise<Result<undefined, Error>> {
return sequelize_conn.transaction(async (transaction) => {
const connector = await Connector.findByPk(connectorId, {
transaction: transaction,
Expand Down
8 changes: 3 additions & 5 deletions connectors/src/connectors/google_drive/temporal/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ export async function launchGoogleDriveFullSyncWorkflow(
}

export async function launchGoogleDriveIncrementalSyncWorkflow(
connectorId: string
connectorId: ModelId
): Promise<Result<string, Error>> {
const connector = await Connector.findByPk(connectorId);
if (!connector) {
Expand All @@ -104,18 +104,16 @@ export async function launchGoogleDriveIncrementalSyncWorkflow(
return new Err(new RateLimitError("Rate limit exceeded"));
}
const client = await getTemporalClient();
const connectorIdModelId = parseInt(connectorId, 10) as ModelId;

const dataSourceConfig = dataSourceConfigFromConnector(connector);

const workflowId = googleDriveIncrementalSyncWorkflowId(connectorId);
try {
await client.workflow.signalWithStart(googleDriveIncrementalSync, {
args: [connectorIdModelId, dataSourceConfig],
args: [connectorId, dataSourceConfig],
taskQueue: QUEUE_NAME,
workflowId: workflowId,
searchAttributes: {
connectorId: [parseInt(connectorId)],
connectorId: [connectorId],
},
signal: newWebhookSignal,
signalArgs: undefined,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ export async function googleDriveIncrementalSync(
}
}

export function googleDriveIncrementalSyncWorkflowId(connectorId: string) {
export function googleDriveIncrementalSyncWorkflowId(connectorId: ModelId) {
return `googleDrive-IncrementalSync-${connectorId}`;
}

Expand Down
24 changes: 11 additions & 13 deletions connectors/src/connectors/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { ConnectorProvider } from "@dust-tt/types";
import type { ConnectorProvider, ModelId } from "@dust-tt/types";

import {
cleanupConfluenceConnector,
Expand Down Expand Up @@ -134,20 +134,18 @@ export const STOP_CONNECTOR_BY_TYPE: Record<
ConnectorStopper
> = {
confluence: stopConfluenceConnector,
slack: async (connectorId: string) => {
slack: async (connectorId: ModelId) => {
logger.info({ connectorId }, `Stopping Slack connector is a no-op.`);
return new Ok(connectorId);
return new Ok(undefined);
},
github: stopGithubConnector,
notion: stopNotionConnector,
google_drive: async (connectorId: string) => {
google_drive: async (connectorId: ModelId) => {
logger.info({ connectorId }, `Stopping Google Drive connector is a no-op.`);
return new Ok(connectorId);
return new Ok(undefined);
},
intercom: stopIntercomConnector,
webcrawler: async (connectorId: string) => {
return stopWebcrawlerConnector(connectorId);
},
webcrawler: stopWebcrawlerConnector,
};

export const DELETE_CONNECTOR_BY_TYPE: Record<
Expand All @@ -168,18 +166,18 @@ export const RESUME_CONNECTOR_BY_TYPE: Record<
ConnectorResumer
> = {
confluence: resumeConfluenceConnector,
slack: async (connectorId: string) => {
slack: async (connectorId: ModelId) => {
logger.info({ connectorId }, `Resuming Slack connector is a no-op.`);
return new Ok(connectorId);
return new Ok(undefined);
},
notion: resumeNotionConnector,
github: resumeGithubConnector,
google_drive: async (connectorId: string) => {
google_drive: async (connectorId: ModelId) => {
throw new Error(`Not implemented ${connectorId}`);
},
intercom: resumeIntercomConnector,
webcrawler: () => {
throw new Error("Not implemented");
webcrawler: (connectorId: ModelId) => {
throw new Error(`Not implemented ${connectorId}`);
},
};

Expand Down
Loading

0 comments on commit ff9cb34

Please sign in to comment.