Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check for cursor staleness #761

Merged
merged 1 commit into from
Dec 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/account-postgres-sink-service/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ export const USE_SUBSTREAM = getEnvBoolean("USE_SUBSTREAM");
export const SUBSTREAM_API_KEY = process.env.SUBSTREAM_API_KEY;
export const SUBSTREAM_URL = process.env.SUBSTREAM_URL;
export const SUBSTREAM = process.env.SUBSTREAM;
export const SUBSTREAM_CURSOR_MAX_AGE_DAYS =
Number(process.env.SUBSTREAM_CURSOR_MAX_AGE_DAYS) || 5;
export const SUBSTREAM_CURSOR_STALENESS_THRESHOLD_MS =
Number(process.env.SUBSTREAM_CURSOR_STALENESS_THRESHOLD_MS) || 5 * 60 * 1000; // 5 minutes default

export const USE_KAFKA = getEnvBoolean("USE_KAFKA");
export const KAFKA_USER = process.env.KAFKA_USER;
Expand Down
129 changes: 86 additions & 43 deletions packages/account-postgres-sink-service/src/services/substream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import {
SUBSTREAM,
SUBSTREAM_API_KEY,
SUBSTREAM_URL,
SUBSTREAM_CURSOR_MAX_AGE_DAYS,
SUBSTREAM_CURSOR_STALENESS_THRESHOLD_MS,
} from "../env";
import { getPluginsByAccountTypeByProgram } from "../plugins";
import { IConfig } from "../types";
Expand All @@ -29,13 +29,80 @@ import { provider } from "../utils/solana";
const MODULE = "filtered_accounts";
const MAX_RECONNECT_ATTEMPTS = 5;

interface IOuputAccount {
interface IOutputAccount {
owner: Buffer;
address: Buffer;
data: Buffer;
deleted: boolean;
}

export const CursorManager = (stalenessThreshold: number) => {
let checkInterval: NodeJS.Timeout | undefined;

const formatStaleness = (staleness: number): string => {
const stalenessInHours = staleness / 3600000;
return stalenessInHours >= 1
? `${stalenessInHours.toFixed(1)}h`
: `${(staleness / 60000).toFixed(1)}m`;
};

const getLatestCursor = async (): Promise<Cursor | null> =>
await Cursor.findOne({ order: [["createdAt", "DESC"]] });

const updateCursor = async (cursor: string): Promise<void> => {
await database.transaction(async (t) => {
await Cursor.upsert({ cursor }, { transaction: t });
await Cursor.destroy({
where: {
cursor: { [Op.ne]: cursor },
},
transaction: t,
});
});
};

const checkStaleness = async (
onStale?: () => void
): Promise<string | undefined> => {
const cursor = await getLatestCursor();
if (!cursor) return undefined;

const staleness =
Date.now() - new Date(cursor.dataValues.createdAt).getTime();
if (staleness >= stalenessThreshold) {
console.log(
`Cursor is stale (${formatStaleness(
staleness
)} old), connecting from current block`
);
onStale && onStale();
return undefined;
}

return cursor.cursor;
};

const startStalenessCheck = (onStale: () => void): void => {
if (checkInterval) clearInterval(checkInterval);
checkInterval = setInterval(() => checkStaleness(onStale), 30_000);
};

const stopStalenessCheck = (): void => {
if (checkInterval) {
clearInterval(checkInterval);
checkInterval = undefined;
}
};

return {
getLatestCursor,
updateCursor,
checkStaleness,
startStalenessCheck,
stopStalenessCheck,
};
};

export const setupSubstream = async (
server: FastifyInstance,
configs: IConfig[]
Expand Down Expand Up @@ -63,7 +130,8 @@ export const setupSubstream = async (
substream.modules!.modules
);

let isReconnecting = false;
let isConnecting = false;
const cursorManager = CursorManager(SUBSTREAM_CURSOR_STALENESS_THRESHOLD_MS);
const pluginsByAccountTypeByProgram = await getPluginsByAccountTypeByProgram(
configs
);
Expand All @@ -76,34 +144,18 @@ export const setupSubstream = async (
process.exit(1);
}

await Cursor.sync({ alter: true });
const lastCursor = await Cursor.findOne({ order: [["createdAt", "DESC"]] });
let cursor: string | undefined;
if (isConnecting) return;
isConnecting = true;

try {
console.log("Connected to Substream");
if (lastCursor) {
const cursorDate = new Date(lastCursor.dataValues.createdAt);
const cursorAge =
(Date.now() - cursorDate.getTime()) / (24 * 60 * 60 * 1000);
if (cursorAge >= SUBSTREAM_CURSOR_MAX_AGE_DAYS) {
console.log(
`Cursor is ${cursorAge.toFixed(
1
)} days old, starting from current block`
);
cursor = undefined;
} else {
cursor = lastCursor.cursor;
console.log(
`Using existing cursor from ${cursorAge.toFixed(1)} days ago`
);
}
} else {
cursor = undefined;
console.log("No existing cursor found, starting from current block");
}
await Cursor.sync({ alter: true });
const cursor = await cursorManager.checkStaleness();

cursorManager.startStalenessCheck(() => {
handleReconnect(0);
});

console.log("Connected to Substream");
const currentBlock = await provider.connection.getSlot("finalized");
const request = createRequest({
substreamPackage: substream,
Expand All @@ -120,7 +172,7 @@ export const setupSubstream = async (
);

attemptCount = 0;
isReconnecting = false;
isConnecting = false;

for await (const response of streamBlocks(transport, request)) {
const message = response.message;
Expand All @@ -136,7 +188,7 @@ export const setupSubstream = async (
const cursor = message.value.cursor;
if (output !== undefined && !isEmptyMessage(output)) {
const accountPromises = (output as any).accounts
.map(async (account: IOuputAccount) => {
.map(async (account: IOutputAccount) => {
const { owner, address, data, deleted } = account;
const ownerKey = new PublicKey(owner);
const addressKey = new PublicKey(address);
Expand All @@ -162,17 +214,7 @@ export const setupSubstream = async (
.filter(Boolean);

await Promise.all(accountPromises);
await database.transaction(async (t) => {
await Cursor.upsert({ cursor }, { transaction: t });
await Cursor.destroy({
where: {
cursor: {
[Op.ne]: cursor,
},
},
transaction: t,
});
});
await cursorManager.updateCursor(cursor);
}
} catch (err) {
console.error("Substream error:", err);
Expand All @@ -181,9 +223,10 @@ export const setupSubstream = async (
}
}
} catch (err) {
cursorManager.stopStalenessCheck();
console.log("Substream connection error:", err);
if (!isReconnecting) {
isReconnecting = true;
if (!isConnecting) {
isConnecting = true;
handleReconnect(attemptCount + 1);
}
}
Expand Down
Loading