Skip to content

Commit

Permalink
dont kill node process (#735)
Browse files Browse the repository at this point in the history
  • Loading branch information
bryzettler authored Nov 15, 2024
1 parent 3668bfa commit b45933c
Showing 1 changed file with 132 additions and 122 deletions.
254 changes: 132 additions & 122 deletions packages/account-postgres-sink-service/src/services/yellowstone.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import Client, {
SubscribeUpdate,
SubscribeUpdateAccount,
} from "@triton-one/yellowstone-grpc";
import retry, { Options as RetryOptions } from "async-retry";
import { FastifyInstance } from "fastify";
import { YELLOWSTONE_TOKEN, YELLOWSTONE_URL } from "../env";
import { getPluginsByAccountTypeByProgram } from "../plugins";
Expand All @@ -14,6 +13,7 @@ import { convertYellowstoneTransaction } from "../utils/convertYellowstoneTransa
import { handleAccountWebhook } from "../utils/handleAccountWebhook";
import { handleTransactionWebhook } from "../utils/handleTransactionWebhook";

const MAX_RECONNECT_ATTEMPTS = 5;
export const setupYellowstone = async (
server: FastifyInstance,
configs: IConfig[]
Expand All @@ -22,142 +22,152 @@ export const setupYellowstone = async (
throw new Error("YELLOWSTONE_TOKEN undefined");
}

let isReconnecting = false;
const pluginsByAccountTypeByProgram = await getPluginsByAccountTypeByProgram(
configs
);

const connect = async () => {
await retry(
async (bail, attempt) => {
const client = new Client(YELLOWSTONE_URL, YELLOWSTONE_TOKEN, {
"grpc.max_receive_message_length": 2065853043,
"grpc.keepalive_time_ms": 10000,
"grpc.keepalive_timeout_ms": 5000,
"grpc.keepalive_permit_without_calls": 1,
});
const connect = async (attemptCount = 0) => {
if (attemptCount >= MAX_RECONNECT_ATTEMPTS) {
console.error(
`Yellowstone failed to connect after ${MAX_RECONNECT_ATTEMPTS} attempts.`
);
process.exit(1);
}

const client = new Client(YELLOWSTONE_URL, YELLOWSTONE_TOKEN, {
"grpc.max_receive_message_length": 2065853043,
"grpc.keepalive_time_ms": 10000,
"grpc.keepalive_timeout_ms": 5000,
"grpc.keepalive_permit_without_calls": 1,
});

try {
const stream = await client.subscribe();
console.log("Connected to Yellowstone");
attemptCount = 0;
isReconnecting = false;

stream.on("data", async (data: SubscribeUpdate) => {
try {
const stream = await client.subscribe();
console.log("Connected to Yellowstone");

stream.on("data", async (data: SubscribeUpdate) => {
try {
if (data.transaction) {
const transaction = await convertYellowstoneTransaction(
data.transaction.transaction
);

if (transaction) {
try {
await handleTransactionWebhook({
fastify: server,
configs,
transaction,
});
} catch (err) {
console.error(err);
}
}
if (data.transaction) {
const transaction = await convertYellowstoneTransaction(
data.transaction.transaction
);

if (transaction) {
try {
await handleTransactionWebhook({
fastify: server,
configs,
transaction,
});
} catch (err) {
console.error(err);
}
}
}

if (data.account) {
const account = (data.account as SubscribeUpdateAccount)
?.account;
if (account && configs) {
const owner = new PublicKey(account.owner).toBase58();
const config = configs.find((x) => x.programId === owner);

if (config) {
try {
await handleAccountWebhook({
fastify: server,
programId: new PublicKey(config.programId),
accounts: config.accounts,
account: {
...account,
pubkey: new PublicKey(account.pubkey).toBase58(),
data: [account.data],
},
pluginsByAccountType:
pluginsByAccountTypeByProgram[owner] || {},
});
} catch (err) {
console.error(err);
}
}
if (data.account) {
const account = (data.account as SubscribeUpdateAccount)?.account;
if (account && configs) {
const owner = new PublicKey(account.owner).toBase58();
const config = configs.find((x) => x.programId === owner);

if (config) {
try {
await handleAccountWebhook({
fastify: server,
programId: new PublicKey(config.programId),
accounts: config.accounts,
account: {
...account,
pubkey: new PublicKey(account.pubkey).toBase58(),
data: [account.data],
},
pluginsByAccountType:
pluginsByAccountTypeByProgram[owner] || {},
});
} catch (err) {
console.error(err);
}
}
} catch (err) {
console.error("Yellowstone: Error processing data:", err);
}
});

const request: SubscribeRequest = {
accounts: {
client: {
owner: configs.map((c) => c.programId),
account: [],
filters: [],
},
},
slots: {},
transactions: {
client: {
vote: false,
failed: false,
accountInclude: configs.map((c) => c.programId),
accountExclude: [],
accountRequired: [],
},
},
entry: {},
blocks: {},
blocksMeta: {},
accountsDataSlice: [],
ping: undefined,
commitment: CommitmentLevel.CONFIRMED,
};

stream.write(request, (err: any) => {
if (err) {
console.error(`Failed to write initial request: ${err}`);
throw err;
}
});

stream.on("error", (err) => {
console.error("Yellowstone stream error:", err);
throw err;
});

stream.on("end", () => {
console.log("Yellowstone stream ended");
throw new Error("Stream ended");
});

stream.on("close", () => {
console.log("Yellowstone stream closed");
throw new Error("Stream closed");
});
} catch (err) {
console.log(
`Yellowstone connection error on attempt ${attempt}:`,
err
);
if (attempt >= 5) {
bail(new Error(`Yellowstone failed to connect after 5 attempts.`));
}
} catch (err) {
console.error("Yellowstone: Error processing data:", err);
}
},
{
retries: 5,
factor: 1.1, // Small incremental delay
minTimeout: 0, // First retry with no delay
maxTimeout: 2000, // Cap max delay at 2 seconds
onRetry: (_, attempt) =>
console.log(`Attempting to reconnect (attempt ${attempt} of 5)`),
});

const request: SubscribeRequest = {
accounts: {
client: {
owner: configs.map((c) => c.programId),
account: [],
filters: [],
},
},
slots: {},
transactions: {
client: {
vote: false,
failed: false,
accountInclude: configs.map((c) => c.programId),
accountExclude: [],
accountRequired: [],
},
},
entry: {},
blocks: {},
blocksMeta: {},
accountsDataSlice: [],
ping: undefined,
commitment: CommitmentLevel.CONFIRMED,
};

stream.write(request, (err: any) => {
if (err) {
console.error(`Failed to write initial request: ${err}`);
stream.end();
}
});

stream.on("error", (err) => {
console.error("Yellowstone stream error:", err);
stream.end();
});

stream.on("end", () => {
console.log("Yellowstone stream ended");
if (!isReconnecting) {
isReconnecting = true;
handleReconnect(attemptCount + 1);
}
});

stream.on("close", () => {
console.log("Yellowstone stream closed");
if (!isReconnecting) {
isReconnecting = true;
handleReconnect(attemptCount + 1);
}
});
} catch (err) {
console.log("Yellowstone connection error:", err);
if (!isReconnecting) {
isReconnecting = true;
handleReconnect(attemptCount + 1);
}
}
};

const handleReconnect = async (nextAttempt: number) => {
console.log(
`Attempting to reconnect (attempt ${nextAttempt} of ${MAX_RECONNECT_ATTEMPTS})...`
);

const delay = nextAttempt === 1 ? 0 : 1000;
setTimeout(() => connect(nextAttempt), delay);
};

await connect();
Expand Down

0 comments on commit b45933c

Please sign in to comment.