Skip to content

Commit

Permalink
Merge branch 'main' into alban/assistantPicker-3263
Browse files Browse the repository at this point in the history
  • Loading branch information
albandum authored Jan 24, 2024
2 parents 1db5237 + e7062f2 commit ccad089
Show file tree
Hide file tree
Showing 15 changed files with 230 additions and 161 deletions.
100 changes: 99 additions & 1 deletion connectors/src/connectors/confluence/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@ import type { ConfluenceSpaceType } from "@connectors/connectors/confluence/lib/
import {
launchConfluenceRemoveSpacesSyncWorkflow,
launchConfluenceSyncWorkflow,
stopConfluenceSyncWorkflow,
} from "@connectors/connectors/confluence/temporal/client";
import type { ConnectorPermissionRetriever } from "@connectors/connectors/interface";
import { Connector, sequelize_conn } from "@connectors/lib/models";
import {
ConfluenceConfiguration,
ConfluencePage,
ConfluenceSpace,
} from "@connectors/lib/models/confluence";
import { nangoDeleteConnection } from "@connectors/lib/nango_client";
import { getAccessTokenFromNango } from "@connectors/lib/nango_helpers";
import type { Result } from "@connectors/lib/result";
import { Err, Ok } from "@connectors/lib/result";
Expand Down Expand Up @@ -105,6 +108,101 @@ export async function updateConfluenceConnector(
throw new Error("Not implemented");
}

export async function stopConfluenceConnector(
connectorId: string
): Promise<Result<string, Error>> {
// TODO(2024-01-23 flav) Change the prototype to take a ModelId.
const connectorIdAsNumber = parseInt(connectorId, 10);
const res = await stopConfluenceSyncWorkflow(connectorIdAsNumber);
if (res.isErr()) {
return res;
}

return new Ok(connectorId.toString());
}

export async function resumeConfluenceConnector(
connectorId: string
): Promise<Result<string, Error>> {
try {
const connector = await Connector.findOne({
where: {
id: connectorId,
},
});

if (!connector) {
return new Err(
new Error(`Confluence connector not found (connectorId: ${connectorId}`)
);
}

const connectorState = await ConfluenceConfiguration.findOne({
where: {
connectorId: connector.id,
},
});
if (!connectorState) {
return new Err(new Error("Confluence configuration not found"));
}

await launchConfluenceSyncWorkflow(connector.id);

return new Ok(connector.id.toString());
} catch (err) {
return new Err(err as Error);
}
}

export async function cleanupConfluenceConnector(
connectorId: string
): Promise<Result<void, Error>> {
const connector = await Connector.findOne({
where: { type: "confluence", id: connectorId },
});
if (!connector) {
logger.error({ connectorId }, "Confluence connector not found.");
return new Err(new Error("Connector not found"));
}

return sequelize_conn.transaction(async (transaction) => {
await Promise.all([
ConfluenceConfiguration.destroy({
where: {
connectorId: connector.id,
},
transaction,
}),
ConfluenceSpace.destroy({
where: {
connectorId: connector.id,
},
transaction,
}),
ConfluencePage.destroy({
where: {
connectorId: connector.id,
},
transaction,
}),
]);

const nangoRes = await nangoDeleteConnection(
connector.connectionId,
getRequiredNangoConfluenceConnectorId()
);
if (nangoRes.isErr()) {
throw nangoRes.error;
}

await connector.destroy({
transaction,
});

return new Ok(undefined);
});
}

