diff --git a/valhalla/jawn/src/index.ts b/valhalla/jawn/src/index.ts index 377e2b6137..f89576e530 100644 --- a/valhalla/jawn/src/index.ts +++ b/valhalla/jawn/src/index.ts @@ -64,7 +64,7 @@ if (KAFKA_ENABLED) { dlqCount: DLQ_WORKER_COUNT, normalCount: NORMAL_WORKER_COUNT, scoresCount: SCORES_WORKER_COUNT, - scoresDlqCount: SCORES_WORKER_COUNT, + scoresDlqCount: 0, backFillCount: 0, }); } diff --git a/valhalla/jawn/src/managers/score/ScoreManager.ts b/valhalla/jawn/src/managers/score/ScoreManager.ts index 65a03decb3..0f334be5b5 100644 --- a/valhalla/jawn/src/managers/score/ScoreManager.ts +++ b/valhalla/jawn/src/managers/score/ScoreManager.ts @@ -6,6 +6,7 @@ import { dataDogClient } from "../../lib/clients/DataDogClient"; import { KafkaProducer } from "../../lib/clients/KafkaProducer"; import { HeliconeScoresMessage } from "../../lib/handlers/HandlerContext"; import * as Sentry from "@sentry/node"; +import { validate as uuidValidate } from "uuid"; type Scores = Record; @@ -80,9 +81,12 @@ export class ScoreManager extends BaseManager { if (scoresMessages.length === 0) { return ok(null); } + const validScoresMessages = scoresMessages.filter((message) => + uuidValidate(message.requestId) + ); // Filter out duplicate scores messages and only keep the latest one const filteredMessages = Array.from( - scoresMessages + validScoresMessages .reduce((map, message) => { const key = `${message.requestId}-${message.organizationId}`; const existingMessage = map.get(key);