diff --git a/connectors/src/connectors/zendesk/lib/errors.ts b/connectors/src/connectors/zendesk/lib/errors.ts index 03b6a9d5062e..daff7d7b8afd 100644 --- a/connectors/src/connectors/zendesk/lib/errors.ts +++ b/connectors/src/connectors/zendesk/lib/errors.ts @@ -7,6 +7,11 @@ interface NodeZendeskError extends Error { result: string | null; } +interface ZendeskApiError extends Error { + status: number; + description: string | null; +} + export function isNodeZendeskForbiddenError( err: unknown ): err is NodeZendeskError { @@ -17,3 +22,26 @@ export function isNodeZendeskForbiddenError( err.statusCode === 403 ); } + +export function isZendeskExpiredCursorError( + err: unknown +): err is ZendeskApiError { + return ( + typeof err === "object" && + err !== null && + "status" in err && + err.status === 422 && + "description" in err && + typeof err.description === "string" && + err.description.includes("Invalid search: cursor has expired") + ); +} + +export function isZendeskEpipeError(err: unknown): err is NodeZendeskError { + return ( + typeof err === "object" && + err !== null && + "code" in err && + err.code === "EPIPE" + ); +} diff --git a/connectors/src/connectors/zendesk/lib/zendesk_api.ts b/connectors/src/connectors/zendesk/lib/zendesk_api.ts index 2c39c4f63663..ce571f1a48d4 100644 --- a/connectors/src/connectors/zendesk/lib/zendesk_api.ts +++ b/connectors/src/connectors/zendesk/lib/zendesk_api.ts @@ -5,11 +5,9 @@ import { createClient } from "node-zendesk"; import type { ZendeskFetchedArticle, ZendeskFetchedCategory, - ZendeskFetchedSection, ZendeskFetchedTicket, ZendeskFetchedUser, } from "@connectors/@types/node-zendesk"; -import { isNodeZendeskForbiddenError } from "@connectors/connectors/zendesk/lib/errors"; import { ExternalOAuthTokenError } from "@connectors/lib/error"; import logger from "@connectors/logger/logger"; import type { ZendeskCategoryResource } from "@connectors/resources/zendesk_resources"; @@ -365,30 +363,3 @@ export async function fetchZendeskCurrentUser({ const data = await response.json(); return data.user; } - -/** - * Fetches the Section and the User for an article. - */ -export async function fetchArticleMetadata( - zendeskApiClient: Client, - article: ZendeskFetchedArticle -): Promise<{ section: ZendeskFetchedSection; user: ZendeskFetchedUser }> { - try { - const { result: section } = await zendeskApiClient.helpcenter.sections.show( - article.section_id - ); - const { result: user } = await zendeskApiClient.users.show( - article.author_id - ); - return { section, user }; - } catch (e) { - logger.error( - { articleId: article.id, error: e }, - "[Zendesk] Error fetching article metadata" - ); - if (isNodeZendeskForbiddenError(e)) { - throw new ExternalOAuthTokenError(e); - } - throw e; - } -} diff --git a/connectors/src/connectors/zendesk/temporal/activities.ts b/connectors/src/connectors/zendesk/temporal/activities.ts index f77ac28f635f..2e77d5957f4f 100644 --- a/connectors/src/connectors/zendesk/temporal/activities.ts +++ b/connectors/src/connectors/zendesk/temporal/activities.ts @@ -1,6 +1,5 @@ import type { ModelId } from "@dust-tt/types"; -import { isNodeZendeskForbiddenError } from "@connectors/connectors/zendesk/lib/errors"; import { syncArticle } from "@connectors/connectors/zendesk/lib/sync_article"; import { syncCategory } from "@connectors/connectors/zendesk/lib/sync_category"; import { syncTicket } from "@connectors/connectors/zendesk/lib/sync_ticket"; @@ -15,7 +14,6 @@ import { import { ZENDESK_BATCH_SIZE } from "@connectors/connectors/zendesk/temporal/config"; import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config"; import { concurrentExecutor } from "@connectors/lib/async_utils"; -import { ExternalOAuthTokenError } from "@connectors/lib/error"; import { ZendeskTimestampCursor } from "@connectors/lib/models/zendesk"; import { syncStarted, syncSucceeded } from "@connectors/lib/sync_status"; import { heartbeat } from "@connectors/lib/temporal"; @@ -332,21 +330,11 @@ export async function syncZendeskArticleBatchActivity({ `[Zendesk] Processing ${articles.length} articles in batch` ); - let sections; - let users; - try { - sections = - await zendeskApiClient.helpcenter.sections.listByCategory(categoryId); - const { result: usersResult } = await zendeskApiClient.users.showMany( - articles.map((article) => article.author_id) - ); - users = usersResult; - } catch (e) { - if (isNodeZendeskForbiddenError(e)) { - throw new ExternalOAuthTokenError(e); - } - throw e; - } + const sections = + await zendeskApiClient.helpcenter.sections.listByCategory(categoryId); + const { result: users } = await zendeskApiClient.users.showMany( + articles.map((article) => article.author_id) + ); await concurrentExecutor( articles, diff --git a/connectors/src/connectors/zendesk/temporal/cast_known_errors.ts b/connectors/src/connectors/zendesk/temporal/cast_known_errors.ts new file mode 100644 index 000000000000..ce8f1abdfeca --- /dev/null +++ b/connectors/src/connectors/zendesk/temporal/cast_known_errors.ts @@ -0,0 +1,48 @@ +import type { + ActivityExecuteInput, + ActivityInboundCallsInterceptor, + Next, +} from "@temporalio/worker"; + +import { + isNodeZendeskForbiddenError, + isZendeskEpipeError, + isZendeskExpiredCursorError, +} from "@connectors/connectors/zendesk/lib/errors"; +import { + DustConnectorWorkflowError, + ExternalOAuthTokenError, + ProviderWorkflowError, +} from "@connectors/lib/error"; + +export class ZendeskCastKnownErrorsInterceptor + implements ActivityInboundCallsInterceptor +{ + async execute( + input: ActivityExecuteInput, + next: Next + ): Promise { + try { + return await next(input); + } catch (err: unknown) { + if (isNodeZendeskForbiddenError(err)) { + throw new ExternalOAuthTokenError(err); + } else if (isZendeskExpiredCursorError(err)) { + throw new DustConnectorWorkflowError( + "Cursor expired", + "unhandled_internal_activity_error", + err + ); + } else if (isZendeskEpipeError(err)) { + throw new ProviderWorkflowError( + "zendesk", + "EPIPE", + "transient_upstream_activity_error", + err + ); + } + + throw err; + } + } +} diff --git a/connectors/src/connectors/zendesk/temporal/incremental_activities.ts b/connectors/src/connectors/zendesk/temporal/incremental_activities.ts index ad9df74a4a28..de37de067f33 100644 --- a/connectors/src/connectors/zendesk/temporal/incremental_activities.ts +++ b/connectors/src/connectors/zendesk/temporal/incremental_activities.ts @@ -9,7 +9,6 @@ import { getZendeskSubdomainAndAccessToken } from "@connectors/connectors/zendes import { changeZendeskClientSubdomain, createZendeskClient, - fetchArticleMetadata, fetchRecentlyUpdatedArticles, fetchRecentlyUpdatedTickets, } from "@connectors/connectors/zendesk/lib/zendesk_api"; @@ -113,9 +112,10 @@ export async function syncZendeskArticleUpdateBatchActivity({ await concurrentExecutor( articles, async (article) => { - const { section, user } = await fetchArticleMetadata( - zendeskApiClient, - article + const { result: section } = + await zendeskApiClient.helpcenter.sections.show(article.section_id); + const { result: user } = await zendeskApiClient.users.show( + article.author_id ); if (section.category_id) { diff --git a/connectors/src/connectors/zendesk/temporal/worker.ts b/connectors/src/connectors/zendesk/temporal/worker.ts index 8077ee0b78cc..aedd61eef455 100644 --- a/connectors/src/connectors/zendesk/temporal/worker.ts +++ b/connectors/src/connectors/zendesk/temporal/worker.ts @@ -2,6 +2,7 @@ import type { Context } from "@temporalio/activity"; import { Worker } from "@temporalio/worker"; import TsconfigPathsPlugin from "tsconfig-paths-webpack-plugin"; +import { ZendeskCastKnownErrorsInterceptor } from "@connectors/connectors/zendesk/temporal/cast_known_errors"; import { getTemporalWorkerConnection } from "@connectors/lib/temporal"; import { ActivityInboundLogInterceptor } from "@connectors/lib/temporal_monitoring"; import logger from "@connectors/logger/logger"; @@ -26,6 +27,7 @@ export async function runZendeskWorkers() { (ctx: Context) => { return new ActivityInboundLogInterceptor(ctx, logger); }, + () => new ZendeskCastKnownErrorsInterceptor(), ], }, bundlerOptions: {