function createConnectorResourceFromSpace(
space: ConfluenceSpace | ConfluenceSpaceType,
baseUrl: string,
Expand Down Expand Up @@ -198,7 +296,7 @@ async function startWorkflowIfNecessary(
workflowLauncher: (
connectorId: ModelId,
spaceIds: string[]
) => Promise<Result<void, Error>>,
) => Promise<Result<string, Error>>,
connectorId: ModelId
): Promise<Result<void, Error>> {
if (spaceIds.length > 0) {
Expand Down
55 changes: 49 additions & 6 deletions connectors/src/connectors/confluence/temporal/client.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import type { ModelId, Result } from "@dust-tt/types";
import { Err, Ok } from "@dust-tt/types";
import type { WorkflowHandle } from "@temporalio/client";
import { WorkflowNotFoundError } from "@temporalio/client";

import { QUEUE_NAME } from "@connectors/connectors/confluence/temporal/config";
import type { SpaceUpdatesSignal } from "@connectors/connectors/confluence/temporal/signals";
Expand All @@ -15,12 +17,13 @@ import {
import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config";
import { Connector } from "@connectors/lib/models";
import { getTemporalClient } from "@connectors/lib/temporal";
import logger from "@connectors/logger/logger";

export async function launchConfluenceSyncWorkflow(
connectorId: ModelId,
spaceIds: string[] = [],
forceUpsert = false
): Promise<Result<undefined, Error>> {
): Promise<Result<string, Error>> {
const connector = await Connector.findByPk(connectorId);
if (!connector) {
throw new Error(`Connector not found. ConnectorId: ${connectorId}`);
Expand All @@ -34,6 +37,8 @@ export async function launchConfluenceSyncWorkflow(
spaceId: sId,
}));

const workflowId = makeConfluenceSyncWorkflowId(connector.id);

// When the workflow is inactive, we omit passing spaceIds as they are only used to signal modifications within a currently active full sync workflow.
try {
await client.workflow.signalWithStart(confluenceSyncWorkflow, {
Expand All @@ -46,7 +51,7 @@ export async function launchConfluenceSyncWorkflow(
},
],
taskQueue: QUEUE_NAME,
workflowId: makeConfluenceSyncWorkflowId(connector.id),
workflowId,
searchAttributes: {
connectorId: [connectorId],
},
Expand All @@ -61,13 +66,13 @@ export async function launchConfluenceSyncWorkflow(
return new Err(err as Error);
}

return new Ok(undefined);
return new Ok(workflowId);
}

export async function launchConfluenceRemoveSpacesSyncWorkflow(
connectorId: ModelId,
spaceIds: string[] = []
) {
): Promise<Result<string, Error>> {
const connector = await Connector.findByPk(connectorId);
if (!connector) {
throw new Error(`Connector not found. ConnectorId: ${connectorId}`);
Expand All @@ -79,6 +84,8 @@ export async function launchConfluenceRemoveSpacesSyncWorkflow(
spaceId: sId,
}));

const workflowId = makeConfluenceRemoveSpacesWorkflowId(connector.id);

try {
await client.workflow.signalWithStart(confluenceRemoveSpacesWorkflow, {
args: [
Expand All @@ -88,7 +95,7 @@ export async function launchConfluenceRemoveSpacesSyncWorkflow(
},
],
taskQueue: QUEUE_NAME,
workflowId: makeConfluenceRemoveSpacesWorkflowId(connector.id),
workflowId,
searchAttributes: {
connectorId: [connectorId],
},
Expand All @@ -102,5 +109,41 @@ export async function launchConfluenceRemoveSpacesSyncWorkflow(
return new Err(err as Error);
}

return new Ok(undefined);
return new Ok(workflowId);
}

export async function stopConfluenceSyncWorkflow(
connectorId: ModelId
): Promise<Result<void, Error>> {
const client = await getTemporalClient();
const connector = await Connector.findByPk(connectorId);
if (!connector) {
throw new Error(
`[Intercom] Connector not found. ConnectorId: ${connectorId}`
);
}

const workflowId = makeConfluenceSyncWorkflowId(connectorId);

try {
const handle: WorkflowHandle<typeof confluenceSyncWorkflow> =
client.workflow.getHandle(workflowId);
try {
await handle.terminate();
} catch (e) {
if (!(e instanceof WorkflowNotFoundError)) {
throw e;
}
}
return new Ok(undefined);
} catch (e) {
logger.error(
{
workflowId,
error: e,
},
"Failed to stop Confluence workflow."
);
return new Err(e as Error);
}
}
22 changes: 11 additions & 11 deletions connectors/src/connectors/index.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import type { ConnectorProvider } from "@dust-tt/types";

import {
cleanupConfluenceConnector,
createConfluenceConnector,
resumeConfluenceConnector,
retrieveConfluenceConnectorPermissions,
retrieveConfluenceObjectsTitles,
setConfluenceConnectorPermissions,
stopConfluenceConnector,
updateConfluenceConnector,
} from "@connectors/connectors/confluence";
import { launchConfluenceSyncWorkflow } from "@connectors/connectors/confluence/temporal/client";
import {
cleanupGithubConnector,
createGithubConnector,
Expand Down Expand Up @@ -127,9 +131,7 @@ export const STOP_CONNECTOR_BY_TYPE: Record<
ConnectorProvider,
ConnectorStopper
> = {
confluence: () => {
throw new Error("Not yet implemented!");
},
confluence: stopConfluenceConnector,
slack: async (connectorId: string) => {
logger.info({ connectorId }, `Stopping Slack connector is a no-op.`);
return new Ok(connectorId);
Expand All @@ -150,9 +152,7 @@ export const DELETE_CONNECTOR_BY_TYPE: Record<
ConnectorProvider,
ConnectorCleaner
> = {
confluence: () => {
throw new Error("Not yet implemented!");
},
confluence: cleanupConfluenceConnector,
slack: cleanupSlackConnector,
notion: cleanupNotionConnector,
github: cleanupGithubConnector,
Expand All @@ -165,9 +165,7 @@ export const RESUME_CONNECTOR_BY_TYPE: Record<
ConnectorProvider,
ConnectorResumer
> = {
confluence: () => {
throw new Error("Not yet implemented!");
},
confluence: resumeConfluenceConnector,
slack: async (connectorId: string) => {
logger.info({ connectorId }, `Resuming Slack connector is a no-op.`);
return new Ok(connectorId);
Expand All @@ -185,8 +183,10 @@ export const RESUME_CONNECTOR_BY_TYPE: Record<

export const SYNC_CONNECTOR_BY_TYPE: Record<ConnectorProvider, SyncConnector> =
{
confluence: () => {
throw new Error("Not yet implemented!");
confluence: (connectorId: string) => {
// TODO(2024-01-23 flav) Remove once prototype is fixed.
const connectorIdAsNumber = parseInt(connectorId, 10);
return launchConfluenceSyncWorkflow(connectorIdAsNumber);
},
slack: launchSlackSyncWorkflow,
notion: fullResyncNotionConnector,
Expand Down
2 changes: 1 addition & 1 deletion front/components/Button.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ export function SignInDropDownButton({
/>
</DropdownMenu.Button>
<DropdownMenu.Items origin="topRight" width={240}>
<div className="flex flex-col gap-2 p-4">
<div className="flex flex-col gap-3 py-4">
<Button
variant="tertiary"
size="md"
Expand Down
19 changes: 15 additions & 4 deletions front/components/assistant/AssistantPicker.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import type {
LightAgentConfigurationType,
WorkspaceType,
} from "@dust-tt/types";
import { isBuilder } from "@dust-tt/types";
import Link from "next/link";
import { useEffect, useState } from "react";

Expand All @@ -26,7 +25,6 @@ export function AssistantPicker({
assistants,
onItemClick,
pickerButton,
showBuilderButtons,
size = "md",
}: {
owner: WorkspaceType;
Expand Down Expand Up @@ -128,7 +126,7 @@ export function AssistantPicker({
<Button
label="Create"
size="xs"
variant="secondary"
variant="primary"
icon={PlusIcon}
className="mr-2"
/>
Expand All @@ -142,7 +140,20 @@ export function AssistantPicker({
/>
</Link>
</div>
)}
}
>
{searchedAssistants.map((c) => (
<Item.Avatar
key={`assistant-picker-${c.sId}`}
label={"@" + c.name}
visual={c.pictureUrl}
hasAction={false}
onClick={() => {
onItemClick(c);
setSearchText("");
}}
/>
))}
</DropdownMenu.Items>
</DropdownMenu>
);
Expand Down
Loading

0 comments on commit ccad089

Please sign in to comment.