Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix bugs in stream message handling and minor logging improvements #438

Merged
merged 4 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions runner/src/indexer/__snapshots__/indexer.test.ts.snap
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ exports[`Indexer unit tests Indexer.runFunctions() allows imperative execution o
[
"mock-hasura-endpoint/v1/graphql",
{
"body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"buildnear.testnet/test","block_height":82699904,"message":"Running function buildnear.testnet/test, lag is: NaNms from block timestamp"}}",
"body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"buildnear.testnet/test","block_height":82699904,"message":"Running function buildnear.testnet/test on block 82699904, lag is: NaNms from block timestamp"}}",
"headers": {
"Content-Type": "application/json",
"X-Hasura-Admin-Secret": "mock-hasura-secret",
Expand Down Expand Up @@ -90,7 +90,7 @@ exports[`Indexer unit tests Indexer.runFunctions() catches errors 1`] = `
[
"mock-hasura-endpoint/v1/graphql",
{
"body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"buildnear.testnet/test","block_height":456,"message":"Running function buildnear.testnet/test, lag is: NaNms from block timestamp"}}",
"body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"buildnear.testnet/test","block_height":456,"message":"Running function buildnear.testnet/test on block 456, lag is: NaNms from block timestamp"}}",
"headers": {
"Content-Type": "application/json",
"X-Hasura-Admin-Secret": "mock-hasura-secret",
Expand Down Expand Up @@ -147,7 +147,7 @@ exports[`Indexer unit tests Indexer.runFunctions() logs provisioning failures 1`
[
"mock-hasura-endpoint/v1/graphql",
{
"body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"morgs.near/test","block_height":82699904,"message":"Running function morgs.near/test, lag is: NaNms from block timestamp"}}",
"body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"morgs.near/test","block_height":82699904,"message":"Running function morgs.near/test on block 82699904, lag is: NaNms from block timestamp"}}",
"headers": {
"Content-Type": "application/json",
"X-Hasura-Admin-Secret": "mock-hasura-secret",
Expand Down Expand Up @@ -217,7 +217,7 @@ exports[`Indexer unit tests Indexer.runFunctions() should execute all functions
[
"mock-hasura-endpoint/v1/graphql",
{
"body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"buildnear.testnet/test","block_height":456,"message":"Running function buildnear.testnet/test, lag is: NaNms from block timestamp"}}",
"body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"buildnear.testnet/test","block_height":456,"message":"Running function buildnear.testnet/test on block 456, lag is: NaNms from block timestamp"}}",
"headers": {
"Content-Type": "application/json",
"X-Hasura-Admin-Secret": "mock-hasura-secret",
Expand Down Expand Up @@ -274,7 +274,7 @@ exports[`Indexer unit tests Indexer.runFunctions() supplies the required role to
[
"mock-hasura-endpoint/v1/graphql",
{
"body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"morgs.near/test","block_height":82699904,"message":"Running function morgs.near/test, lag is: NaNms from block timestamp"}}",
"body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"morgs.near/test","block_height":82699904,"message":"Running function morgs.near/test on block 82699904, lag is: NaNms from block timestamp"}}",
"headers": {
"Content-Type": "application/json",
"X-Hasura-Admin-Secret": "mock-hasura-secret",
Expand Down
2 changes: 1 addition & 1 deletion runner/src/indexer/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ export default class Indexer {
try {
const indexerFunction = functions[functionName];

const runningMessage = `Running function ${functionName}` + (isHistorical ? ' historical backfill' : `, lag is: ${lag?.toString()}ms from block timestamp`);
const runningMessage = `Running function ${functionName} on block ${blockHeight}` + (isHistorical ? ' historical backfill' : `, lag is: ${lag?.toString()}ms from block timestamp`);
console.log(runningMessage); // Print the running message to the console (Lambda logs)

simultaneousPromises.push(this.writeLog(functionName, blockHeight, runningMessage));
Expand Down
12 changes: 4 additions & 8 deletions runner/src/redis-client/redis-client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,23 +51,19 @@ describe('RedisClient', () => {
expect(mockClient.xDel).toHaveBeenCalledWith('streamKey', '1-1');
});

it('returns the range of messages after the passed id', async () => {
it('returns the number of messages in stream', async () => {
const mockClient = {
on: jest.fn(),
connect: jest.fn().mockResolvedValue(null),
xRange: jest.fn().mockResolvedValue([
'data'
]),
xLen: jest.fn().mockResolvedValue(2),
} as any;

const client = new RedisClient(mockClient);

const unprocessedMessages = await client.getUnprocessedStreamMessages('streamKey');

expect(mockClient.xRange).toHaveBeenCalledWith('streamKey', '0', '+');
expect(unprocessedMessages).toEqual([
'data'
]);
expect(mockClient.xLen).toHaveBeenCalledWith('streamKey');
expect(unprocessedMessages).toEqual(2);
});

it('returns stream storage data', async () => {
Expand Down
7 changes: 3 additions & 4 deletions runner/src/redis-client/redis-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,10 @@ export default class RedisClient {

async getUnprocessedStreamMessages (
streamKey: string,
startId = this.SMALLEST_STREAM_ID,
): Promise<StreamMessage[]> {
const results = await this.client.xRange(streamKey, startId, this.LARGEST_STREAM_ID);
): Promise<number> {
const results = await this.client.xLen(streamKey);

return results as StreamMessage[];
return results;
};

async getStreamStorage (streamKey: string): Promise<StreamStorage> {
Expand Down
18 changes: 11 additions & 7 deletions runner/src/stream-handler/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ async function handleStream (workerContext: WorkerContext, streamKey: string): P

function incrementId (id: string): string {
const [main, sequence] = id.split('-');
return `${Number(main) + 1}-${sequence}`;
return `${main}-${Number(sequence) + 1}`;
}

async function blockQueueProducer (workerContext: WorkerContext, streamKey: string): Promise<void> {
Expand All @@ -65,6 +65,7 @@ async function blockQueueProducer (workerContext: WorkerContext, streamKey: stri
const messages = await workerContext.redisClient.getStreamMessages(streamKey, streamMessageStartId, preFetchCount);
if (messages == null) {
await sleep(100);
streamMessageStartId = '0';
continue;
}
console.log(`Fetched ${messages?.length} messages from stream ${streamKey}`);
Expand All @@ -83,11 +84,13 @@ async function blockQueueConsumer (workerContext: WorkerContext, streamKey: stri
const isHistorical = workerContext.streamType === 'historical';
let streamMessageId = '';
let indexerName = '';
let currBlockHeight = 0;

while (true) {
try {
while (workerContext.queue.length === 0) {
if (workerContext.queue.length === 0) {
await sleep(100);
continue;
}
const startTime = performance.now();
const indexerConfig = await workerContext.redisClient.getStreamStorage(streamKey);
Expand All @@ -107,6 +110,7 @@ async function blockQueueConsumer (workerContext: WorkerContext, streamKey: stri
continue;
}
const block = queueMessage.block;
currBlockHeight = block.blockHeight;
streamMessageId = queueMessage.streamMessageId;

if (block === undefined || block.blockHeight == null) {
Expand All @@ -121,15 +125,15 @@ async function blockQueueConsumer (workerContext: WorkerContext, streamKey: stri

METRICS.EXECUTION_DURATION.labels({ indexer: indexerName, type: workerContext.streamType }).observe(performance.now() - startTime);

METRICS.LAST_PROCESSED_BLOCK_HEIGHT.labels({ indexer: indexerName, type: workerContext.streamType }).set(block.blockHeight);
METRICS.LAST_PROCESSED_BLOCK_HEIGHT.labels({ indexer: indexerName, type: workerContext.streamType }).set(currBlockHeight);

console.log(`Success: ${indexerName}`);
console.log(`Success: ${indexerName} ${workerContext.streamType} on block ${currBlockHeight}}`);
} catch (err) {
await sleep(10000);
console.log(`Failed: ${indexerName}`, err);
console.log(`Failed: ${indexerName} ${workerContext.streamType} on block ${currBlockHeight}`, err);
} finally {
const unprocessedMessages = await workerContext.redisClient.getUnprocessedStreamMessages(streamKey, streamMessageId);
METRICS.UNPROCESSED_STREAM_MESSAGES.labels({ indexer: indexerName, type: workerContext.streamType }).set(unprocessedMessages?.length ?? 0);
const unprocessedMessages = await workerContext.redisClient.getUnprocessedStreamMessages(streamKey);
METRICS.UNPROCESSED_STREAM_MESSAGES.labels({ indexer: indexerName, type: workerContext.streamType }).set(unprocessedMessages);
gabehamilton marked this conversation as resolved.
Show resolved Hide resolved

parentPort?.postMessage(await promClient.register.getMetricsAsJSON());
}
Expand Down