Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scores stop version updates + kafka producer delays #2644

Merged
merged 14 commits into from
Sep 20, 2024
5 changes: 5 additions & 0 deletions docs/features/advanced-usage/scores.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ import QuestionsSection from "/snippets/questions-section.mdx";
[plan](https://www.helicone.ai/pricing).
</Info>

<Warning>
Scores will be ingested with a **10 minutes delay** to ensure there is enough
time to process the request.
</Warning>

## Introduction

Helicone's scores [API](https://docs.helicone.ai/rest/request/post-v1request-score) allows you to score your requests and experiments. You can use this feature to evaluate the performance of your prompts and compare different experiments and datasets. E.g., if you are building an image classification application, you might need a variety of scores to help you determine how accurate the outputs are compared to what you expect. For example, an image classification app might have one score that tells you how accurate the model classifies images into the correct categories, and another that measures the confidence level of the model's predictions.
Expand Down
28 changes: 27 additions & 1 deletion valhalla/jawn/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import * as publicSwaggerDoc from "./tsoa-build/public/swagger.json";
import { initLogs } from "./utils/injectLogs";
import { initSentry } from "./utils/injectSentry";
import { startConsumers } from "./workers/consumerInterface";
import { DelayedOperationService } from "./lib/shared/delayedOperationService";

export const ENVIRONMENT: "production" | "development" = (process.env
.VERCEL_ENV ?? "development") as any;
Expand Down Expand Up @@ -202,5 +203,30 @@ const server = app.listen(

server.on("error", console.error);

// Thisp
server.setTimeout(1000 * 60 * 10); // 10 minutes

// This shuts down the server and all delayed operations with delay only locally, on AWS it will be killed by the OS with no delay
// Please wait few minutes before terminating the original task on AWS
async function gracefulShutdown(signal: string) {
console.log(`Received ${signal}. Starting graceful shutdown...`);

server.close(async () => {
console.log("HTTP server closed.");

await DelayedOperationService.getInstance().executeShutdown();

console.log("Graceful shutdown completed.");
process.exit(0);
});

// If server hasn't closed in 30 seconds, force shutdown
setTimeout(() => {
console.error(
"Could not close connections in time, forcefully shutting down"
);
process.exit(1);
}, 30000);
}

process.on("SIGTERM", () => gracefulShutdown("SIGTERM"));
process.on("SIGINT", () => gracefulShutdown("SIGINT"));
73 changes: 73 additions & 0 deletions valhalla/jawn/src/lib/shared/delayedOperationService.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
export class DelayedOperationService {
private static instance: DelayedOperationService;
private delayedOperations: Map<NodeJS.Timeout, () => Promise<any>> =
new Map();
private static readonly SHUTDOWN_TIMEOUT = 30000; // 30 seconds timeout

public static getInstance(): DelayedOperationService {
if (!DelayedOperationService.instance) {
DelayedOperationService.instance = new DelayedOperationService();
}
return DelayedOperationService.instance;
}

public addDelayedOperation(
timeoutId: NodeJS.Timeout,
operation: () => Promise<any>
): void {
this.delayedOperations.set(timeoutId, operation);
}

public static getTimeoutId(
operation: () => Promise<any>,
delayMs: number
): NodeJS.Timeout {
return setTimeout(() => {
operation().catch((error) => {
console.error("Error in delayed operation:", error);
});
}, delayMs);
}

public async executeShutdown(): Promise<void> {
console.log("Executing shutdown handlers...");
try {
// Clear all timeouts and collect operations
const operations = Array.from(this.delayedOperations.entries());
this.delayedOperations.clear();

for (const [timeoutId, operation] of operations) {
clearTimeout(timeoutId);
}

// Execute delayed operations
await Promise.all([
Promise.all(
operations.map(([timeoutId, op]) =>
op()
.catch((error) => {
console.error("Error in delayed operation:", error);
})
.finally(() => {
this.removeDelayedOperation(timeoutId);
})
)
),
new Promise((_, reject) =>
setTimeout(
() => reject(new Error("Shutdown timed out")),
DelayedOperationService.SHUTDOWN_TIMEOUT
)
),
]);

console.log("All shutdown handlers executed successfully.");
} catch (error) {
console.error("Error during shutdown:", error);
console.log("Shutdown process completed with errors or timed out.");
}
}
public removeDelayedOperation(timeoutId: NodeJS.Timeout): void {
this.delayedOperations.delete(timeoutId);
}
}
15 changes: 8 additions & 7 deletions valhalla/jawn/src/lib/stores/ScoreStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ export type Score = {

export interface BatchScores {
requestId: string;
provider: string;
organizationId: string;
mappedScores: Score[];
}
Expand Down Expand Up @@ -106,10 +105,8 @@ export class ScoreStore extends BaseStore {
): Promise<Result<RequestResponseRMT[], string>> {
const queryPlaceholders = newVersions
.map((_, index) => {
const base = index * 3;
return `({val_${base} : String}, {val_${base + 1} : String}, {val_${
base + 2
} : String})`;
const base = index * 2;
return `({val_${base} : String}, {val_${base + 1} : String})`;
})
.join(",\n ");

Expand All @@ -118,7 +115,7 @@ export class ScoreStore extends BaseStore {
}

const queryParams: (string | number | boolean | Date)[] =
newVersions.flatMap((v) => [v.requestId, v.organizationId, v.provider]);
newVersions.flatMap((v) => [v.requestId, v.organizationId]);

if (queryParams.length === 0) {
return err("No query params");
Expand All @@ -129,7 +126,7 @@ export class ScoreStore extends BaseStore {
`
SELECT *
FROM request_response_rmt
WHERE (request_id, organization_id, provider) IN (${queryPlaceholders})
WHERE (request_id, organization_id) IN (${queryPlaceholders})
`,
queryParams
),
Expand Down Expand Up @@ -284,6 +281,10 @@ export class ScoreStore extends BaseStore {
feedback.responseId !== "00000000-0000-0000-0000-000000000000"
);

if (validFeedbacks.length === 0) {
return ok([]);
}

console.log(
`Upserting feedback for ${
validFeedbacks.length
Expand Down
95 changes: 54 additions & 41 deletions valhalla/jawn/src/managers/score/ScoreManager.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import { err, ok, Result } from "../../lib/shared/result";
import { BaseManager } from "../BaseManager";
import { AuthParams } from "../../lib/db/supabase";
import { BatchScores, Score, ScoreStore } from "../../lib/stores/ScoreStore";
import { Score, ScoreStore } from "../../lib/stores/ScoreStore";
import { dataDogClient } from "../../lib/clients/DataDogClient";
import { KafkaProducer } from "../../lib/clients/KafkaProducer";
import { HeliconeScoresMessage } from "../../lib/handlers/HandlerContext";
import * as Sentry from "@sentry/node";
import { DelayedOperationService } from "../../lib/shared/delayedOperationService";
import { BaseManager } from "../BaseManager";
import { validate as uuidValidate } from "uuid";

type Scores = Record<string, number | boolean>;
const delayMs = 10 * 60 * 1000; // 10 minutes in milliseconds

export interface ScoreRequest {
scores: Scores;
Expand All @@ -17,11 +19,13 @@ export interface ScoreRequest {
export class ScoreManager extends BaseManager {
private scoreStore: ScoreStore;
private kafkaProducer: KafkaProducer;

constructor(authParams: AuthParams) {
super(authParams);
this.scoreStore = new ScoreStore(authParams.organizationId);
this.kafkaProducer = new KafkaProducer();
}

public async addScores(
requestId: string,
scores: Scores
Expand All @@ -47,30 +51,55 @@ export class ScoreManager extends BaseManager {
): Promise<Result<null, string>> {
if (!this.kafkaProducer.isKafkaEnabled()) {
console.log("Kafka is not enabled. Using score manager");
const scoreManager = new ScoreManager({
organizationId: this.authParams.organizationId,
});
return await scoreManager.handleScores(
{
batchId: "",
partition: 0,
lastOffset: "",
messageCount: 1,
},
scoresMessage

// Schedule the delayed operation and register it with ShutdownService
const timeoutId = DelayedOperationService.getTimeoutId(() => {
return this.handleScores(
{
batchId: "",
partition: 0,
lastOffset: "",
messageCount: 1,
},
scoresMessage
);
}, delayMs);

// Register the timeout and operation with ShutdownService
DelayedOperationService.getInstance().addDelayedOperation(timeoutId, () =>
this.handleScores(
{
batchId: "",
partition: 0,
lastOffset: "",
messageCount: 1,
},
scoresMessage
)
);

return ok(null);
}

console.log("Sending scores message to Kafka");

const res = await this.kafkaProducer.sendScoresMessage(
scoresMessage,
"helicone-scores-prod"
// Schedule the Kafka send operation and register it with ShutdownService
const timeoutId = setTimeout(() => {
this.kafkaProducer
.sendScoresMessage(scoresMessage, "helicone-scores-prod")
.catch((error) => {
console.error("Error sending scores message to Kafka:", error);
});
}, delayMs);

// Register the timeout and operation with ShutdownService
DelayedOperationService.getInstance().addDelayedOperation(timeoutId, () =>
this.kafkaProducer.sendScoresMessage(
scoresMessage,
"helicone-scores-prod"
)
);

if (res.error) {
console.error(`Error sending scores message to Kafka: ${res.error}`);
return err(res.error);
}
return ok(null);
}

Expand Down Expand Up @@ -100,29 +129,15 @@ export class ScoreManager extends BaseManager {
}, new Map<string, HeliconeScoresMessage>())
.values()
);
const bumpedVersions = await this.scoreStore.bumpRequestVersion(
filteredMessages.map((scoresMessage) => ({
id: scoresMessage.requestId,
organizationId: scoresMessage.organizationId,
}))
);

if (
bumpedVersions.error ||
!bumpedVersions.data ||
bumpedVersions.data.length === 0
) {
return err(bumpedVersions.error);
}
const scoresScoreResult = await this.scoreStore.putScoresIntoClickhouse(
bumpedVersions.data.map((scoresMessage) => {
filteredMessages.map((scoresMessage) => {
return {
requestId: scoresMessage.id,
organizationId: scoresMessage.helicone_org_id,
provider: scoresMessage.provider,
requestId: scoresMessage.requestId,
organizationId: scoresMessage.organizationId,
mappedScores:
filteredMessages
.find((x) => x.requestId === scoresMessage.id)
.find((x) => x.requestId === scoresMessage.requestId)
?.scores.map((score) => {
if (score.score_attribute_type === "boolean") {
return {
Expand Down Expand Up @@ -178,7 +193,7 @@ export class ScoreManager extends BaseManager {
messageCount: number;
},
scoresMessages: HeliconeScoresMessage[]
): Promise<Result<null, string>> {
): Promise<void> {
console.log(`Handling scores for batch ${batchContext.batchId}`);
const start = performance.now();
const result = await this.procesScores(scoresMessages);
Expand Down Expand Up @@ -233,10 +248,8 @@ export class ScoreManager extends BaseManager {
},
});
}
return err(result.error);
}
console.log("Successfully processed scores messages");
return ok(null);
}

private mapScores(scores: Scores): Score[] {
Expand Down
Loading