diff --git a/packages/account-postgres-sink-service/src/env.ts b/packages/account-postgres-sink-service/src/env.ts index 39ecb8724..9c3763999 100644 --- a/packages/account-postgres-sink-service/src/env.ts +++ b/packages/account-postgres-sink-service/src/env.ts @@ -27,8 +27,8 @@ export const USE_SUBSTREAM = getEnvBoolean("USE_SUBSTREAM"); export const SUBSTREAM_API_KEY = process.env.SUBSTREAM_API_KEY; export const SUBSTREAM_URL = process.env.SUBSTREAM_URL; export const SUBSTREAM = process.env.SUBSTREAM; -export const SUBSTREAM_CURSOR_MAX_AGE_DAYS = - Number(process.env.SUBSTREAM_CURSOR_MAX_AGE_DAYS) || 5; +export const SUBSTREAM_CURSOR_STALENESS_THRESHOLD_MS = + Number(process.env.SUBSTREAM_CURSOR_STALENESS_THRESHOLD_MS) || 5 * 60 * 1000; // 5 minutes default export const USE_KAFKA = getEnvBoolean("USE_KAFKA"); export const KAFKA_USER = process.env.KAFKA_USER; diff --git a/packages/account-postgres-sink-service/src/services/substream.ts b/packages/account-postgres-sink-service/src/services/substream.ts index f2afc647c..5e0fc3cd0 100644 --- a/packages/account-postgres-sink-service/src/services/substream.ts +++ b/packages/account-postgres-sink-service/src/services/substream.ts @@ -18,7 +18,7 @@ import { SUBSTREAM, SUBSTREAM_API_KEY, SUBSTREAM_URL, - SUBSTREAM_CURSOR_MAX_AGE_DAYS, + SUBSTREAM_CURSOR_STALENESS_THRESHOLD_MS, } from "../env"; import { getPluginsByAccountTypeByProgram } from "../plugins"; import { IConfig } from "../types"; @@ -29,13 +29,80 @@ import { provider } from "../utils/solana"; const MODULE = "filtered_accounts"; const MAX_RECONNECT_ATTEMPTS = 5; -interface IOuputAccount { +interface IOutputAccount { owner: Buffer; address: Buffer; data: Buffer; deleted: boolean; } +export const CursorManager = (stalenessThreshold: number) => { + let checkInterval: NodeJS.Timeout | undefined; + + const formatStaleness = (staleness: number): string => { + const stalenessInHours = staleness / 3600000; + return stalenessInHours >= 1 + ? `${stalenessInHours.toFixed(1)}h` + : `${(staleness / 60000).toFixed(1)}m`; + }; + + const getLatestCursor = async (): Promise => + await Cursor.findOne({ order: [["createdAt", "DESC"]] }); + + const updateCursor = async (cursor: string): Promise => { + await database.transaction(async (t) => { + await Cursor.upsert({ cursor }, { transaction: t }); + await Cursor.destroy({ + where: { + cursor: { [Op.ne]: cursor }, + }, + transaction: t, + }); + }); + }; + + const checkStaleness = async ( + onStale?: () => void + ): Promise => { + const cursor = await getLatestCursor(); + if (!cursor) return undefined; + + const staleness = + Date.now() - new Date(cursor.dataValues.createdAt).getTime(); + if (staleness >= stalenessThreshold) { + console.log( + `Cursor is stale (${formatStaleness( + staleness + )} old), connecting from current block` + ); + onStale && onStale(); + return undefined; + } + + return cursor.cursor; + }; + + const startStalenessCheck = (onStale: () => void): void => { + if (checkInterval) clearInterval(checkInterval); + checkInterval = setInterval(() => checkStaleness(onStale), 30_000); + }; + + const stopStalenessCheck = (): void => { + if (checkInterval) { + clearInterval(checkInterval); + checkInterval = undefined; + } + }; + + return { + getLatestCursor, + updateCursor, + checkStaleness, + startStalenessCheck, + stopStalenessCheck, + }; +}; + export const setupSubstream = async ( server: FastifyInstance, configs: IConfig[] @@ -63,7 +130,8 @@ export const setupSubstream = async ( substream.modules!.modules ); - let isReconnecting = false; + let isConnecting = false; + const cursorManager = CursorManager(SUBSTREAM_CURSOR_STALENESS_THRESHOLD_MS); const pluginsByAccountTypeByProgram = await getPluginsByAccountTypeByProgram( configs ); @@ -76,34 +144,18 @@ export const setupSubstream = async ( process.exit(1); } - await Cursor.sync({ alter: true }); - const lastCursor = await Cursor.findOne({ order: [["createdAt", "DESC"]] }); - let cursor: string | undefined; + if (isConnecting) return; + isConnecting = true; try { - console.log("Connected to Substream"); - if (lastCursor) { - const cursorDate = new Date(lastCursor.dataValues.createdAt); - const cursorAge = - (Date.now() - cursorDate.getTime()) / (24 * 60 * 60 * 1000); - if (cursorAge >= SUBSTREAM_CURSOR_MAX_AGE_DAYS) { - console.log( - `Cursor is ${cursorAge.toFixed( - 1 - )} days old, starting from current block` - ); - cursor = undefined; - } else { - cursor = lastCursor.cursor; - console.log( - `Using existing cursor from ${cursorAge.toFixed(1)} days ago` - ); - } - } else { - cursor = undefined; - console.log("No existing cursor found, starting from current block"); - } + await Cursor.sync({ alter: true }); + const cursor = await cursorManager.checkStaleness(); + cursorManager.startStalenessCheck(() => { + handleReconnect(0); + }); + + console.log("Connected to Substream"); const currentBlock = await provider.connection.getSlot("finalized"); const request = createRequest({ substreamPackage: substream, @@ -120,7 +172,7 @@ export const setupSubstream = async ( ); attemptCount = 0; - isReconnecting = false; + isConnecting = false; for await (const response of streamBlocks(transport, request)) { const message = response.message; @@ -136,7 +188,7 @@ export const setupSubstream = async ( const cursor = message.value.cursor; if (output !== undefined && !isEmptyMessage(output)) { const accountPromises = (output as any).accounts - .map(async (account: IOuputAccount) => { + .map(async (account: IOutputAccount) => { const { owner, address, data, deleted } = account; const ownerKey = new PublicKey(owner); const addressKey = new PublicKey(address); @@ -162,17 +214,7 @@ export const setupSubstream = async ( .filter(Boolean); await Promise.all(accountPromises); - await database.transaction(async (t) => { - await Cursor.upsert({ cursor }, { transaction: t }); - await Cursor.destroy({ - where: { - cursor: { - [Op.ne]: cursor, - }, - }, - transaction: t, - }); - }); + await cursorManager.updateCursor(cursor); } } catch (err) { console.error("Substream error:", err); @@ -181,9 +223,10 @@ export const setupSubstream = async ( } } } catch (err) { + cursorManager.stopStalenessCheck(); console.log("Substream connection error:", err); - if (!isReconnecting) { - isReconnecting = true; + if (!isConnecting) { + isConnecting = true; handleReconnect(attemptCount + 1); } }