
Commit

fix: Fix bugs in stream message handling and minor logging improvements (#438)

It was observed that there are three related bugs in stream message
processing:

1. In some cases, the stream increments only the sequence portion of a
message ID and not the main (timestamp) portion, so incrementId, which
bumps the main portion, skips messages (see the sketch after this list).
2. The current stream message ID is passed to xRange when counting
unprocessed messages, giving an inaccurate count if any skips occur.
3. Unprocessed messages are no longer reported when the queue of blocks
is empty. This occasionally led to a non-zero message count being
scraped and then left that way even when the stream was in fact empty.
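
To make the first bug concrete, here is a minimal TypeScript sketch
(illustrative only, not code from this repository). Redis stream IDs
have the form timestamp-sequence; entries added within the same
millisecond share the timestamp and differ only in the sequence, so
bumping the timestamp part jumps past them.

// Illustration of bug 1. The entry IDs below are made up for this example.
const ids = ['1701300000000-0', '1701300000000-1', '1701300000000-2'];

// Old behaviour: increment the main (timestamp) part of the ID.
function incrementMain (id: string): string {
  const [main, sequence] = id.split('-');
  return `${Number(main) + 1}-${sequence}`; // '1701300000001-0' skips ids[1] and ids[2]
}

// Fixed behaviour: increment the sequence part of the ID.
function incrementSequence (id: string): string {
  const [main, sequence] = id.split('-');
  return `${main}-${Number(sequence) + 1}`; // '1701300000000-1' is the next entry
}

console.log(incrementMain(ids[0]), incrementSequence(ids[0]));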

In addition, xRange takes a significant amount of time to run depending
on the size of the stream, and this slows all other operations since
Redis is single-threaded. xLen takes much less time to call while still
accurately returning the count of messages in the stream.
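
For reference, a minimal sketch of the two counting approaches using
node-redis (assumes a reachable local Redis instance and a hypothetical
stream key; not code from this repository):

import { createClient } from 'redis';

async function compareStreamCounts (): Promise<void> {
  const redis = createClient({ url: 'redis://localhost:6379' }); // assumed local instance
  await redis.connect();

  // O(N): XRANGE returns every entry, which is then counted client-side.
  const viaRange = (await redis.xRange('some-stream-key', '0', '+')).length;

  // O(1): XLEN asks Redis for the stream length directly.
  const viaLen = await redis.xLen('some-stream-key');

  console.log({ viaRange, viaLen }); // both report the same message count
  await redis.quit();
}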


To resolve the bugs, I now increment the sequence instead of the main
ID. In addition, if no more stream messages are fetched, the stream
message start ID is reset to '0' to ensure new messages are collected
regardless of their ID. Finally, I replaced xRange with xLen. I also
changed a blocking while loop in the consumer to an if statement so
that, when the queue is empty, the loop continues, which still triggers
the finally block (see the sketch below).
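
The consumer change works because continue still runs the finally block
before the next loop iteration. Condensed, the control flow after the
change looks roughly like this (a sketch paraphrasing the worker.ts
diff below, not a verbatim excerpt):

while (true) {
  try {
    if (workerContext.queue.length === 0) {
      await sleep(100);
      continue; // jumps to the finally block below, so the metric is still reported
    }
    // ... process the next queued block ...
  } catch (err) {
    console.log(`Failed: ${indexerName}`, err);
    await sleep(10000);
  } finally {
    // Runs even on continue, keeping the gauge in sync with the actual stream length.
    const unprocessedMessageCount = await workerContext.redisClient.getUnprocessedStreamMessageCount(streamKey);
    METRICS.UNPROCESSED_STREAM_MESSAGES.labels({ indexer: indexerName, type: workerContext.streamType }).set(unprocessedMessageCount);
  }
}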

I made some small additions to logging statements to include the indexer
type and block number for success/failure to help diagnose problems with
indexers in the future.
darunrs authored Nov 30, 2023
1 parent 24dd0df commit 98022a1
Showing 5 changed files with 26 additions and 27 deletions.
10 changes: 5 additions & 5 deletions runner/src/indexer/__snapshots__/indexer.test.ts.snap
@@ -20,7 +20,7 @@ exports[`Indexer unit tests Indexer.runFunctions() allows imperative execution o
 [
   "mock-hasura-endpoint/v1/graphql",
   {
-    "body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"buildnear.testnet/test","block_height":82699904,"message":"Running function buildnear.testnet/test, lag is: NaNms from block timestamp"}}",
+    "body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"buildnear.testnet/test","block_height":82699904,"message":"Running function buildnear.testnet/test on block 82699904, lag is: NaNms from block timestamp"}}",
     "headers": {
       "Content-Type": "application/json",
       "X-Hasura-Admin-Secret": "mock-hasura-secret",
@@ -90,7 +90,7 @@ exports[`Indexer unit tests Indexer.runFunctions() catches errors 1`] = `
 [
   "mock-hasura-endpoint/v1/graphql",
   {
-    "body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"buildnear.testnet/test","block_height":456,"message":"Running function buildnear.testnet/test, lag is: NaNms from block timestamp"}}",
+    "body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"buildnear.testnet/test","block_height":456,"message":"Running function buildnear.testnet/test on block 456, lag is: NaNms from block timestamp"}}",
     "headers": {
       "Content-Type": "application/json",
       "X-Hasura-Admin-Secret": "mock-hasura-secret",
@@ -147,7 +147,7 @@ exports[`Indexer unit tests Indexer.runFunctions() logs provisioning failures 1`
 [
   "mock-hasura-endpoint/v1/graphql",
   {
-    "body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"morgs.near/test","block_height":82699904,"message":"Running function morgs.near/test, lag is: NaNms from block timestamp"}}",
+    "body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"morgs.near/test","block_height":82699904,"message":"Running function morgs.near/test on block 82699904, lag is: NaNms from block timestamp"}}",
     "headers": {
       "Content-Type": "application/json",
       "X-Hasura-Admin-Secret": "mock-hasura-secret",
@@ -217,7 +217,7 @@ exports[`Indexer unit tests Indexer.runFunctions() should execute all functions
 [
   "mock-hasura-endpoint/v1/graphql",
   {
-    "body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"buildnear.testnet/test","block_height":456,"message":"Running function buildnear.testnet/test, lag is: NaNms from block timestamp"}}",
+    "body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"buildnear.testnet/test","block_height":456,"message":"Running function buildnear.testnet/test on block 456, lag is: NaNms from block timestamp"}}",
     "headers": {
       "Content-Type": "application/json",
       "X-Hasura-Admin-Secret": "mock-hasura-secret",
@@ -274,7 +274,7 @@ exports[`Indexer unit tests Indexer.runFunctions() supplies the required role to
 [
   "mock-hasura-endpoint/v1/graphql",
   {
-    "body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"morgs.near/test","block_height":82699904,"message":"Running function morgs.near/test, lag is: NaNms from block timestamp"}}",
+    "body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"morgs.near/test","block_height":82699904,"message":"Running function morgs.near/test on block 82699904, lag is: NaNms from block timestamp"}}",
     "headers": {
       "Content-Type": "application/json",
       "X-Hasura-Admin-Secret": "mock-hasura-secret",
2 changes: 1 addition & 1 deletion runner/src/indexer/indexer.ts
@@ -64,7 +64,7 @@ export default class Indexer {
     try {
       const indexerFunction = functions[functionName];
 
-      const runningMessage = `Running function ${functionName}` + (isHistorical ? ' historical backfill' : `, lag is: ${lag?.toString()}ms from block timestamp`);
+      const runningMessage = `Running function ${functionName} on block ${blockHeight}` + (isHistorical ? ' historical backfill' : `, lag is: ${lag?.toString()}ms from block timestamp`);
       console.log(runningMessage); // Print the running message to the console (Lambda logs)
 
       simultaneousPromises.push(this.writeLog(functionName, blockHeight, runningMessage));
14 changes: 5 additions & 9 deletions runner/src/redis-client/redis-client.test.ts
@@ -51,23 +51,19 @@ describe('RedisClient', () => {
     expect(mockClient.xDel).toHaveBeenCalledWith('streamKey', '1-1');
   });
 
-  it('returns the range of messages after the passed id', async () => {
+  it('returns the number of messages in stream', async () => {
     const mockClient = {
       on: jest.fn(),
       connect: jest.fn().mockResolvedValue(null),
-      xRange: jest.fn().mockResolvedValue([
-        'data'
-      ]),
+      xLen: jest.fn().mockResolvedValue(2),
     } as any;
 
     const client = new RedisClient(mockClient);
 
-    const unprocessedMessages = await client.getUnprocessedStreamMessages('streamKey');
+    const unprocessedMessageCount = await client.getUnprocessedStreamMessageCount('streamKey');
 
-    expect(mockClient.xRange).toHaveBeenCalledWith('streamKey', '0', '+');
-    expect(unprocessedMessages).toEqual([
-      'data'
-    ]);
+    expect(mockClient.xLen).toHaveBeenCalledWith('streamKey');
+    expect(unprocessedMessageCount).toEqual(2);
   });
 
   it('returns stream storage data', async () => {
9 changes: 4 additions & 5 deletions runner/src/redis-client/redis-client.ts
@@ -64,13 +64,12 @@ export default class RedisClient {
     await this.client.xDel(streamKey, id);
   };
 
-  async getUnprocessedStreamMessages (
+  async getUnprocessedStreamMessageCount (
     streamKey: string,
-    startId = this.SMALLEST_STREAM_ID,
-  ): Promise<StreamMessage[]> {
-    const results = await this.client.xRange(streamKey, startId, this.LARGEST_STREAM_ID);
+  ): Promise<number> {
+    const results = await this.client.xLen(streamKey);
 
-    return results as StreamMessage[];
+    return results;
   };
 
   async getStreamStorage (streamKey: string): Promise<StreamStorage> {
18 changes: 11 additions & 7 deletions runner/src/stream-handler/worker.ts
@@ -49,7 +49,7 @@ async function handleStream (workerContext: WorkerContext, streamKey: string): P
 
 function incrementId (id: string): string {
   const [main, sequence] = id.split('-');
-  return `${Number(main) + 1}-${sequence}`;
+  return `${main}-${Number(sequence) + 1}`;
 }
 
 async function blockQueueProducer (workerContext: WorkerContext, streamKey: string): Promise<void> {
@@ -65,6 +65,7 @@ async function blockQueueProducer (workerContext: WorkerContext, streamKey: stri
     const messages = await workerContext.redisClient.getStreamMessages(streamKey, streamMessageStartId, preFetchCount);
     if (messages == null) {
       await sleep(100);
+      streamMessageStartId = '0';
       continue;
     }
     console.log(`Fetched ${messages?.length} messages from stream ${streamKey}`);
@@ -83,11 +84,13 @@ async function blockQueueConsumer (workerContext: WorkerContext, streamKey: stri
   const isHistorical = workerContext.streamType === 'historical';
   let streamMessageId = '';
   let indexerName = '';
+  let currBlockHeight = 0;
 
   while (true) {
     try {
-      while (workerContext.queue.length === 0) {
+      if (workerContext.queue.length === 0) {
         await sleep(100);
+        continue;
       }
       const startTime = performance.now();
       const indexerConfig = await workerContext.redisClient.getStreamStorage(streamKey);
@@ -107,6 +110,7 @@ async function blockQueueConsumer (workerContext: WorkerContext, streamKey: stri
         continue;
       }
       const block = queueMessage.block;
+      currBlockHeight = block.blockHeight;
       streamMessageId = queueMessage.streamMessageId;
 
       if (block === undefined || block.blockHeight == null) {
@@ -121,15 +125,15 @@ async function blockQueueConsumer (workerContext: WorkerContext, streamKey: stri
 
       METRICS.EXECUTION_DURATION.labels({ indexer: indexerName, type: workerContext.streamType }).observe(performance.now() - startTime);
 
-      METRICS.LAST_PROCESSED_BLOCK_HEIGHT.labels({ indexer: indexerName, type: workerContext.streamType }).set(block.blockHeight);
+      METRICS.LAST_PROCESSED_BLOCK_HEIGHT.labels({ indexer: indexerName, type: workerContext.streamType }).set(currBlockHeight);
 
-      console.log(`Success: ${indexerName}`);
+      console.log(`Success: ${indexerName} ${workerContext.streamType} on block ${currBlockHeight}}`);
     } catch (err) {
       await sleep(10000);
-      console.log(`Failed: ${indexerName}`, err);
+      console.log(`Failed: ${indexerName} ${workerContext.streamType} on block ${currBlockHeight}`, err);
     } finally {
-      const unprocessedMessages = await workerContext.redisClient.getUnprocessedStreamMessages(streamKey, streamMessageId);
-      METRICS.UNPROCESSED_STREAM_MESSAGES.labels({ indexer: indexerName, type: workerContext.streamType }).set(unprocessedMessages?.length ?? 0);
+      const unprocessedMessageCount = await workerContext.redisClient.getUnprocessedStreamMessageCount(streamKey);
+      METRICS.UNPROCESSED_STREAM_MESSAGES.labels({ indexer: indexerName, type: workerContext.streamType }).set(unprocessedMessageCount);
 
       parentPort?.postMessage(await promClient.register.getMetricsAsJSON());
     }
