Skip to content

Commit

Permalink
enh(zendesk): reduce batch size and add heartbeat
Browse files Browse the repository at this point in the history
  • Loading branch information
Henry Fontanier committed Nov 26, 2024
1 parent ca607bf commit 64af064
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 7 deletions.
12 changes: 10 additions & 2 deletions connectors/src/connectors/zendesk/temporal/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { concurrentExecutor } from "@connectors/lib/async_utils";
import { ExternalOAuthTokenError } from "@connectors/lib/error";
import { ZendeskTimestampCursor } from "@connectors/lib/models/zendesk";
import { syncStarted, syncSucceeded } from "@connectors/lib/sync_status";
import { heartbeat } from "@connectors/lib/temporal";
import logger from "@connectors/logger/logger";
import { ConnectorResource } from "@connectors/resources/connector_resource";
import {
Expand Down Expand Up @@ -199,6 +200,7 @@ export async function syncZendeskCategoryBatchActivity({
syncCategory({ connectorId, brandId, category, currentSyncDateMs }),
{
concurrency: 10,
onBatchComplete: heartbeat,
}
);

Expand Down Expand Up @@ -361,7 +363,10 @@ export async function syncZendeskArticleBatchActivity({
loggerArgs,
forceResync,
}),
{ concurrency: 10 }
{
concurrency: 10,
onBatchComplete: heartbeat,
}
);
return { hasMore, nextLink };
}
Expand Down Expand Up @@ -446,7 +451,10 @@ export async function syncZendeskTicketBatchActivity({
users,
});
},
{ concurrency: 10 }
{
concurrency: 10,
onBatchComplete: heartbeat,
}
);

logger.info(
Expand Down
2 changes: 1 addition & 1 deletion connectors/src/connectors/zendesk/temporal/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ export const QUEUE_NAME = `zendesk-queue-v${WORKFLOW_VERSION}`;

// Batch size used when fetching from Zendesk API or from the database
// 100 is the maximum value allowed for most endpoints in Zendesk: https://developer.zendesk.com/api-reference/introduction/pagination/
export const ZENDESK_BATCH_SIZE = 50;
export const ZENDESK_BATCH_SIZE = 30;
25 changes: 21 additions & 4 deletions connectors/src/lib/async_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,35 @@ import PQueue from "p-queue";

export async function concurrentExecutor<T, V>(
items: T[],
iterator: (item: T, idx: number) => Promise<V>,
{ concurrency = 8 }: { concurrency: number }
task: (item: T, idx: number) => Promise<V>,
{
concurrency,
onBatchComplete,
}: {
concurrency: number;
onBatchComplete?: () => Promise<void>;
}
) {
const queue = new PQueue({ concurrency });
const promises: Promise<V>[] = [];
let completedCount = 0;

for (const [idx, item] of items.entries()) {
// Queue each task. The queue manages concurrency.
// Each task is wrapped in a promise to capture its completion.
const p = queue.add(async () => iterator(item, idx));
const p = queue.add(async () => {
const result = await task(item, idx);

// Call the onBatchComplete callback if it's provided and the batch is complete
completedCount++;
if (onBatchComplete && completedCount % concurrency === 0) {
await onBatchComplete();
}

return result;
});

// Cast the promise to Promise<R> to fix the return type from P-Queue.
// Cast the promise to Promise<V> to fix the return type from P-Queue.
promises.push(p as Promise<V>);
}

Expand Down

0 comments on commit 64af064

Please sign in to comment.