Skip to content

Commit

Permalink
[connectors] Launch workflows from ZendeskConnectorManager (#8346)
Browse files Browse the repository at this point in the history
* feat: launch sync workflows on creation, permission setting and resume

* feat: implement the sync

* perf: optimize the fetching of brandIds by only selecting the one column

* feat: implement pause/unpause (add a pause workflow)

* refactor: remove the duplicate function stopZendeskSyncWorkflow

* fix: fix ZendeskConfigurationResource fetching

* fix incorrect error message

* refactor

Co-authored-by: Flavien David <[email protected]>

* pass complete type instead of only the connector ID in launchZendeskSyncWorkflow

* fix return type of stopZendeskSyncWorkflow and cleanup stop

* cleanup const connectorId = this.connectorId;

* correctly return errors in resume

* get rid of an unused try catch

* cleanup

* cleanup

* refactor: update the signature of launchZendeskSyncWorkflow

---------

Co-authored-by: Flavien David <[email protected]>
  • Loading branch information
aubin-tchoi and flvndvd authored Oct 31, 2024
1 parent 65a207b commit b1f7ebf
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 46 deletions.
115 changes: 98 additions & 17 deletions connectors/src/connectors/zendesk/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ import type {
ContentNodesViewType,
Result,
} from "@dust-tt/types";
import { assertNever } from "@dust-tt/types";
import { Err } from "@dust-tt/types";
import { Ok } from "@dust-tt/types";
import { assertNever, Err, Ok } from "@dust-tt/types";

import type { ConnectorManagerError } from "@connectors/connectors/interface";
import { BaseConnectorManager } from "@connectors/connectors/interface";
Expand Down Expand Up @@ -37,13 +35,18 @@ import {
revokeSyncZendeskTickets,
} from "@connectors/connectors/zendesk/lib/ticket_permissions";
import { getZendeskAccessToken } from "@connectors/connectors/zendesk/lib/zendesk_access_token";
import {
launchZendeskSyncWorkflow,
stopZendeskSyncWorkflow,
} from "@connectors/connectors/zendesk/temporal/client";
import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config";
import logger from "@connectors/logger/logger";
import { ConnectorResource } from "@connectors/resources/connector_resource";
import { ZendeskConfigurationResource } from "@connectors/resources/zendesk_resources";
import {
ZendeskArticleResource,
ZendeskBrandResource,
ZendeskCategoryResource,
ZendeskConfigurationResource,
ZendeskTicketResource,
} from "@connectors/resources/zendesk_resources";
import type { DataSourceConfig } from "@connectors/types/data_source_config";
Expand All @@ -69,6 +72,20 @@ export class ZendeskConnectorManager extends BaseConnectorManager<null> {
{ subdomain: "d3v-dust", conversationsSlidingWindow: 90 }
);

const workflowStartResult = await launchZendeskSyncWorkflow(connector);

if (workflowStartResult.isErr()) {
await connector.delete();
logger.error(
{
workspaceId: dataSourceConfig.workspaceId,
error: workflowStartResult.error,
},
"[Zendesk] Error launching the sync workflow."
);
throw workflowStartResult.error;
}

return new Ok(connector.id.toString());
}

Expand All @@ -86,15 +103,50 @@ export class ZendeskConnectorManager extends BaseConnectorManager<null> {
}

async stop(): Promise<Result<undefined, Error>> {
throw new Error("Method not implemented.");
return stopZendeskSyncWorkflow(this.connectorId);
}

async resume(): Promise<Result<undefined, Error>> {
throw new Error("Method not implemented.");
const { connectorId } = this;
const connector = await ConnectorResource.fetchById(connectorId);
if (!connector) {
logger.error({ connectorId }, "[Zendesk] Connector not found.");
return new Err(new Error("Connector not found"));
}

const dataSourceConfig = dataSourceConfigFromConnector(connector);
const result = await launchZendeskSyncWorkflow(connector);
if (result.isErr()) {
logger.error(
{
workspaceId: dataSourceConfig.workspaceId,
dataSourceId: dataSourceConfig.dataSourceId,
error: result.error,
},
"[Zendesk] Error resuming the sync workflow."
);
return result;
}
return new Ok(undefined);
}

