Skip to content

Commit

Permalink
Add substream cursor reuse strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
bryzettler committed Dec 17, 2024
1 parent 90b359d commit 4996f97
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 4 deletions.
2 changes: 2 additions & 0 deletions packages/account-postgres-sink-service/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ export const USE_SUBSTREAM = getEnvBoolean("USE_SUBSTREAM");
export const SUBSTREAM_API_KEY = process.env.SUBSTREAM_API_KEY;
export const SUBSTREAM_URL = process.env.SUBSTREAM_URL;
export const SUBSTREAM = process.env.SUBSTREAM;
export const SUBSTREAM_CURSOR_MAX_AGE_DAYS =
Number(process.env.SUBSTREAM_CURSOR_MAX_AGE_DAYS) || 5;

export const USE_KAFKA = getEnvBoolean("USE_KAFKA");
export const KAFKA_USER = process.env.KAFKA_USER;
Expand Down
31 changes: 27 additions & 4 deletions packages/account-postgres-sink-service/src/services/substream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
SUBSTREAM,
SUBSTREAM_API_KEY,
SUBSTREAM_URL,
SUBSTREAM_CURSOR_MAX_AGE_DAYS,
} from "../env";
import { getPluginsByAccountTypeByProgram } from "../plugins";
import { IConfig } from "../types";
Expand Down Expand Up @@ -77,9 +78,33 @@ export const setupSubstream = async (

await Cursor.sync({ alter: true });
const lastCursor = await Cursor.findOne({ order: [["createdAt", "DESC"]] });
let cursor: string | undefined;

try {
let cursor = lastCursor?.cursor;
console.log("Connected to Substream");
if (lastCursor) {
const cursorDate = new Date(lastCursor.dataValues.createdAt);
const cursorAge =
(Date.now() - cursorDate.getTime()) / (24 * 60 * 60 * 1000);

if (cursorAge > SUBSTREAM_CURSOR_MAX_AGE_DAYS) {
console.log(
`Cursor is ${Math.floor(
cursorAge
)} days old, starting from current block`
);
cursor = undefined;
} else {
cursor = lastCursor.cursor;
console.log(
`Using existing cursor from ${Math.floor(cursorAge)} days ago`
);
}
} else {
cursor = undefined;
console.log("No existing cursor found, starting from current block");
}

const currentBlock = await provider.connection.getSlot("finalized");
const request = createRequest({
substreamPackage: substream,
Expand All @@ -89,10 +114,9 @@ export const setupSubstream = async (
startCursor: cursor,
});

console.log("Connected to Substream");
console.log(
`Substream: Streaming from ${
lastCursor ? `cursor ${lastCursor.cursor}` : `block ${currentBlock}`
cursor ? `cursor ${cursor}` : `block ${currentBlock}`
}`
);

Expand All @@ -111,7 +135,6 @@ export const setupSubstream = async (
try {
const output = unpackMapOutput(response, registry);
const cursor = message.value.cursor;

if (output !== undefined && !isEmptyMessage(output)) {
const accountPromises = (output as any).accounts
.map(async (account: IOuputAccount) => {
Expand Down

0 comments on commit 4996f97

Please sign in to comment.