Skip to content

Commit

Permalink
singleton for shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
maamalama committed Sep 18, 2024
1 parent 8e641c3 commit f5d4797
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 156 deletions.
2 changes: 1 addition & 1 deletion valhalla/jawn/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import * as publicSwaggerDoc from "./tsoa-build/public/swagger.json";
import { initLogs } from "./utils/injectLogs";
import { initSentry } from "./utils/injectSentry";
import { startConsumers } from "./workers/consumerInterface";
import { ShutdownService } from "./managers/shutdown/ShutdownService";
import { ShutdownService } from "./lib/shared/ShutdownService";

export const ENVIRONMENT: "production" | "development" = (process.env
.VERCEL_ENV ?? "development") as any;
Expand Down
74 changes: 74 additions & 0 deletions valhalla/jawn/src/lib/shared/ShutdownService.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
export class ShutdownService {
private static instance: ShutdownService;
private delayedOperations: Map<NodeJS.Timeout, () => Promise<any>> =
new Map();
private static readonly SHUTDOWN_TIMEOUT = 120000; // 120 seconds timeout


public static getInstance(): ShutdownService {
if (!ShutdownService.instance) {
ShutdownService.instance = new ShutdownService();
}
return ShutdownService.instance;
}

public addDelayedOperation(
timeoutId: NodeJS.Timeout,
operation: () => Promise<any>
): void {
this.delayedOperations.set(timeoutId, operation);
}

public static getTimeoutId(
operation: () => Promise<any>,
delayMs: number
): NodeJS.Timeout {
return setTimeout(() => {
operation().catch((error) => {
console.error("Error in delayed operation:", error);
});
}, delayMs);
}

public async executeShutdown(): Promise<void> {
console.log("Executing shutdown handlers...");
try {
// Clear all timeouts and collect operations
const operations = Array.from(this.delayedOperations.entries());
this.delayedOperations.clear();

for (const [timeoutId, operation] of operations) {
clearTimeout(timeoutId);
}

// Execute delayed operations
await Promise.all([
Promise.all(
operations.map(([timeoutId, op]) =>
op()
.catch((error) => {
console.error("Error in delayed operation:", error);
})
.finally(() => {
this.removeDelayedOperation(timeoutId);
})
)
),
new Promise((_, reject) =>
setTimeout(
() => reject(new Error("Shutdown timed out")),
ShutdownService.SHUTDOWN_TIMEOUT
)
),
]);

console.log("All shutdown handlers executed successfully.");
} catch (error) {
console.error("Error during shutdown:", error);
console.log("Shutdown process completed with errors or timed out.");
}
}
public removeDelayedOperation(timeoutId: NodeJS.Timeout): void {
this.delayedOperations.delete(timeoutId);
}
}
94 changes: 38 additions & 56 deletions valhalla/jawn/src/managers/score/ScoreManager.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
import { err, ok, Result } from "../../lib/shared/result";
import { BaseManager } from "../BaseManager";
import { AuthParams } from "../../lib/db/supabase";
import { BatchScores, Score, ScoreStore } from "../../lib/stores/ScoreStore";
import { Score, ScoreStore } from "../../lib/stores/ScoreStore";
import { dataDogClient } from "../../lib/clients/DataDogClient";
import { KafkaProducer } from "../../lib/clients/KafkaProducer";
import { HeliconeScoresMessage } from "../../lib/handlers/HandlerContext";
import * as Sentry from "@sentry/node";
import { ShutdownService } from "../shutdown/ShutdownService";
import { clearTimeout } from "timers";
import { ShutdownableManager } from "../shutdown/ShutdownableManager";
import { ShutdownService } from "../../lib/shared/ShutdownService";

type Scores = Record<string, number | boolean>;
const delayMs = 10 * 60 * 1000; // 10 minutes in milliseconds
Expand All @@ -17,16 +14,11 @@ export interface ScoreRequest {
scores: Scores;
}

