diff --git a/docs/features/advanced-usage/scores.mdx b/docs/features/advanced-usage/scores.mdx
index 402f276b51..029f2fcae1 100644
--- a/docs/features/advanced-usage/scores.mdx
+++ b/docs/features/advanced-usage/scores.mdx
@@ -12,6 +12,11 @@ import QuestionsSection from "/snippets/questions-section.mdx";
[plan](https://www.helicone.ai/pricing).
+
+ Scores will be ingested with a **10 minutes delay** to ensure there is enough
+ time to process the request.
+
+
## Introduction
Helicone's scores [API](https://docs.helicone.ai/rest/request/post-v1request-score) allows you to score your requests and experiments. You can use this feature to evaluate the performance of your prompts and compare different experiments and datasets. E.g., if you are building an image classification application, you might need a variety of scores to help you determine how accurate the outputs are compared to what you expect. For example, an image classification app might have one score that tells you how accurate the model classifies images into the correct categories, and another that measures the confidence level of the model's predictions.
diff --git a/valhalla/jawn/src/index.ts b/valhalla/jawn/src/index.ts
index 377e2b6137..bc56af88de 100644
--- a/valhalla/jawn/src/index.ts
+++ b/valhalla/jawn/src/index.ts
@@ -21,6 +21,7 @@ import * as publicSwaggerDoc from "./tsoa-build/public/swagger.json";
import { initLogs } from "./utils/injectLogs";
import { initSentry } from "./utils/injectSentry";
import { startConsumers } from "./workers/consumerInterface";
+import { DelayedOperationService } from "./lib/shared/delayedOperationService";
export const ENVIRONMENT: "production" | "development" = (process.env
.VERCEL_ENV ?? "development") as any;
@@ -202,5 +203,30 @@ const server = app.listen(
server.on("error", console.error);
-// Thisp
server.setTimeout(1000 * 60 * 10); // 10 minutes
+
+// This shuts down the server and all delayed operations with delay only locally, on AWS it will be killed by the OS with no delay
+// Please wait few minutes before terminating the original task on AWS
+async function gracefulShutdown(signal: string) {
+ console.log(`Received ${signal}. Starting graceful shutdown...`);
+
+ server.close(async () => {
+ console.log("HTTP server closed.");
+
+ await DelayedOperationService.getInstance().executeShutdown();
+
+ console.log("Graceful shutdown completed.");
+ process.exit(0);
+ });
+
+ // If server hasn't closed in 30 seconds, force shutdown
+ setTimeout(() => {
+ console.error(
+ "Could not close connections in time, forcefully shutting down"
+ );
+ process.exit(1);
+ }, 30000);
+}
+
+process.on("SIGTERM", () => gracefulShutdown("SIGTERM"));
+process.on("SIGINT", () => gracefulShutdown("SIGINT"));
diff --git a/valhalla/jawn/src/lib/shared/delayedOperationService.ts b/valhalla/jawn/src/lib/shared/delayedOperationService.ts
new file mode 100644
index 0000000000..394b34dc1d
--- /dev/null
+++ b/valhalla/jawn/src/lib/shared/delayedOperationService.ts
@@ -0,0 +1,73 @@
+export class DelayedOperationService {
+ private static instance: DelayedOperationService;
+ private delayedOperations: Map Promise> =
+ new Map();
+ private static readonly SHUTDOWN_TIMEOUT = 30000; // 30 seconds timeout
+
+ public static getInstance(): DelayedOperationService {
+ if (!DelayedOperationService.instance) {
+ DelayedOperationService.instance = new DelayedOperationService();
+ }
+ return DelayedOperationService.instance;
+ }
+
+ public addDelayedOperation(
+ timeoutId: NodeJS.Timeout,
+ operation: () => Promise
+ ): void {
+ this.delayedOperations.set(timeoutId, operation);
+ }
+
+ public static getTimeoutId(
+ operation: () => Promise,
+ delayMs: number
+ ): NodeJS.Timeout {
+ return setTimeout(() => {
+ operation().catch((error) => {
+ console.error("Error in delayed operation:", error);
+ });
+ }, delayMs);
+ }
+
+ public async executeShutdown(): Promise {
+ console.log("Executing shutdown handlers...");
+ try {
+ // Clear all timeouts and collect operations
+ const operations = Array.from(this.delayedOperations.entries());
+ this.delayedOperations.clear();
+
+ for (const [timeoutId, operation] of operations) {
+ clearTimeout(timeoutId);
+ }
+
+ // Execute delayed operations
+ await Promise.all([
+ Promise.all(
+ operations.map(([timeoutId, op]) =>
+ op()
+ .catch((error) => {
+ console.error("Error in delayed operation:", error);
+ })
+ .finally(() => {
+ this.removeDelayedOperation(timeoutId);
+ })
+ )
+ ),
+ new Promise((_, reject) =>
+ setTimeout(
+ () => reject(new Error("Shutdown timed out")),
+ DelayedOperationService.SHUTDOWN_TIMEOUT
+ )
+ ),
+ ]);
+
+ console.log("All shutdown handlers executed successfully.");
+ } catch (error) {
+ console.error("Error during shutdown:", error);
+ console.log("Shutdown process completed with errors or timed out.");
+ }
+ }
+ public removeDelayedOperation(timeoutId: NodeJS.Timeout): void {
+ this.delayedOperations.delete(timeoutId);
+ }
+}
diff --git a/valhalla/jawn/src/lib/stores/ScoreStore.ts b/valhalla/jawn/src/lib/stores/ScoreStore.ts
index adef2adec8..d0cf92f126 100644
--- a/valhalla/jawn/src/lib/stores/ScoreStore.ts
+++ b/valhalla/jawn/src/lib/stores/ScoreStore.ts
@@ -11,7 +11,6 @@ export type Score = {
export interface BatchScores {
requestId: string;
- provider: string;
organizationId: string;
mappedScores: Score[];
}
@@ -106,10 +105,8 @@ export class ScoreStore extends BaseStore {
): Promise> {
const queryPlaceholders = newVersions
.map((_, index) => {
- const base = index * 3;
- return `({val_${base} : String}, {val_${base + 1} : String}, {val_${
- base + 2
- } : String})`;
+ const base = index * 2;
+ return `({val_${base} : String}, {val_${base + 1} : String})`;
})
.join(",\n ");
@@ -118,7 +115,7 @@ export class ScoreStore extends BaseStore {
}
const queryParams: (string | number | boolean | Date)[] =
- newVersions.flatMap((v) => [v.requestId, v.organizationId, v.provider]);
+ newVersions.flatMap((v) => [v.requestId, v.organizationId]);
if (queryParams.length === 0) {
return err("No query params");
@@ -129,7 +126,7 @@ export class ScoreStore extends BaseStore {
`
SELECT *
FROM request_response_rmt
- WHERE (request_id, organization_id, provider) IN (${queryPlaceholders})
+ WHERE (request_id, organization_id) IN (${queryPlaceholders})
`,
queryParams
),
@@ -284,6 +281,10 @@ export class ScoreStore extends BaseStore {
feedback.responseId !== "00000000-0000-0000-0000-000000000000"
);
+ if (validFeedbacks.length === 0) {
+ return ok([]);
+ }
+
console.log(
`Upserting feedback for ${
validFeedbacks.length
diff --git a/valhalla/jawn/src/managers/score/ScoreManager.ts b/valhalla/jawn/src/managers/score/ScoreManager.ts
index ca54d46950..4d23665f0e 100644
--- a/valhalla/jawn/src/managers/score/ScoreManager.ts
+++ b/valhalla/jawn/src/managers/score/ScoreManager.ts
@@ -1,14 +1,16 @@
import { err, ok, Result } from "../../lib/shared/result";
-import { BaseManager } from "../BaseManager";
import { AuthParams } from "../../lib/db/supabase";
-import { BatchScores, Score, ScoreStore } from "../../lib/stores/ScoreStore";
+import { Score, ScoreStore } from "../../lib/stores/ScoreStore";
import { dataDogClient } from "../../lib/clients/DataDogClient";
import { KafkaProducer } from "../../lib/clients/KafkaProducer";
import { HeliconeScoresMessage } from "../../lib/handlers/HandlerContext";
import * as Sentry from "@sentry/node";
+import { DelayedOperationService } from "../../lib/shared/delayedOperationService";
+import { BaseManager } from "../BaseManager";
import { validate as uuidValidate } from "uuid";
type Scores = Record;
+const delayMs = 10 * 60 * 1000; // 10 minutes in milliseconds
export interface ScoreRequest {
scores: Scores;
@@ -17,11 +19,13 @@ export interface ScoreRequest {
export class ScoreManager extends BaseManager {
private scoreStore: ScoreStore;
private kafkaProducer: KafkaProducer;
+
constructor(authParams: AuthParams) {
super(authParams);
this.scoreStore = new ScoreStore(authParams.organizationId);
this.kafkaProducer = new KafkaProducer();
}
+
public async addScores(
requestId: string,
scores: Scores
@@ -47,30 +51,55 @@ export class ScoreManager extends BaseManager {
): Promise> {
if (!this.kafkaProducer.isKafkaEnabled()) {
console.log("Kafka is not enabled. Using score manager");
- const scoreManager = new ScoreManager({
- organizationId: this.authParams.organizationId,
- });
- return await scoreManager.handleScores(
- {
- batchId: "",
- partition: 0,
- lastOffset: "",
- messageCount: 1,
- },
- scoresMessage
+
+ // Schedule the delayed operation and register it with ShutdownService
+ const timeoutId = DelayedOperationService.getTimeoutId(() => {
+ return this.handleScores(
+ {
+ batchId: "",
+ partition: 0,
+ lastOffset: "",
+ messageCount: 1,
+ },
+ scoresMessage
+ );
+ }, delayMs);
+
+ // Register the timeout and operation with ShutdownService
+ DelayedOperationService.getInstance().addDelayedOperation(timeoutId, () =>
+ this.handleScores(
+ {
+ batchId: "",
+ partition: 0,
+ lastOffset: "",
+ messageCount: 1,
+ },
+ scoresMessage
+ )
);
+
+ return ok(null);
}
+
console.log("Sending scores message to Kafka");
- const res = await this.kafkaProducer.sendScoresMessage(
- scoresMessage,
- "helicone-scores-prod"
+ // Schedule the Kafka send operation and register it with ShutdownService
+ const timeoutId = setTimeout(() => {
+ this.kafkaProducer
+ .sendScoresMessage(scoresMessage, "helicone-scores-prod")
+ .catch((error) => {
+ console.error("Error sending scores message to Kafka:", error);
+ });
+ }, delayMs);
+
+ // Register the timeout and operation with ShutdownService
+ DelayedOperationService.getInstance().addDelayedOperation(timeoutId, () =>
+ this.kafkaProducer.sendScoresMessage(
+ scoresMessage,
+ "helicone-scores-prod"
+ )
);
- if (res.error) {
- console.error(`Error sending scores message to Kafka: ${res.error}`);
- return err(res.error);
- }
return ok(null);
}
@@ -100,29 +129,15 @@ export class ScoreManager extends BaseManager {
}, new Map())
.values()
);
- const bumpedVersions = await this.scoreStore.bumpRequestVersion(
- filteredMessages.map((scoresMessage) => ({
- id: scoresMessage.requestId,
- organizationId: scoresMessage.organizationId,
- }))
- );
- if (
- bumpedVersions.error ||
- !bumpedVersions.data ||
- bumpedVersions.data.length === 0
- ) {
- return err(bumpedVersions.error);
- }
const scoresScoreResult = await this.scoreStore.putScoresIntoClickhouse(
- bumpedVersions.data.map((scoresMessage) => {
+ filteredMessages.map((scoresMessage) => {
return {
- requestId: scoresMessage.id,
- organizationId: scoresMessage.helicone_org_id,
- provider: scoresMessage.provider,
+ requestId: scoresMessage.requestId,
+ organizationId: scoresMessage.organizationId,
mappedScores:
filteredMessages
- .find((x) => x.requestId === scoresMessage.id)
+ .find((x) => x.requestId === scoresMessage.requestId)
?.scores.map((score) => {
if (score.score_attribute_type === "boolean") {
return {
@@ -178,7 +193,7 @@ export class ScoreManager extends BaseManager {
messageCount: number;
},
scoresMessages: HeliconeScoresMessage[]
- ): Promise> {
+ ): Promise {
console.log(`Handling scores for batch ${batchContext.batchId}`);
const start = performance.now();
const result = await this.procesScores(scoresMessages);
@@ -233,10 +248,8 @@ export class ScoreManager extends BaseManager {
},
});
}
- return err(result.error);
}
console.log("Successfully processed scores messages");
- return ok(null);
}
private mapScores(scores: Scores): Score[] {