async sync(): Promise<Result<string, Error>> {
throw new Error("Method not implemented.");
const { connectorId } = this;
const connector = await ConnectorResource.fetchById(connectorId);
if (!connector) {
logger.error({ connectorId }, "[Zendesk] Connector not found.");
return new Err(new Error("Connector not found"));
}

const brandIds = await ZendeskBrandResource.fetchAllBrandIds({
connectorId,
});
const result = await launchZendeskSyncWorkflow(connector, {
brandIds,
forceResync: true,
});

return result.isErr() ? result : new Ok(connector.id.toString());
}

async retrievePermissions({
Expand All @@ -105,7 +157,7 @@ export class ZendeskConnectorManager extends BaseConnectorManager<null> {
filterPermission: ConnectorPermission | null;
viewType: ContentNodesViewType;
}): Promise<Result<ContentNode[], Error>> {
const connectorId = this.connectorId;
const { connectorId } = this;
const connector = await ConnectorResource.fetchById(connectorId);
if (!connector) {
logger.error({ connectorId }, "[Zendesk] Connector not found.");
Expand Down Expand Up @@ -137,7 +189,7 @@ export class ZendeskConnectorManager extends BaseConnectorManager<null> {
}: {
permissions: Record<string, ConnectorPermission>;
}): Promise<Result<void, Error>> {
const connectorId = this.connectorId;
const { connectorId } = this;

const connector = await ConnectorResource.fetchById(this.connectorId);
if (!connector) {
Expand All @@ -146,9 +198,8 @@ export class ZendeskConnectorManager extends BaseConnectorManager<null> {
}

const connectionId = connector.connectionId;
const zendeskConfiguration = await ZendeskConfigurationResource.fetchById(
this.connectorId
);
const zendeskConfiguration =
await ZendeskConfigurationResource.fetchByConnectorId(connectorId);
if (!zendeskConfiguration) {
logger.error(
{ connectorId },
Expand Down Expand Up @@ -276,7 +327,19 @@ export class ZendeskConnectorManager extends BaseConnectorManager<null> {
}
}

/// Launch a sync workflow here
if (
toBeSignaledBrandIds.size > 0 ||
toBeSignaledTicketsIds.size > 0 ||
toBeSignaledHelpCenterIds.size > 0 ||
toBeSignaledCategoryIds.size > 0
) {
return launchZendeskSyncWorkflow(connector, {
brandIds: [...toBeSignaledBrandIds],
ticketsBrandIds: [...toBeSignaledTicketsIds],
helpCenterBrandIds: [...toBeSignaledHelpCenterIds],
categoryIds: [...toBeSignaledCategoryIds],
});
}

return new Ok(undefined);
}
Expand Down Expand Up @@ -327,7 +390,7 @@ export class ZendeskConnectorManager extends BaseConnectorManager<null> {
}
});

const connectorId = this.connectorId;
const { connectorId } = this;

const allBrandIds = [
...new Set([...brandIds, ...brandTicketsIds, ...brandHelpCenterIds]),
Expand Down Expand Up @@ -371,7 +434,7 @@ export class ZendeskConnectorManager extends BaseConnectorManager<null> {
internalId: string;
memoizationKey?: string;
}): Promise<Result<string[], Error>> {
const connectorId = this.connectorId;
const { connectorId } = this;

const { type, objectId } = getIdFromInternalId(connectorId, internalId);
switch (type) {
Expand Down Expand Up @@ -469,11 +532,29 @@ export class ZendeskConnectorManager extends BaseConnectorManager<null> {
}

async pause(): Promise<Result<undefined, Error>> {
throw new Error("Method not implemented.");
const { connectorId } = this;
const connector = await ConnectorResource.fetchById(connectorId);
if (!connector) {
logger.error({ connectorId }, "[Zendesk] Connector not found.");
return new Err(new Error("Connector not found"));
}
await connector.markAsPaused();
return this.stop();
}

async unpause(): Promise<Result<undefined, Error>> {
throw new Error("Method not implemented.");
const { connectorId } = this;
const connector = await ConnectorResource.fetchById(connectorId);
if (!connector) {
logger.error({ connectorId }, "[Zendesk] Connector not found.");
return new Err(new Error("Connector not found"));
}
await connector.markAsUnpaused();

const brandIds = await ZendeskBrandResource.fetchAllBrandIds({
connectorId,
});
return launchZendeskSyncWorkflow(connector, { brandIds });
}

async garbageCollect(): Promise<Result<string, Error>> {
Expand Down
2 changes: 1 addition & 1 deletion connectors/src/connectors/zendesk/temporal/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ async function _getZendeskConnectorOrRaise(connectorId: ModelId) {

async function _getZendeskConfigurationOrRaise(connectorId: ModelId) {
const configuration =
await ZendeskConfigurationResource.fetchById(connectorId);
await ZendeskConfigurationResource.fetchByConnectorId(connectorId);
if (!configuration) {
throw new Error("[Zendesk] Configuration not found.");
}
Expand Down
51 changes: 23 additions & 28 deletions connectors/src/connectors/zendesk/temporal/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,34 +15,29 @@ export function getZendeskSyncWorkflowId(connectorId: ModelId): string {
return `zendesk-sync-${connectorId}`;
}

export async function launchZendeskSyncWorkflow({
connectorId,
startFromTs = null,
brandIds = [],
ticketsBrandIds = [],
helpCenterBrandIds = [],
categoryIds = [],
forceResync = false,
}: {
connectorId: ModelId;
startFromTs?: number | null;
brandIds?: number[];
ticketsBrandIds?: number[];
helpCenterBrandIds?: number[];
categoryIds?: number[];
forceResync?: boolean;
}): Promise<Result<string, Error>> {
export async function launchZendeskSyncWorkflow(
connector: ConnectorResource,
{
startFromTs = null,
brandIds = [],
ticketsBrandIds = [],
helpCenterBrandIds = [],
categoryIds = [],
forceResync = false,
}: {
startFromTs?: number | null;
brandIds?: number[];
ticketsBrandIds?: number[];
helpCenterBrandIds?: number[];
categoryIds?: number[];
forceResync?: boolean;
} = {}
): Promise<Result<undefined, Error>> {
if (startFromTs) {
throw new Error("[Zendesk] startFromTs not implemented yet.");
}

const client = await getTemporalClient();
const connector = await ConnectorResource.fetchById(connectorId);
if (!connector) {
throw new Error(
`[Zendesk] Connector not found, connectorId: ${connectorId}`
);
}

const signals: ZendeskUpdateSignal[] = [
...brandIds.map(
Expand Down Expand Up @@ -75,28 +70,28 @@ export async function launchZendeskSyncWorkflow({
),
];

const workflowId = getZendeskSyncWorkflowId(connectorId);
const workflowId = getZendeskSyncWorkflowId(connector.id);
try {
await client.workflow.signalWithStart(zendeskSyncWorkflow, {
args: [{ connectorId: connector.id }],
taskQueue: QUEUE_NAME,
workflowId,
searchAttributes: { connectorId: [connectorId] },
searchAttributes: { connectorId: [connector.id] },
signal: zendeskUpdatesSignal,
signalArgs: [signals],
memo: { connectorId },
memo: { connectorId: connector.id },
cronSchedule: "*/5 * * * *", // Every 5 minutes.
});
} catch (err) {
return new Err(err as Error);
}

return new Ok(workflowId);
return new Ok(undefined);
}

export async function stopZendeskSyncWorkflow(
connectorId: ModelId
): Promise<Result<void, Error>> {
): Promise<Result<undefined, Error>> {
const client = await getTemporalClient();
const connector = await ConnectorResource.fetchById(connectorId);
if (!connector) {
Expand Down
12 changes: 12 additions & 0 deletions connectors/src/resources/zendesk_resources.ts
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,18 @@ export class ZendeskBrandResource extends BaseResource<ZendeskBrand> {
return brands.map((brand) => new this(this.model, brand.get()));
}

static async fetchAllBrandIds({
connectorId,
}: {
connectorId: number;
}): Promise<number[]> {
const brands = await ZendeskBrand.findAll({
where: { connectorId },
attributes: ["brandId"],
});
return brands.map((brand) => brand.brandId);
}

static async fetchAllWithHelpCenter({
connectorId,
}: {
Expand Down

0 comments on commit b1f7ebf

Please sign in to comment.