export class ScoreManager extends ShutdownableManager {
export class ScoreManager {
private scoreStore: ScoreStore;
private kafkaProducer: KafkaProducer;
private delayedOperations: Map<NodeJS.Timeout, () => Promise<void>> =
new Map();
private static readonly MAX_OPERATION_TIME = 10 * 60 * 1000; // 10 minutes in milliseconds
private static readonly SHUTDOWN_TIMEOUT = 60000; // 60 seconds timeout

constructor(authParams: AuthParams) {
super(authParams);
constructor(private authParams: AuthParams) {
this.scoreStore = new ScoreStore(authParams.organizationId);
this.kafkaProducer = new KafkaProducer();
}
Expand Down Expand Up @@ -56,7 +48,22 @@ export class ScoreManager extends ShutdownableManager {
): Promise<Result<null, string>> {
if (!this.kafkaProducer.isKafkaEnabled()) {
console.log("Kafka is not enabled. Using score manager");
const delayedOperation = this.createDelayedOperation(() =>

// Schedule the delayed operation and register it with ShutdownService
const timeoutId = ShutdownService.getTimeoutId(() => {
return this.handleScores(
{
batchId: "",
partition: 0,
lastOffset: "",
messageCount: 1,
},
scoresMessage
);
}, delayMs);

// Register the timeout and operation with ShutdownService
ShutdownService.getInstance().addDelayedOperation(timeoutId, () =>
this.handleScores(
{
batchId: "",
Expand All @@ -67,28 +74,30 @@ export class ScoreManager extends ShutdownableManager {
scoresMessage
)
);

return ok(null);
}
console.log("Sending scores message to Kafka");

// const delayedOperation = this.createDelayedOperation(() =>
// this.kafkaProducer.sendScoresMessage(
// scoresMessage,
// "helicone-scores-prod"
// )
// );
// this.delayedOperations.push(delayedOperation);

return ok(null);
}
console.log("Sending scores message to Kafka");

private createDelayedOperation(operation: () => Promise<void>): void {
// Schedule the Kafka send operation and register it with ShutdownService
const timeoutId = setTimeout(() => {
operation().finally(() => {
this.delayedOperations.delete(timeoutId);
});
this.kafkaProducer
.sendScoresMessage(scoresMessage, "helicone-scores-prod")
.catch((error) => {
console.error("Error sending scores message to Kafka:", error);
});
}, delayMs);
this.delayedOperations.set(timeoutId, operation);

// Register the timeout and operation with ShutdownService
ShutdownService.getInstance().addDelayedOperation(timeoutId, () =>
this.kafkaProducer.sendScoresMessage(
scoresMessage,
"helicone-scores-prod"
)
);

return ok(null);
}

private async procesScores(
Expand Down Expand Up @@ -247,31 +256,4 @@ export class ScoreManager extends ShutdownableManager {
};
});
}

public async shutdown(): Promise<void> {
console.log(
`Executing ${this.delayedOperations.size} delayed operations in ScoreManager immediately...`
);
const operations = Array.from(this.delayedOperations.entries());
this.delayedOperations.clear();

for (const [timeoutId, operation] of operations) {
clearTimeout(timeoutId);
}

try {
await Promise.race([
Promise.all(operations.map(([_, op]) => op())),
new Promise((resolve) =>
setTimeout(resolve, ScoreManager.SHUTDOWN_TIMEOUT)
),
]);
} catch (error) {
console.error("Error during ScoreManager shutdown:", error);
}

console.log(
"All delayed operations in ScoreManager completed or timed out."
);
}
}
39 changes: 0 additions & 39 deletions valhalla/jawn/src/managers/shutdown/ShutdownService.ts

This file was deleted.

12 changes: 0 additions & 12 deletions valhalla/jawn/src/managers/shutdown/ShutdownableManager.ts

This file was deleted.

48 changes: 0 additions & 48 deletions valhalla/jawn/src/workers/consumerManager.ts

This file was deleted.

0 comments on commit f5d4797

Please sign in to comment.