diff --git a/valhalla/jawn/src/index.ts b/valhalla/jawn/src/index.ts index 95ae845ce..963846606 100644 --- a/valhalla/jawn/src/index.ts +++ b/valhalla/jawn/src/index.ts @@ -62,10 +62,10 @@ const KAFKA_ENABLED = (KAFKA_CREDS?.KAFKA_ENABLED ?? "false") === "true"; if (KAFKA_ENABLED) { startConsumers({ - dlqCount: 0, - normalCount: 0, - scoresCount: 0, - scoresDlqCount: 0, + dlqCount: DLQ_WORKER_COUNT, + normalCount: NORMAL_WORKER_COUNT, + scoresCount: SCORES_WORKER_COUNT, + scoresDlqCount: SCORES_WORKER_COUNT, backFillCount: 0, }); } @@ -205,6 +205,7 @@ server.on("error", console.error); server.setTimeout(1000 * 60 * 10); // 10 minutes +//This shuts down the server and all delayed operations with delay only locally, on AWS it will be killed by the OS with no delay async function gracefulShutdown(signal: string) { console.log(`Received ${signal}. Starting graceful shutdown...`); @@ -223,7 +224,7 @@ async function gracefulShutdown(signal: string) { "Could not close connections in time, forcefully shutting down" ); process.exit(1); - }, 60000 * 3); + }, 30000); } process.on("SIGTERM", () => gracefulShutdown("SIGTERM")); diff --git a/valhalla/jawn/src/lib/shared/delayedOperationService.ts b/valhalla/jawn/src/lib/shared/delayedOperationService.ts index a1bed7bd7..394b34dc1 100644 --- a/valhalla/jawn/src/lib/shared/delayedOperationService.ts +++ b/valhalla/jawn/src/lib/shared/delayedOperationService.ts @@ -2,7 +2,7 @@ export class DelayedOperationService { private static instance: DelayedOperationService; private delayedOperations: Map Promise> = new Map(); - private static readonly SHUTDOWN_TIMEOUT = 60000 * 3; // 3 minutes timeout + private static readonly SHUTDOWN_TIMEOUT = 30000; // 30 seconds timeout public static getInstance(): DelayedOperationService { if (!DelayedOperationService.instance) { diff --git a/valhalla/jawn/src/lib/stores/ScoreStore.ts b/valhalla/jawn/src/lib/stores/ScoreStore.ts index 1fd428e9c..d0cf92f12 100644 --- a/valhalla/jawn/src/lib/stores/ScoreStore.ts +++ b/valhalla/jawn/src/lib/stores/ScoreStore.ts @@ -281,6 +281,10 @@ export class ScoreStore extends BaseStore { feedback.responseId !== "00000000-0000-0000-0000-000000000000" ); + if (validFeedbacks.length === 0) { + return ok([]); + } + console.log( `Upserting feedback for ${ validFeedbacks.length