diff --git a/valhalla/jawn/src/index.ts b/valhalla/jawn/src/index.ts index e9281467a3..1a6546eec3 100644 --- a/valhalla/jawn/src/index.ts +++ b/valhalla/jawn/src/index.ts @@ -21,7 +21,7 @@ import * as publicSwaggerDoc from "./tsoa-build/public/swagger.json"; import { initLogs } from "./utils/injectLogs"; import { initSentry } from "./utils/injectSentry"; import { startConsumers } from "./workers/consumerInterface"; -import { ShutdownService } from "./managers/shutdown/ShutdownService"; +import { ShutdownService } from "./lib/shared/ShutdownService"; export const ENVIRONMENT: "production" | "development" = (process.env .VERCEL_ENV ?? "development") as any; diff --git a/valhalla/jawn/src/lib/shared/ShutdownService.ts b/valhalla/jawn/src/lib/shared/ShutdownService.ts new file mode 100644 index 0000000000..fa5513a09f --- /dev/null +++ b/valhalla/jawn/src/lib/shared/ShutdownService.ts @@ -0,0 +1,74 @@ +export class ShutdownService { + private static instance: ShutdownService; + private delayedOperations: Map Promise> = + new Map(); + private static readonly SHUTDOWN_TIMEOUT = 120000; // 120 seconds timeout + + + public static getInstance(): ShutdownService { + if (!ShutdownService.instance) { + ShutdownService.instance = new ShutdownService(); + } + return ShutdownService.instance; + } + + public addDelayedOperation( + timeoutId: NodeJS.Timeout, + operation: () => Promise + ): void { + this.delayedOperations.set(timeoutId, operation); + } + + public static getTimeoutId( + operation: () => Promise, + delayMs: number + ): NodeJS.Timeout { + return setTimeout(() => { + operation().catch((error) => { + console.error("Error in delayed operation:", error); + }); + }, delayMs); + } + + public async executeShutdown(): Promise { + console.log("Executing shutdown handlers..."); + try { + // Clear all timeouts and collect operations + const operations = Array.from(this.delayedOperations.entries()); + this.delayedOperations.clear(); + + for (const [timeoutId, operation] of operations) { + clearTimeout(timeoutId); + } + + // Execute delayed operations + await Promise.all([ + Promise.all( + operations.map(([timeoutId, op]) => + op() + .catch((error) => { + console.error("Error in delayed operation:", error); + }) + .finally(() => { + this.removeDelayedOperation(timeoutId); + }) + ) + ), + new Promise((_, reject) => + setTimeout( + () => reject(new Error("Shutdown timed out")), + ShutdownService.SHUTDOWN_TIMEOUT + ) + ), + ]); + + console.log("All shutdown handlers executed successfully."); + } catch (error) { + console.error("Error during shutdown:", error); + console.log("Shutdown process completed with errors or timed out."); + } + } + public removeDelayedOperation(timeoutId: NodeJS.Timeout): void { + this.delayedOperations.delete(timeoutId); + } +} diff --git a/valhalla/jawn/src/managers/score/ScoreManager.ts b/valhalla/jawn/src/managers/score/ScoreManager.ts index 7bd29e289e..fd5e10023e 100644 --- a/valhalla/jawn/src/managers/score/ScoreManager.ts +++ b/valhalla/jawn/src/managers/score/ScoreManager.ts @@ -1,14 +1,11 @@ import { err, ok, Result } from "../../lib/shared/result"; -import { BaseManager } from "../BaseManager"; import { AuthParams } from "../../lib/db/supabase"; -import { BatchScores, Score, ScoreStore } from "../../lib/stores/ScoreStore"; +import { Score, ScoreStore } from "../../lib/stores/ScoreStore"; import { dataDogClient } from "../../lib/clients/DataDogClient"; import { KafkaProducer } from "../../lib/clients/KafkaProducer"; import { HeliconeScoresMessage } from "../../lib/handlers/HandlerContext"; import * as Sentry from "@sentry/node"; -import { ShutdownService } from "../shutdown/ShutdownService"; -import { clearTimeout } from "timers"; -import { ShutdownableManager } from "../shutdown/ShutdownableManager"; +import { ShutdownService } from "../../lib/shared/ShutdownService"; type Scores = Record; const delayMs = 10 * 60 * 1000; // 10 minutes in milliseconds @@ -17,16 +14,11 @@ export interface ScoreRequest { scores: Scores; } -export class ScoreManager extends ShutdownableManager { +export class ScoreManager { private scoreStore: ScoreStore; private kafkaProducer: KafkaProducer; - private delayedOperations: Map Promise> = - new Map(); - private static readonly MAX_OPERATION_TIME = 10 * 60 * 1000; // 10 minutes in milliseconds - private static readonly SHUTDOWN_TIMEOUT = 60000; // 60 seconds timeout - constructor(authParams: AuthParams) { - super(authParams); + constructor(private authParams: AuthParams) { this.scoreStore = new ScoreStore(authParams.organizationId); this.kafkaProducer = new KafkaProducer(); } @@ -56,7 +48,22 @@ export class ScoreManager extends ShutdownableManager { ): Promise> { if (!this.kafkaProducer.isKafkaEnabled()) { console.log("Kafka is not enabled. Using score manager"); - const delayedOperation = this.createDelayedOperation(() => + + // Schedule the delayed operation and register it with ShutdownService + const timeoutId = ShutdownService.getTimeoutId(() => { + return this.handleScores( + { + batchId: "", + partition: 0, + lastOffset: "", + messageCount: 1, + }, + scoresMessage + ); + }, delayMs); + + // Register the timeout and operation with ShutdownService + ShutdownService.getInstance().addDelayedOperation(timeoutId, () => this.handleScores( { batchId: "", @@ -67,28 +74,30 @@ export class ScoreManager extends ShutdownableManager { scoresMessage ) ); + return ok(null); } - console.log("Sending scores message to Kafka"); - - // const delayedOperation = this.createDelayedOperation(() => - // this.kafkaProducer.sendScoresMessage( - // scoresMessage, - // "helicone-scores-prod" - // ) - // ); - // this.delayedOperations.push(delayedOperation); - return ok(null); - } + console.log("Sending scores message to Kafka"); - private createDelayedOperation(operation: () => Promise): void { + // Schedule the Kafka send operation and register it with ShutdownService const timeoutId = setTimeout(() => { - operation().finally(() => { - this.delayedOperations.delete(timeoutId); - }); + this.kafkaProducer + .sendScoresMessage(scoresMessage, "helicone-scores-prod") + .catch((error) => { + console.error("Error sending scores message to Kafka:", error); + }); }, delayMs); - this.delayedOperations.set(timeoutId, operation); + + // Register the timeout and operation with ShutdownService + ShutdownService.getInstance().addDelayedOperation(timeoutId, () => + this.kafkaProducer.sendScoresMessage( + scoresMessage, + "helicone-scores-prod" + ) + ); + + return ok(null); } private async procesScores( @@ -247,31 +256,4 @@ export class ScoreManager extends ShutdownableManager { }; }); } - - public async shutdown(): Promise { - console.log( - `Executing ${this.delayedOperations.size} delayed operations in ScoreManager immediately...` - ); - const operations = Array.from(this.delayedOperations.entries()); - this.delayedOperations.clear(); - - for (const [timeoutId, operation] of operations) { - clearTimeout(timeoutId); - } - - try { - await Promise.race([ - Promise.all(operations.map(([_, op]) => op())), - new Promise((resolve) => - setTimeout(resolve, ScoreManager.SHUTDOWN_TIMEOUT) - ), - ]); - } catch (error) { - console.error("Error during ScoreManager shutdown:", error); - } - - console.log( - "All delayed operations in ScoreManager completed or timed out." - ); - } } diff --git a/valhalla/jawn/src/managers/shutdown/ShutdownService.ts b/valhalla/jawn/src/managers/shutdown/ShutdownService.ts deleted file mode 100644 index d7d78d3095..0000000000 --- a/valhalla/jawn/src/managers/shutdown/ShutdownService.ts +++ /dev/null @@ -1,39 +0,0 @@ -type ShutdownHandler = () => Promise; - -export class ShutdownService { - private static instance: ShutdownService; - private handlers: ShutdownHandler[] = []; - private static readonly SHUTDOWN_TIMEOUT = 60000; - - private constructor() {} - - public static getInstance(): ShutdownService { - if (!ShutdownService.instance) { - ShutdownService.instance = new ShutdownService(); - } - return ShutdownService.instance; - } - - public addHandler(handler: ShutdownHandler): void { - this.handlers.push(handler); - } - - public async executeShutdown(): Promise { - console.log("Executing shutdown handlers..."); - try { - await Promise.all([ - Promise.all(this.handlers.map((handler) => handler())), - new Promise((_, reject) => - setTimeout( - () => reject(new Error("Shutdown timed out")), - ShutdownService.SHUTDOWN_TIMEOUT - ) - ), - ]); - console.log("All shutdown handlers executed successfully."); - } catch (error) { - console.error("Error during shutdown:", error); - console.log("Shutdown process completed with errors or timed out."); - } - } -} diff --git a/valhalla/jawn/src/managers/shutdown/ShutdownableManager.ts b/valhalla/jawn/src/managers/shutdown/ShutdownableManager.ts deleted file mode 100644 index 6f21a06116..0000000000 --- a/valhalla/jawn/src/managers/shutdown/ShutdownableManager.ts +++ /dev/null @@ -1,12 +0,0 @@ -import { AuthParams } from "../../lib/db/supabase"; -import { BaseManager } from "../BaseManager"; -import { ShutdownService } from "./ShutdownService"; - -export abstract class ShutdownableManager extends BaseManager { - constructor(authParams: AuthParams) { - super(authParams); - ShutdownService.getInstance().addHandler(() => this.shutdown()); - } - - public abstract shutdown(): Promise; -} diff --git a/valhalla/jawn/src/workers/consumerManager.ts b/valhalla/jawn/src/workers/consumerManager.ts deleted file mode 100644 index 3f6d745168..0000000000 --- a/valhalla/jawn/src/workers/consumerManager.ts +++ /dev/null @@ -1,48 +0,0 @@ -import { Worker } from "worker_threads"; -import { ShutdownService } from "../managers/shutdown/ShutdownService"; - -export class ConsumerManager { - private static instance: ConsumerManager; - private workers: Worker[] = []; - - private constructor() { - // Register shutdown handler - ShutdownService.getInstance().addHandler(() => this.shutdown()); - } - - public static getInstance(): ConsumerManager { - if (!ConsumerManager.instance) { - ConsumerManager.instance = new ConsumerManager(); - } - return ConsumerManager.instance; - } - - public addWorker(worker: Worker) { - this.workers.push(worker); - } - - public async shutdown(): Promise { - console.log(`Shutting down ${this.workers.length} worker threads...`); - const stopPromises = this.workers.map((worker) => { - return new Promise((resolve, reject) => { - worker.once("exit", (code) => { - resolve(); - }); - worker.once("error", (error) => { - console.error("Worker error:", error); - reject(error); - }); - // Send a shutdown signal to the worker - worker.postMessage("shutdown"); - - // If worker doesn't exit after a timeout, terminate it forcefully - setTimeout(() => { - worker.terminate(); - resolve(); - }, 10000); // 10 seconds timeout - }); - }); - await Promise.all(stopPromises); - console.log("All worker threads shut down."); - } -}