From 2f5505b67d29d36d5330637bc4cb37759bc81d64 Mon Sep 17 00:00:00 2001 From: Morgan McCauley Date: Thu, 9 Nov 2023 15:32:39 +1300 Subject: [PATCH] Revert "Revert "feat: Pre-Fetch Streamer Messages"" (#378) Reverts near/queryapi#377 - Merging #269 back in --- docker-compose.yml | 16 + prometheus.yml | 7 + runner/src/indexer/indexer.test.ts | 435 ++++--------------- runner/src/indexer/indexer.ts | 98 +---- runner/src/lake-client/index.ts | 1 + runner/src/lake-client/lake-client.test.ts | 170 ++++++++ runner/src/lake-client/lake-client.ts | 90 ++++ runner/src/metrics.ts | 42 +- runner/src/redis-client/index.ts | 2 +- runner/src/redis-client/redis-client.test.ts | 20 +- runner/src/redis-client/redis-client.ts | 13 +- runner/src/stream-handler/worker.ts | 139 ++++-- 12 files changed, 545 insertions(+), 488 deletions(-) create mode 100644 prometheus.yml create mode 100644 runner/src/lake-client/index.ts create mode 100644 runner/src/lake-client/lake-client.test.ts create mode 100644 runner/src/lake-client/lake-client.ts diff --git a/docker-compose.yml b/docker-compose.yml index aa4dc242d..8911aa97e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -88,7 +88,23 @@ services: HASURA_GRAPHQL_ENABLED_LOG_TYPES: startup, http-log, webhook-log, websocket-log, query-log HASURA_GRAPHQL_ADMIN_SECRET: myadminsecretkey HASURA_GRAPHQL_AUTH_HOOK: http://hasura-auth:4000/auth + grafana: + image: grafana/grafana + volumes: + - grafana:/var/lib/grafana + ports: + - "3000:3000" + environment: + - GF_SECURITY_ADMIN_PASSWORD=secret + + prometheus: + image: prom/prometheus + volumes: + - ./prometheus.yml:/etc/prometheus/prometheus.yml + ports: + - "9090:9090" volumes: postgres: redis: + grafana: diff --git a/prometheus.yml b/prometheus.yml new file mode 100644 index 000000000..cd0eab3f2 --- /dev/null +++ b/prometheus.yml @@ -0,0 +1,7 @@ +global: + scrape_interval: 1s + +scrape_configs: + - job_name: 'queryapi-runner' + static_configs: + - targets: ['host.docker.internal:9180'] \ No newline at end of file diff --git a/runner/src/indexer/indexer.test.ts b/runner/src/indexer/indexer.test.ts index 38f9ce77c..b9313f631 100644 --- a/runner/src/indexer/indexer.test.ts +++ b/runner/src/indexer/indexer.test.ts @@ -1,7 +1,5 @@ -import { Block } from '@near-lake/primitives'; +import { Block, type StreamerMessage } from '@near-lake/primitives'; import type fetch from 'node-fetch'; -import { type S3Client, GetObjectCommand } from '@aws-sdk/client-s3'; -import type RedisClient from '../redis-client'; import Indexer from './indexer'; import { VM } from 'vm2'; @@ -30,7 +28,7 @@ describe('Indexer unit tests', () => { );`; const SOCIAL_SCHEMA = ` - CREATE TABLE + CREATE TABLE "posts" ( "id" SERIAL NOT NULL, "account_id" VARCHAR NOT NULL, @@ -163,10 +161,6 @@ CREATE TABLE }), }); - const transparentRedis = { - getStreamerMessage: jest.fn() - } as unknown as RedisClient; - beforeEach(() => { process.env = { ...oldEnv, @@ -187,24 +181,17 @@ CREATE TABLE }), })); const blockHeight = 456; - const mockData = jest.fn().mockResolvedValue( - JSON.stringify( - { - block: { - chunks: [], - header: { - height: blockHeight - } - }, - shards: {} + const mockBlock = Block.fromStreamerMessage({ + block: { + chunks: [], + header: { + height: blockHeight } - ) - ); - const mockRedis = { - getStreamerMessage: mockData - } as unknown as RedisClient; + }, + shards: {} + } as unknown as StreamerMessage) as unknown as Block; - const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, redisClient: mockRedis }); + const 
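// Note: these tests now construct Blocks directly via Block.fromStreamerMessage
// instead of mocking S3/Redis fetches. A minimal sketch of that pattern as a
// shared helper (buildMockBlock is an illustrative name, not part of this patch):
//
//   import { Block, type StreamerMessage } from '@near-lake/primitives';
//
//   // Build a Block carrying only the fields the indexer reads; the double
//   // cast mirrors the one used in the tests in this patch.
//   function buildMockBlock (height: number): Block {
//     return Block.fromStreamerMessage({
//       block: { chunks: [], header: { height } },
//       shards: {}
//     } as unknown as StreamerMessage) as unknown as Block;
//   }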
indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch }); const functions: Record = {}; functions['buildnear.testnet/test'] = { @@ -214,188 +201,13 @@ CREATE TABLE `, schema: SIMPLE_SCHEMA }; - await indexer.runFunctions(blockHeight, functions, false); + await indexer.runFunctions(mockBlock, functions, false); expect(mockFetch.mock.calls).toMatchSnapshot(); }); - test('Indexer.fetchBlock() should fetch a block from S3', async () => { - const author = 'dokiacapital.poolv1.near'; - const mockData = JSON.stringify({ - author - }); - const mockSend = jest.fn().mockResolvedValue({ - Body: { - transformToString: () => mockData - } - }); - const mockS3 = { - send: mockSend, - } as unknown as S3Client; - - const indexer = new Indexer('mainnet', { s3: mockS3, redisClient: transparentRedis }); - - const blockHeight = 84333960; - const block = await indexer.fetchBlockPromise(blockHeight); - const params = { - Bucket: 'near-lake-data-mainnet', - Key: `${blockHeight.toString().padStart(12, '0')}/block.json` - }; - - expect(mockS3.send).toHaveBeenCalledTimes(1); - expect(JSON.stringify(mockSend.mock.calls[0][0])).toMatch(JSON.stringify(new GetObjectCommand(params))); - expect(block.author).toEqual(author); - }); - - test('Indexer.fetchShard() should fetch a shard from S3', async () => { - const mockData = JSON.stringify({}); - const mockSend = jest.fn().mockResolvedValue({ - Body: { - transformToString: () => mockData - } - }); - const mockS3 = { - send: mockSend, - } as unknown as S3Client; - const indexer = new Indexer('mainnet', { s3: mockS3, redisClient: transparentRedis }); - - const blockHeight = 82699904; - const shard = 0; - const params = { - Bucket: 'near-lake-data-mainnet', - Key: `${blockHeight.toString().padStart(12, '0')}/shard_${shard}.json` - }; - await indexer.fetchShardPromise(blockHeight, shard); - - expect(JSON.stringify(mockSend.mock.calls[0][0])).toMatch(JSON.stringify(new GetObjectCommand(params))); - }); - - test('Indexer.fetchStreamerMessage() should fetch the message from cache and use it directly', async () => { - const blockHeight = 85233529; - const blockHash = 'xyz'; - const getMessage = jest.fn() - .mockReturnValueOnce(JSON.stringify( - { - block: { - chunks: [0], - header: { - height: blockHeight, - hash: blockHash, - } - }, - shards: {} - } - )); - const mockRedis = { - getStreamerMessage: getMessage - } as unknown as RedisClient; - const indexer = new Indexer('mainnet', { redisClient: mockRedis }); - - const streamerMessage = await indexer.fetchStreamerMessage(blockHeight, false); - - expect(getMessage).toHaveBeenCalledTimes(1); - expect(JSON.stringify(getMessage.mock.calls[0])).toEqual( - `[${blockHeight}]` - ); - const block = Block.fromStreamerMessage(streamerMessage); - - expect(block.blockHeight).toEqual(blockHeight); - expect(block.blockHash).toEqual(blockHash); - }); - - test('Indexer.fetchStreamerMessage() should fetch the block and shards from S3 upon cache miss', async () => { - const blockHeight = 85233529; - const blockHash = 'xyz'; - const mockSend = jest.fn() - .mockReturnValueOnce({ // block - Body: { - transformToString: () => JSON.stringify({ - chunks: [0], - header: { - height: blockHeight, - hash: blockHash, - } - }) - } - }) - .mockReturnValue({ // shard - Body: { - transformToString: () => JSON.stringify({}) - } - }); - const mockS3 = { - send: mockSend, - } as unknown as S3Client; - const indexer = new Indexer('mainnet', { s3: mockS3, redisClient: transparentRedis }); - - const streamerMessage = await 
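// The tests removed above documented the near-lake S3 layout that moves into
// LakeClient later in this patch: one bucket per network, one folder per
// zero-padded block height. The keys they asserted against, sketched:
//
//   const folder = blockHeight.toString().padStart(12, '0'); // '000084333960'
//   const blockKey = `${folder}/block.json`;                 // in near-lake-data-mainnet
//   const shardKey = (shard: number): string => `${folder}/shard_${shard}.json`;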
indexer.fetchStreamerMessage(blockHeight, false); - - expect(mockSend).toHaveBeenCalledTimes(5); - expect(JSON.stringify(mockSend.mock.calls[0][0])).toStrictEqual(JSON.stringify(new GetObjectCommand({ - Bucket: 'near-lake-data-mainnet', - Key: `${blockHeight.toString().padStart(12, '0')}/block.json` - }))); - expect(JSON.stringify(mockSend.mock.calls[1][0])).toStrictEqual(JSON.stringify(new GetObjectCommand({ - Bucket: 'near-lake-data-mainnet', - Key: `${blockHeight.toString().padStart(12, '0')}/shard_0.json` - }))); - expect(transparentRedis.getStreamerMessage).toHaveBeenCalledTimes(1); - - const block = Block.fromStreamerMessage(streamerMessage); - - expect(block.blockHeight).toEqual(blockHeight); - expect(block.blockHash).toEqual(blockHash); - }); - - test('Indexer.fetchStreamerMessage() should fetch the block and shards from S3 and not cache and construct the streamer message if historical', async () => { - const blockHeight = 85233529; - const blockHash = 'xyz'; - const mockSend = jest.fn() - .mockReturnValueOnce({ // block - Body: { - transformToString: () => JSON.stringify({ - chunks: [0], - header: { - height: blockHeight, - hash: blockHash, - } - }) - } - }) - .mockReturnValue({ // shard - Body: { - transformToString: () => JSON.stringify({}) - } - }); - const mockS3 = { - send: mockSend, - } as unknown as S3Client; - const mockRedis = { - getStreamerMessage: jest.fn() - } as unknown as RedisClient; - const indexer = new Indexer('mainnet', { s3: mockS3, redisClient: mockRedis }); - - const streamerMessage = await indexer.fetchStreamerMessage(blockHeight, true); - - expect(mockSend).toHaveBeenCalledTimes(5); - expect(JSON.stringify(mockSend.mock.calls[0][0])).toStrictEqual(JSON.stringify(new GetObjectCommand({ - Bucket: 'near-lake-data-mainnet', - Key: `${blockHeight.toString().padStart(12, '0')}/block.json` - }))); - expect(JSON.stringify(mockSend.mock.calls[1][0])).toStrictEqual(JSON.stringify(new GetObjectCommand({ - Bucket: 'near-lake-data-mainnet', - Key: `${blockHeight.toString().padStart(12, '0')}/shard_0.json` - }))); - expect(mockRedis.getStreamerMessage).toHaveBeenCalledTimes(0); - - const block = Block.fromStreamerMessage(streamerMessage); - - expect(block.blockHeight).toEqual(blockHeight); - expect(block.blockHash).toEqual(blockHash); - }); - test('Indexer.transformIndexerFunction() applies the necessary transformations', () => { - const indexer = new Indexer('mainnet', { redisClient: transparentRedis }); + const indexer = new Indexer(); const transformedFunction = indexer.transformIndexerFunction('console.log(\'hello\')'); @@ -427,7 +239,7 @@ CREATE TABLE } }) }); - const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, redisClient: transparentRedis }); + const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch }); const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, HASURA_ROLE); @@ -479,7 +291,7 @@ CREATE TABLE test('Indexer.buildContext() can fetch from the near social api', async () => { const mockFetch = jest.fn(); - const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, redisClient: transparentRedis }); + const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch }); const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, HASURA_ROLE); @@ -508,7 +320,7 @@ CREATE TABLE errors: ['boom'] }) }); - const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, redisClient: transparentRedis }); + const indexer = new Indexer({ 
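// transformIndexerFunction (exercised above) pipes user code through
// enableAwaitTransform, which wraps it in an async function so top-level
// `await` works inside the vm2 sandbox. Roughly as follows; only the
// wrapper's opening lines are visible in this diff, the trailing invocation
// is an assumption from context:
//
//   return `
//     async function f(){
//       ${indexerFunction}
//     };
//     f();
//   `;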
fetch: mockFetch as unknown as typeof fetch }); const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, INVALID_HASURA_ROLE); @@ -523,7 +335,7 @@ CREATE TABLE data: 'mock', }), }); - const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, redisClient: transparentRedis }); + const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch }); const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, HASURA_ROLE); @@ -550,7 +362,7 @@ CREATE TABLE }); test('GetTables works for a variety of input schemas', async () => { - const indexer = new Indexer('mainnet', { redisClient: transparentRedis }); + const indexer = new Indexer(); const simpleSchemaTables = indexer.getTableNames(SIMPLE_SCHEMA); expect(simpleSchemaTables).toStrictEqual(['posts']); @@ -590,7 +402,7 @@ CREATE TABLE }); test('SanitizeTableName works properly on many test cases', async () => { - const indexer = new Indexer('mainnet', { redisClient: transparentRedis }); + const indexer = new Indexer(); expect(indexer.sanitizeTableName('table_name')).toStrictEqual('TableName'); expect(indexer.sanitizeTableName('tablename')).toStrictEqual('Tablename'); // name is not capitalized @@ -605,7 +417,7 @@ CREATE TABLE }); test('indexer fails to build context.db due to collision on sanitized table names', async () => { - const indexer = new Indexer('mainnet', { redisClient: transparentRedis }); + const indexer = new Indexer(); const schemaWithDuplicateSanitizedTableNames = `CREATE TABLE "test table" ( @@ -627,9 +439,8 @@ CREATE TABLE }) }; - const indexer = new Indexer('mainnet', { + const indexer = new Indexer({ fetch: genericMockFetch as unknown as typeof fetch, - redisClient: transparentRedis, DmlHandler: mockDmlHandler }); const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); @@ -667,9 +478,8 @@ CREATE TABLE }) }; - const indexer = new Indexer('mainnet', { + const indexer = new Indexer({ fetch: genericMockFetch as unknown as typeof fetch, - redisClient: transparentRedis, DmlHandler: mockDmlHandler }); const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); @@ -698,9 +508,8 @@ CREATE TABLE }) }; - const indexer = new Indexer('mainnet', { + const indexer = new Indexer({ fetch: genericMockFetch as unknown as typeof fetch, - redisClient: transparentRedis, DmlHandler: mockDmlHandler }); const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); @@ -733,9 +542,8 @@ CREATE TABLE }) }; - const indexer = new Indexer('mainnet', { + const indexer = new Indexer({ fetch: genericMockFetch as unknown as typeof fetch, - redisClient: transparentRedis, DmlHandler: mockDmlHandler }); const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); @@ -770,9 +578,8 @@ CREATE TABLE }) }; - const indexer = new Indexer('mainnet', { + const indexer = new Indexer({ fetch: genericMockFetch as unknown as typeof fetch, - redisClient: transparentRedis, DmlHandler: mockDmlHandler }); const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); @@ -790,9 +597,8 @@ CREATE TABLE create: jest.fn() }; - const indexer = new Indexer('mainnet', { + const indexer = new Indexer({ fetch: genericMockFetch as unknown as typeof fetch, - redisClient: transparentRedis, DmlHandler: mockDmlHandler }); const context = indexer.buildContext(STRESS_TEST_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); @@ -833,9 +639,8 @@ CREATE TABLE create: jest.fn() 
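// For orientation: the sanitizeTableName assertions above encode how
// context.db property names are derived, stripping characters invalid in JS
// identifiers and then PascalCasing on underscores. A reconstruction
// consistent with the visible test cases, not necessarily the exact
// implementation:
//
//   // 'table_name' -> 'TableName', 'tablename' -> 'Tablename'
//   function sanitizeTableName (tableName: string): string {
//     return tableName
//       .replace(/[^a-zA-Z0-9_]/g, '')
//       .split('_')
//       .map((word) => word.charAt(0).toUpperCase() + word.slice(1))
//       .join('');
//   }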
}; - const indexer = new Indexer('mainnet', { + const indexer = new Indexer({ fetch: genericMockFetch as unknown as typeof fetch, - redisClient: transparentRedis, DmlHandler: mockDmlHandler }); const context = indexer.buildContext('', 'morgs.near/social_feed1', 1, 'postgres'); @@ -897,25 +702,16 @@ CREATE TABLE }), }); - const mockS3 = { - send: jest.fn() - .mockResolvedValueOnce({ // block - Body: { - transformToString: () => JSON.stringify({ - chunks: [0], - header: { - height: blockHeight, - }, - }), - }, - }) - .mockResolvedValue({ // shard - Body: { - transformToString: () => JSON.stringify({}) - }, - }), - } as unknown as S3Client; - const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, s3: mockS3, redisClient: transparentRedis }); + const mockBlock = Block.fromStreamerMessage({ + block: { + chunks: [0], + header: { + height: blockHeight + } + }, + shards: {} + } as unknown as StreamerMessage) as unknown as Block; + const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch }); const functions: Record = {}; functions['buildnear.testnet/test'] = { @@ -951,7 +747,7 @@ CREATE TABLE schema: SIMPLE_SCHEMA }; - await indexer.runFunctions(blockHeight, functions, false); + await indexer.runFunctions(mockBlock, functions, false); expect(mockFetch.mock.calls).toMatchSnapshot(); }); @@ -985,19 +781,16 @@ CREATE TABLE }), })); const blockHeight = 456; - const mockS3 = { - send: jest.fn().mockResolvedValue({ - Body: { - transformToString: () => JSON.stringify({ - chunks: [], - header: { - height: blockHeight - } - }) + const mockBlock = Block.fromStreamerMessage({ + block: { + chunks: [0], + header: { + height: blockHeight } - }), - } as unknown as S3Client; - const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, s3: mockS3, redisClient: transparentRedis }); + }, + shards: {} + } as unknown as StreamerMessage) as unknown as Block; + const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch }); const functions: Record = {}; functions['buildnear.testnet/test'] = { @@ -1007,7 +800,7 @@ CREATE TABLE schema: SIMPLE_SCHEMA }; - await expect(indexer.runFunctions(blockHeight, functions, false)).rejects.toThrow(new Error('boom')); + await expect(indexer.runFunctions(mockBlock, functions, false)).rejects.toThrow(new Error('boom')); expect(mockFetch.mock.calls).toMatchSnapshot(); }); @@ -1019,30 +812,20 @@ CREATE TABLE errors: null, }), })); - const mockS3 = { - send: jest - .fn() - .mockResolvedValueOnce({ // block - Body: { - transformToString: () => JSON.stringify({ - chunks: [0], - header: { - height: blockHeight, - }, - }), - }, - }) - .mockResolvedValue({ // shard - Body: { - transformToString: () => JSON.stringify({}) - }, - }), - } as unknown as S3Client; + const mockBlock = Block.fromStreamerMessage({ + block: { + chunks: [0], + header: { + height: blockHeight + } + }, + shards: {} + } as unknown as StreamerMessage) as unknown as Block; const provisioner: any = { isUserApiProvisioned: jest.fn().mockReturnValue(false), provisionUserApi: jest.fn(), }; - const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, s3: mockS3, redisClient: transparentRedis, provisioner }); + const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, provisioner }); const functions = { 'morgs.near/test': { @@ -1052,7 +835,7 @@ CREATE TABLE schema: SIMPLE_SCHEMA, } }; - await indexer.runFunctions(1, functions, false, { provision: true }); + await indexer.runFunctions(mockBlock, 
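// The thread running through these updated tests is the patch's core change:
// runFunctions now takes an already-fetched Block rather than a block height,
// so fetching becomes the caller's responsibility. Caller-side sketch using
// the pieces this patch introduces (LakeClient is defined further down):
//
//   const block = await lakeClient.fetchBlock(blockHeight, isHistorical);
//   await indexer.runFunctions(block, functions, isHistorical, { provision: true });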
functions, false, { provision: true }); expect(provisioner.isUserApiProvisioned).toHaveBeenCalledWith('morgs.near', 'test'); expect(provisioner.provisionUserApi).toHaveBeenCalledTimes(1); @@ -1071,30 +854,20 @@ CREATE TABLE errors: null, }), })); - const mockS3 = { - send: jest - .fn() - .mockResolvedValueOnce({ // block - Body: { - transformToString: () => JSON.stringify({ - chunks: [0], - header: { - height: blockHeight, - }, - }), - }, - }) - .mockResolvedValue({ // shard - Body: { - transformToString: () => JSON.stringify({}) - }, - }), - } as unknown as S3Client; + const mockBlock = Block.fromStreamerMessage({ + block: { + chunks: [0], + header: { + height: blockHeight + } + }, + shards: {} + } as unknown as StreamerMessage) as unknown as Block; const provisioner: any = { isUserApiProvisioned: jest.fn().mockReturnValue(true), provisionUserApi: jest.fn(), }; - const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, s3: mockS3, redisClient: transparentRedis, provisioner }); + const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, provisioner }); const functions: Record = { 'morgs.near/test': { @@ -1102,7 +875,7 @@ CREATE TABLE schema: SIMPLE_SCHEMA, } }; - await indexer.runFunctions(1, functions, false, { provision: true }); + await indexer.runFunctions(mockBlock, functions, false, { provision: true }); expect(provisioner.provisionUserApi).not.toHaveBeenCalled(); }); @@ -1115,30 +888,20 @@ CREATE TABLE errors: null, }), })); - const mockS3 = { - send: jest - .fn() - .mockResolvedValueOnce({ // block - Body: { - transformToString: () => JSON.stringify({ - chunks: [0], - header: { - height: blockHeight, - }, - }), - }, - }) - .mockResolvedValue({ // shard - Body: { - transformToString: () => JSON.stringify({}) - }, - }), - } as unknown as S3Client; + const mockBlock = Block.fromStreamerMessage({ + block: { + chunks: [0], + header: { + height: blockHeight + } + }, + shards: {} + } as unknown as StreamerMessage) as unknown as Block; const provisioner: any = { isUserApiProvisioned: jest.fn().mockReturnValue(true), provisionUserApi: jest.fn(), }; - const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, s3: mockS3, redisClient: transparentRedis, provisioner }); + const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, provisioner }); const functions: Record = { 'morgs.near/test': { @@ -1148,7 +911,7 @@ CREATE TABLE schema: SIMPLE_SCHEMA, } }; - await indexer.runFunctions(blockHeight, functions, false, { provision: true }); + await indexer.runFunctions(mockBlock, functions, false, { provision: true }); expect(provisioner.provisionUserApi).not.toHaveBeenCalled(); expect(mockFetch.mock.calls).toMatchSnapshot(); @@ -1162,31 +925,21 @@ CREATE TABLE errors: null, }), })); - const mockS3 = { - send: jest - .fn() - .mockResolvedValueOnce({ // block - Body: { - transformToString: () => JSON.stringify({ - chunks: [0], - header: { - height: blockHeight, - }, - }), - }, - }) - .mockResolvedValue({ // shard - Body: { - transformToString: () => JSON.stringify({}) - }, - }), - } as unknown as S3Client; + const mockBlock = Block.fromStreamerMessage({ + block: { + chunks: [0], + header: { + height: blockHeight + } + }, + shards: {} + } as unknown as StreamerMessage) as unknown as Block; const error = new Error('something went wrong with provisioning'); const provisioner: any = { isUserApiProvisioned: jest.fn().mockReturnValue(false), provisionUserApi: jest.fn().mockRejectedValue(error), }; - const indexer = new 
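// The provisioning tests above pin a provision-once guard: provisionUserApi
// runs only when isUserApiProvisioned reports false, and already-provisioned
// functions skip it entirely. A condensed sketch of that control flow
// (argument lists abbreviated; not the verbatim body of runFunctions):
//
//   if (options.provision && !indexerFunction.provisioned) {
//     indexerFunction.provisioned = true;
//     if (!(await provisioner.isUserApiProvisioned(accountId, functionName))) {
//       await provisioner.provisionUserApi(accountId, functionName, schema);
//     }
//   }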
Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, s3: mockS3, redisClient: transparentRedis, provisioner }); + const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, provisioner }); const functions: Record = { 'morgs.near/test': { @@ -1197,7 +950,7 @@ CREATE TABLE } }; - await expect(indexer.runFunctions(blockHeight, functions, false, { provision: true })).rejects.toThrow(error); + await expect(indexer.runFunctions(mockBlock, functions, false, { provision: true })).rejects.toThrow(error); expect(mockFetch.mock.calls).toMatchSnapshot(); }); @@ -1209,7 +962,7 @@ CREATE TABLE data: {} }) }); - const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, redisClient: transparentRedis }); + const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch }); // @ts-expect-error legacy test const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, null); @@ -1245,7 +998,7 @@ CREATE TABLE }) }); const role = 'morgs_near'; - const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, redisClient: transparentRedis }); + const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch }); const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, HASURA_ROLE); const mutation = ` diff --git a/runner/src/indexer/indexer.ts b/runner/src/indexer/indexer.ts index e866652b2..fd69a3898 100644 --- a/runner/src/indexer/indexer.ts +++ b/runner/src/indexer/indexer.ts @@ -1,21 +1,16 @@ import fetch, { type Response } from 'node-fetch'; import { VM } from 'vm2'; -import { S3Client, GetObjectCommand } from '@aws-sdk/client-s3'; -import { Block } from '@near-lake/primitives'; +import { type Block } from '@near-lake/primitives'; import { Parser } from 'node-sql-parser'; -import { METRICS } from '../metrics'; import Provisioner from '../provisioner'; import DmlHandler from '../dml-handler/dml-handler'; -import RedisClient from '../redis-client'; interface Dependencies { fetch: typeof fetch - s3: S3Client provisioner: Provisioner DmlHandler: typeof DmlHandler parser: Parser - redisClient: RedisClient }; interface Context { @@ -40,31 +35,27 @@ export default class Indexer { private readonly deps: Dependencies; constructor ( - private readonly network: string, deps?: Partial ) { this.DEFAULT_HASURA_ROLE = 'append'; - this.network = network; this.deps = { fetch, - s3: new S3Client(), provisioner: new Provisioner(), DmlHandler, parser: new Parser(), - redisClient: deps?.redisClient ?? 
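// With the network name, S3 client, and Redis client removed from Indexer,
// construction reduces to optional dependency injection: every Dependencies
// field has a default, and the `...deps` spread overrides only what callers
// pass. Usage, as seen throughout the updated tests:
//
//   const indexer = new Indexer();      // production: all defaults
//   const testIndexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch });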
new RedisClient(), ...deps, }; } async runFunctions ( - blockHeight: number, + block: Block, functions: Record, isHistorical: boolean, options: { provision?: boolean } = { provision: false } ): Promise { - const blockWithHelpers = Block.fromStreamerMessage(await this.fetchStreamerMessage(blockHeight, isHistorical)); + const blockHeight = block.blockHeight; - const lag = Date.now() - Math.floor(Number(blockWithHelpers.header().timestampNanosec) / 1000000); + const lag = Date.now() - Math.floor(Number(block.header().timestampNanosec) / 1000000); const simultaneousPromises: Array> = []; const allMutations: string[] = []; @@ -98,11 +89,10 @@ export default class Indexer { } await this.setStatus(functionName, blockHeight, 'RUNNING'); - const vm = new VM({ timeout: 3000, allowAsync: true }); const context = this.buildContext(indexerFunction.schema, functionName, blockHeight, hasuraRoleName); - vm.freeze(blockWithHelpers, 'block'); + vm.freeze(block, 'block'); vm.freeze(context, 'context'); vm.freeze(context, 'console'); // provide console.log via context.log @@ -118,7 +108,6 @@ export default class Indexer { await this.writeLog(functionName, blockHeight, 'Error running IndexerFunction', error.message); throw e; } - simultaneousPromises.push(this.writeFunctionState(functionName, blockHeight, isHistorical)); } catch (e) { console.error(`${functionName}: Failed to run function`, e); @@ -131,62 +120,6 @@ export default class Indexer { return allMutations; } - // pad with 0s to 12 digits - normalizeBlockHeight (blockHeight: number): string { - return blockHeight.toString().padStart(12, '0'); - } - - async fetchStreamerMessage (blockHeight: number, isHistorical: boolean): Promise<{ block: any, shards: any[] }> { - if (!isHistorical) { - const cachedMessage = await this.deps.redisClient.getStreamerMessage(blockHeight); - if (cachedMessage) { - METRICS.CACHE_HIT.labels(isHistorical ? 'historical' : 'real-time', 'streamer_message').inc(); - const parsedMessage = JSON.parse(cachedMessage); - return parsedMessage; - } else { - METRICS.CACHE_MISS.labels(isHistorical ? 'historical' : 'real-time', 'streamer_message').inc(); - } - } - const blockPromise = this.fetchBlockPromise(blockHeight); - const shardsPromises = await this.fetchShardsPromises(blockHeight, 4); - - const results = await Promise.all([blockPromise, ...shardsPromises]); - const block = results.shift(); - const shards = results; - return { - block, - shards, - }; - } - - async fetchShardsPromises (blockHeight: number, numberOfShards: number): Promise>> { - return ([...Array(numberOfShards).keys()].map(async (shardId) => - await this.fetchShardPromise(blockHeight, shardId) - )); - } - - async fetchShardPromise (blockHeight: number, shardId: number): Promise { - const params = { - Bucket: `near-lake-data-${this.network}`, - Key: `${this.normalizeBlockHeight(blockHeight)}/shard_${shardId}.json`, - }; - const response = await this.deps.s3.send(new GetObjectCommand(params)); - const shardData = await response.Body?.transformToString() ?? '{}'; - return JSON.parse(shardData, (_key, value) => this.renameUnderscoreFieldsToCamelCase(value)); - } - - async fetchBlockPromise (blockHeight: number): Promise { - const file = 'block.json'; - const folder = this.normalizeBlockHeight(blockHeight); - const params = { - Bucket: 'near-lake-data-' + this.network, - Key: `${folder}/${file}`, - }; - const response = await this.deps.s3.send(new GetObjectCommand(params)); - const blockData = await response.Body?.transformToString() ?? 
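// Worth noting in the runFunctions hunk above: lag is computed by converting
// the block's nanosecond timestamp string to milliseconds before differencing
// with wall-clock time. Number() on a value of roughly 1e18 loses precision
// below the microsecond level, which is harmless since only milliseconds are
// kept:
//
//   const lagMs = Date.now() - Math.floor(Number(block.header().timestampNanosec) / 1000000);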
'{}'; - return JSON.parse(blockData, (_key, value) => this.renameUnderscoreFieldsToCamelCase(value)); - } - enableAwaitTransform (indexerFunction: string): string { return ` async function f(){ @@ -490,25 +423,4 @@ export default class Indexer { return data; } - - renameUnderscoreFieldsToCamelCase (value: Record): Record { - if (value !== null && typeof value === 'object' && !Array.isArray(value)) { - // It's a non-null, non-array object, create a replacement with the keys initially-capped - const newValue: any = {}; - for (const key in value) { - const newKey: string = key - .split('_') - .map((word, i) => { - if (i > 0) { - return word.charAt(0).toUpperCase() + word.slice(1); - } - return word; - }) - .join(''); - newValue[newKey] = value[key]; - } - return newValue; - } - return value; - } } diff --git a/runner/src/lake-client/index.ts b/runner/src/lake-client/index.ts new file mode 100644 index 000000000..41779a063 --- /dev/null +++ b/runner/src/lake-client/index.ts @@ -0,0 +1 @@ +export { default } from './lake-client'; diff --git a/runner/src/lake-client/lake-client.test.ts b/runner/src/lake-client/lake-client.test.ts new file mode 100644 index 000000000..f04a18a28 --- /dev/null +++ b/runner/src/lake-client/lake-client.test.ts @@ -0,0 +1,170 @@ +import { GetObjectCommand, type S3Client } from '@aws-sdk/client-s3'; +import LakeClient from './lake-client'; +import type RedisClient from '../redis-client'; + +describe('LakeClient', () => { + const transparentRedis = { + getStreamerMessage: jest.fn() + } as unknown as RedisClient; + + test('Indexer.fetchBlock() should fetch the block and shards from S3 upon cache miss', async () => { + const blockHeight = 85233529; + const blockHash = 'xyz'; + const mockSend = jest.fn() + .mockReturnValueOnce({ // block + Body: { + transformToString: () => JSON.stringify({ + chunks: [0], + header: { + height: blockHeight, + hash: blockHash, + } + }) + } + }) + .mockReturnValue({ // shard + Body: { + transformToString: () => JSON.stringify({}) + } + }); + const mockS3 = { + send: mockSend, + } as unknown as S3Client; + const client = new LakeClient('mainnet', mockS3, transparentRedis); + + const block = await client.fetchBlock(blockHeight, true); + + expect(mockSend).toHaveBeenCalledTimes(5); + expect(JSON.stringify(mockSend.mock.calls[0][0])).toStrictEqual(JSON.stringify(new GetObjectCommand({ + Bucket: 'near-lake-data-mainnet', + Key: `${blockHeight.toString().padStart(12, '0')}/block.json` + }))); + expect(JSON.stringify(mockSend.mock.calls[1][0])).toStrictEqual(JSON.stringify(new GetObjectCommand({ + Bucket: 'near-lake-data-mainnet', + Key: `${blockHeight.toString().padStart(12, '0')}/shard_0.json` + }))); + + expect(block.blockHeight).toEqual(blockHeight); + expect(block.blockHash).toEqual(blockHash); + }); + + test('fetchBlock should fetch the streamer message from cache, convert it to block, and return it', async () => { + const blockHeight = 85233529; + const blockHash = 'xyz'; + const getMessage = jest.fn() + .mockReturnValueOnce(JSON.stringify( + { + block: { + chunks: [0], + header: { + height: blockHeight, + hash: blockHash, + } + }, + shards: {} + } + )); + const mockRedis = { + getStreamerMessage: getMessage + } as unknown as RedisClient; + const mockS3 = {} as unknown as S3Client; + const client = new LakeClient('mainnet', mockS3, mockRedis); + + const block = await client.fetchBlock(blockHeight, false); + + expect(getMessage).toHaveBeenCalledTimes(1); + expect(JSON.stringify(getMessage.mock.calls[0])).toEqual( + `[${blockHeight}]` + ); + + 
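// Context for LakeClient's parsing (moved from Indexer in this patch): raw
// lake JSON uses snake_case keys, so S3 payloads are parsed with a reviver
// that renames keys to camelCase as JSON.parse walks the tree, in one pass
// with no extra traversal. Effectively:
//
//   JSON.parse('{"block_height": 1, "prev_hash": "x"}', reviver)
//     // -> { blockHeight: 1, prevHash: 'x' }
//
// The Redis cache path parses without the reviver, presumably because cached
// streamer messages are stored already camelCased.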
expect(block.blockHeight).toEqual(blockHeight); + expect(block.blockHash).toEqual(blockHash); + }); + + test('fetchBlock should fetch the block and shards from S3 upon cache miss', async () => { + const blockHeight = 85233529; + const blockHash = 'xyz'; + const mockSend = jest.fn() + .mockReturnValueOnce({ // block + Body: { + transformToString: () => JSON.stringify({ + chunks: [0], + header: { + height: blockHeight, + hash: blockHash, + } + }) + } + }) + .mockReturnValue({ // shard + Body: { + transformToString: () => JSON.stringify({}) + } + }); + const mockS3 = { + send: mockSend, + } as unknown as S3Client; + const client = new LakeClient('mainnet', mockS3, transparentRedis); + + const block = await client.fetchBlock(blockHeight, false); + + expect(mockSend).toHaveBeenCalledTimes(5); + expect(JSON.stringify(mockSend.mock.calls[0][0])).toStrictEqual(JSON.stringify(new GetObjectCommand({ + Bucket: 'near-lake-data-mainnet', + Key: `${blockHeight.toString().padStart(12, '0')}/block.json` + }))); + expect(JSON.stringify(mockSend.mock.calls[1][0])).toStrictEqual(JSON.stringify(new GetObjectCommand({ + Bucket: 'near-lake-data-mainnet', + Key: `${blockHeight.toString().padStart(12, '0')}/shard_0.json` + }))); + expect(transparentRedis.getStreamerMessage).toHaveBeenCalledTimes(1); + + expect(block.blockHeight).toEqual(blockHeight); + expect(block.blockHash).toEqual(blockHash); + }); + + test('fetchBlock should not hit cache and instead fetch the block and shards from S3 if historical', async () => { + const blockHeight = 85233529; + const blockHash = 'xyz'; + const mockSend = jest.fn() + .mockReturnValueOnce({ // block + Body: { + transformToString: () => JSON.stringify({ + chunks: [0], + header: { + height: blockHeight, + hash: blockHash, + } + }) + } + }) + .mockReturnValue({ // shard + Body: { + transformToString: () => JSON.stringify({}) + } + }); + const mockS3 = { + send: mockSend, + } as unknown as S3Client; + const mockRedis = { + getStreamerMessage: jest.fn() + } as unknown as RedisClient; + const client = new LakeClient('mainnet', mockS3, mockRedis); + + const block = await client.fetchBlock(blockHeight, true); + + expect(mockSend).toHaveBeenCalledTimes(5); + expect(JSON.stringify(mockSend.mock.calls[0][0])).toStrictEqual(JSON.stringify(new GetObjectCommand({ + Bucket: 'near-lake-data-mainnet', + Key: `${blockHeight.toString().padStart(12, '0')}/block.json` + }))); + expect(JSON.stringify(mockSend.mock.calls[1][0])).toStrictEqual(JSON.stringify(new GetObjectCommand({ + Bucket: 'near-lake-data-mainnet', + Key: `${blockHeight.toString().padStart(12, '0')}/shard_0.json` + }))); + expect(mockRedis.getStreamerMessage).toHaveBeenCalledTimes(0); + + expect(block.blockHeight).toEqual(blockHeight); + expect(block.blockHash).toEqual(blockHash); + }); +}); diff --git a/runner/src/lake-client/lake-client.ts b/runner/src/lake-client/lake-client.ts new file mode 100644 index 000000000..d06d5cef9 --- /dev/null +++ b/runner/src/lake-client/lake-client.ts @@ -0,0 +1,90 @@ +import { GetObjectCommand, S3Client } from '@aws-sdk/client-s3'; +import { Block } from '@near-lake/primitives'; +import { METRICS } from '../metrics'; +import RedisClient from '../redis-client'; + +export default class LakeClient { + constructor ( + private readonly network: string = 'mainnet', + private readonly s3Client: S3Client = new S3Client(), + private readonly redisClient: RedisClient = new RedisClient() + ) {} + + // pad with 0s to 12 digits + private normalizeBlockHeight (blockHeight: number): string { + return 
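// normalizeBlockHeight pads heights to the fixed 12-digit width used in
// near-lake S3 keys, e.g. 85233529 -> '000085233529'; zero-padding keeps
// lexicographic key ordering aligned with numeric block order.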
blockHeight.toString().padStart(12, '0'); + } + + private async fetchShardsPromises (blockHeight: number, numberOfShards: number): Promise>> { + return ([...Array(numberOfShards).keys()].map(async (shardId) => + await this.fetchShardPromise(blockHeight, shardId) + )); + } + + private async fetchShardPromise (blockHeight: number, shardId: number): Promise { + const params = { + Bucket: `near-lake-data-${this.network}`, + Key: `${this.normalizeBlockHeight(blockHeight)}/shard_${shardId}.json`, + }; + const response = await this.s3Client.send(new GetObjectCommand(params)); + const shardData = await response.Body?.transformToString() ?? '{}'; + return JSON.parse(shardData, (_key, value) => this.renameUnderscoreFieldsToCamelCase(value)); + } + + private async fetchBlockPromise (blockHeight: number): Promise { + const file = 'block.json'; + const folder = this.normalizeBlockHeight(blockHeight); + const params = { + Bucket: 'near-lake-data-' + this.network, + Key: `${folder}/${file}`, + }; + const response = await this.s3Client.send(new GetObjectCommand(params)); + const blockData = await response.Body?.transformToString() ?? '{}'; + return JSON.parse(blockData, (_key, value) => this.renameUnderscoreFieldsToCamelCase(value)); + } + + private renameUnderscoreFieldsToCamelCase (value: Record): Record { + if (value !== null && typeof value === 'object' && !Array.isArray(value)) { + // It's a non-null, non-array object, create a replacement with the keys initially-capped + const newValue: any = {}; + for (const key in value) { + const newKey: string = key + .split('_') + .map((word, i) => { + if (i > 0) { + return word.charAt(0).toUpperCase() + word.slice(1); + } + return word; + }) + .join(''); + newValue[newKey] = value[key]; + } + return newValue; + } + return value; + } + + async fetchBlock (blockHeight: number, isHistorical: boolean): Promise { + if (!isHistorical) { + const cachedMessage = await this.redisClient.getStreamerMessage(blockHeight); + if (cachedMessage) { + METRICS.CACHE_HIT.inc(); + const parsedMessage = JSON.parse(cachedMessage); + return Block.fromStreamerMessage(parsedMessage); + } else { + METRICS.CACHE_MISS.inc(); + } + } + + const blockPromise = this.fetchBlockPromise(blockHeight); + const shardsPromises = await this.fetchShardsPromises(blockHeight, 4); + + const results = await Promise.all([blockPromise, ...shardsPromises]); + const block = results.shift(); + const shards = results; + return Block.fromStreamerMessage({ + block, + shards, + }); + } +} diff --git a/runner/src/metrics.ts b/runner/src/metrics.ts index c3ab74b81..e76d4d86d 100644 --- a/runner/src/metrics.ts +++ b/runner/src/metrics.ts @@ -1,35 +1,47 @@ import express from 'express'; import { Gauge, Histogram, Counter, AggregatorRegistry } from 'prom-client'; +const BLOCK_WAIT_DURATION = new Gauge({ + name: 'queryapi_runner_block_wait_duration_milliseconds', + help: 'Time an indexer function waited for a block before processing', + labelNames: ['indexer', 'type'], +}); + +const CACHE_HIT = new Counter({ + name: 'queryapi_runner_cache_hit', + help: 'The number of times cache was hit successfully' +}); + +const CACHE_MISS = new Counter({ + name: 'queryapi_runner_cache_miss', + help: 'The number of times cache was missed' +}); + const UNPROCESSED_STREAM_MESSAGES = new Gauge({ name: 'queryapi_runner_unprocessed_stream_messages', help: 'Number of Redis Stream messages not yet processed', labelNames: ['indexer', 'type'], }); +const LAST_PROCESSED_BLOCK_HEIGHT = new Gauge({ + name: 
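// Note on LakeClient.fetchBlock above: a cache miss fans out one block.json
// read plus four shard reads (the shard count is hard-coded to 4 in this
// patch) joined with Promise.all, so all five S3 GETs run concurrently;
// this is why the LakeClient tests assert five mockSend calls per fetch.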
'queryapi_runner_last_processed_block_height', + help: 'Previous block height processed by an indexer', + labelNames: ['indexer', 'type'], +}); + const EXECUTION_DURATION = new Histogram({ name: 'queryapi_runner_execution_duration_milliseconds', help: 'Time taken to execute an indexer function', labelNames: ['indexer', 'type'], }); -const CACHE_HIT = new Counter({ - name: 'queryapi_runner_cache_hit', - help: 'The number of times cache was hit successfully', - labelNames: ['type', 'key'] -}); - -const CACHE_MISS = new Counter({ - name: 'queryapi_runner_cache_miss', - help: 'The number of times cache was missed', - labelNames: ['type', 'key'] -}); - export const METRICS = { - EXECUTION_DURATION, - UNPROCESSED_STREAM_MESSAGES, + BLOCK_WAIT_DURATION, CACHE_HIT, - CACHE_MISS + CACHE_MISS, + UNPROCESSED_STREAM_MESSAGES, + LAST_PROCESSED_BLOCK_HEIGHT, + EXECUTION_DURATION, }; const aggregatorRegistry = new AggregatorRegistry(); diff --git a/runner/src/redis-client/index.ts b/runner/src/redis-client/index.ts index efa0f96e7..938571c25 100644 --- a/runner/src/redis-client/index.ts +++ b/runner/src/redis-client/index.ts @@ -1 +1 @@ -export { default } from './redis-client'; +export { default, type StreamType } from './redis-client'; diff --git a/runner/src/redis-client/redis-client.test.ts b/runner/src/redis-client/redis-client.test.ts index 26030f249..1abfd262f 100644 --- a/runner/src/redis-client/redis-client.test.ts +++ b/runner/src/redis-client/redis-client.test.ts @@ -10,7 +10,7 @@ describe('RedisClient', () => { const client = new RedisClient(mockClient); - const message = await client.getNextStreamMessage('streamKey'); + const message = await client.getStreamMessages('streamKey'); expect(mockClient.xRead).toHaveBeenCalledWith( { key: 'streamKey', id: '0' }, @@ -19,6 +19,24 @@ describe('RedisClient', () => { expect(message).toBeUndefined(); }); + it('returns count of messages after id with block', async () => { + const mockClient = { + on: jest.fn(), + connect: jest.fn().mockResolvedValue(null), + xRead: jest.fn().mockResolvedValue(null), + } as any; + + const client = new RedisClient(mockClient); + + const message = await client.getStreamMessages('streamKey', '123-0', 10); + + expect(mockClient.xRead).toHaveBeenCalledWith( + { key: 'streamKey', id: '123-0' }, + { COUNT: 10 } + ); + expect(message).toBeUndefined(); + }); + it('deletes the stream message', async () => { const mockClient = { on: jest.fn(), diff --git a/runner/src/redis-client/redis-client.ts b/runner/src/redis-client/redis-client.ts index 18e11b854..3edbde25a 100644 --- a/runner/src/redis-client/redis-client.ts +++ b/runner/src/redis-client/redis-client.ts @@ -14,7 +14,7 @@ interface StreamStorage { schema: string } -type StreamType = 'historical' | 'real-time'; +export type StreamType = 'historical' | 'real-time'; export default class RedisClient { SMALLEST_STREAM_ID = '0'; @@ -44,12 +44,14 @@ export default class RedisClient { await this.client.disconnect(); } - async getNextStreamMessage ( + async getStreamMessages ( streamKey: string, + streamId = this.SMALLEST_STREAM_ID, + count = 1 ): Promise { const results = await this.client.xRead( - { key: streamKey, id: this.SMALLEST_STREAM_ID }, - { COUNT: 1 } + { key: streamKey, id: streamId }, + { COUNT: count } ); return results?.[0].messages as StreamMessage[]; @@ -64,8 +66,9 @@ export default class RedisClient { async getUnprocessedStreamMessages ( streamKey: string, + startId = this.SMALLEST_STREAM_ID, ): Promise { - const results = await this.client.xRange(streamKey, 
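// getStreamMessages above maps onto Redis XREAD, which returns entries with
// IDs strictly greater than the id argument; callers page through a stream by
// passing the last id they have seen, e.g.
//
//   await client.getStreamMessages('streamKey', '1700000000000-0', 100);
//
// (the id shown is illustrative). An undefined result means nothing newer
// than that id exists yet.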
this.SMALLEST_STREAM_ID, this.LARGEST_STREAM_ID); + const results = await this.client.xRange(streamKey, startId, this.LARGEST_STREAM_ID); return results as StreamMessage[]; }; diff --git a/runner/src/stream-handler/worker.ts b/runner/src/stream-handler/worker.ts index 1cc1c531d..ae0d29c56 100644 --- a/runner/src/stream-handler/worker.ts +++ b/runner/src/stream-handler/worker.ts @@ -2,69 +2,144 @@ import { isMainThread, parentPort, workerData } from 'worker_threads'; import promClient from 'prom-client'; import Indexer from '../indexer'; -import RedisClient from '../redis-client'; +import RedisClient, { type StreamType } from '../redis-client'; import { METRICS } from '../metrics'; +import type { Block } from '@near-lake/primitives'; +import LakeClient from '../lake-client'; if (isMainThread) { throw new Error('Worker should not be run on main thread'); } - -const indexer = new Indexer('mainnet'); -const redisClient = new RedisClient(); +interface QueueMessage { + block: Block + streamMessageId: string +} +type PrefetchQueue = Array>; + +interface WorkerContext { + redisClient: RedisClient + lakeClient: LakeClient + queue: PrefetchQueue + streamKey: string + streamType: StreamType +} const sleep = async (ms: number): Promise => { await new Promise((resolve) => setTimeout(resolve, ms)); }; void (async function main () { const { streamKey } = workerData; + const redisClient = new RedisClient(); + const workerContext: WorkerContext = { + redisClient, + lakeClient: new LakeClient(), + queue: [], + streamKey, + streamType: redisClient.getStreamType(streamKey), + }; console.log('Started processing stream: ', streamKey); - let indexerName = ''; - const streamType = redisClient.getStreamType(streamKey); - const isHistorical = streamType === 'historical'; + await handleStream(workerContext, streamKey); +})(); + +async function handleStream (workerContext: WorkerContext, streamKey: string): Promise { + void blockQueueProducer(workerContext, streamKey); + void blockQueueConsumer(workerContext, streamKey); +} + +function incrementId (id: string): string { + const [main, sequence] = id.split('-'); + return `${Number(main) + 1}-${sequence}`; +} + +async function blockQueueProducer (workerContext: WorkerContext, streamKey: string): Promise { + const HISTORICAL_BATCH_SIZE = 100; + let streamMessageStartId = '0'; while (true) { - try { - const startTime = performance.now(); + const preFetchCount = HISTORICAL_BATCH_SIZE - workerContext.queue.length; + if (preFetchCount <= 0) { + await sleep(100); + continue; + } + const messages = await workerContext.redisClient.getStreamMessages(streamKey, streamMessageStartId, preFetchCount); + if (messages == null) { + await sleep(100); + continue; + } + console.log(`Fetched ${messages?.length} messages from stream ${streamKey}`); + + for (const streamMessage of messages) { + const { id, message } = streamMessage; + workerContext.queue.push(generateQueueMessage(workerContext, Number(message.block_height), id)); + } - const messages = await redisClient.getNextStreamMessage(streamKey); - const indexerConfig = await redisClient.getStreamStorage(streamKey); + streamMessageStartId = incrementId(messages[messages.length - 1].id); + } +} - indexerName = `${indexerConfig.account_id}/${indexerConfig.function_name}`; +async function blockQueueConsumer (workerContext: WorkerContext, streamKey: string): Promise { + const indexer = new Indexer(); + const indexerConfig = await workerContext.redisClient.getStreamStorage(streamKey); + const indexerName = 
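// The producer/consumer pair above is the heart of pre-fetching: the producer
// keeps up to HISTORICAL_BATCH_SIZE (100) entries queued, and each entry is a
// Promise whose block fetch (generateQueueMessage, below) starts as soon as
// it is pushed, so S3/cache reads overlap with indexer execution. The
// consumer then simply awaits the promise at the head of the queue:
//
//   const queueMessage = await workerContext.queue.at(0);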
`${indexerConfig.account_id}/${indexerConfig.function_name}`; + const functions = { + [indexerName]: { + account_id: indexerConfig.account_id, + function_name: indexerConfig.function_name, + code: indexerConfig.code, + schema: indexerConfig.schema, + provisioned: false, + }, + }; - if (messages == null) { - await sleep(1000); + while (true) { + let streamMessageId = ''; + try { + while (workerContext.queue.length === 0) { + await sleep(100); + } + const queueMessage = await workerContext.queue.at(0); + if (queueMessage === undefined) { continue; } + const startTime = performance.now(); + const blockStartTime = startTime; + const block = queueMessage.block; + streamMessageId = queueMessage.streamMessageId; - const [{ id, message }] = messages; + if (block === undefined || block.blockHeight == null) { + console.error('Block failed to process or does not have block height', block); + continue; + } + METRICS.BLOCK_WAIT_DURATION.labels({ indexer: indexerName, type: workerContext.streamType }).set(performance.now() - blockStartTime); + await indexer.runFunctions(block, functions, false, { provision: true }); - const functions = { - [indexerName]: { - account_id: indexerConfig.account_id, - function_name: indexerConfig.function_name, - code: indexerConfig.code, - schema: indexerConfig.schema, - provisioned: false, - }, - }; - await indexer.runFunctions(Number(message.block_height), functions, isHistorical, { - provision: true, - }); + await workerContext.redisClient.deleteStreamMessage(streamKey, streamMessageId); + await workerContext.queue.shift(); - await redisClient.deleteStreamMessage(streamKey, id); + METRICS.EXECUTION_DURATION.labels({ indexer: indexerName, type: workerContext.streamType }).observe(performance.now() - startTime); - METRICS.EXECUTION_DURATION.labels({ indexer: indexerName, type: streamType }).observe(performance.now() - startTime); + if (workerContext.streamType === 'historical') { + METRICS.LAST_PROCESSED_BLOCK_HEIGHT.labels({ indexer: indexerName, type: workerContext.streamType }).set(block.blockHeight); + } console.log(`Success: ${indexerName}`); } catch (err) { await sleep(10000); console.log(`Failed: ${indexerName}`, err); } finally { - const unprocessedMessages = await redisClient.getUnprocessedStreamMessages(streamKey); - METRICS.UNPROCESSED_STREAM_MESSAGES.labels({ indexer: indexerName, type: streamType }).set(unprocessedMessages?.length ?? 0); + const unprocessedMessages = await workerContext.redisClient.getUnprocessedStreamMessages(streamKey, streamMessageId); + METRICS.UNPROCESSED_STREAM_MESSAGES.labels({ indexer: indexerName, type: workerContext.streamType }).set(unprocessedMessages?.length ?? 0); parentPort?.postMessage(await promClient.register.getMetricsAsJSON()); } } -})(); +} + +async function generateQueueMessage (workerContext: WorkerContext, blockHeight: number, streamMessageId: string): Promise { + const block = await workerContext.lakeClient.fetchBlock(blockHeight, workerContext.streamType === 'historical'); + return { + block, + streamMessageId + }; +}
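Taken together, the patch leaves three narrow collaborators: RedisClient supplies stream messages, LakeClient turns block heights into Blocks (Redis cache first for real-time streams, S3 otherwise), and Indexer only executes functions. Below is a minimal end-to-end sketch of that wiring, without the prefetch queue, metrics, and retry handling the real worker adds; processOne is an illustrative name and the import paths assume a file sitting alongside the worker:

import Indexer from '../indexer';
import LakeClient from '../lake-client';
import RedisClient from '../redis-client';

async function processOne (streamKey: string): Promise<void> {
  const redisClient = new RedisClient();
  const lakeClient = new LakeClient();
  const indexer = new Indexer();

  const isHistorical = redisClient.getStreamType(streamKey) === 'historical';
  const [streamMessage] = await redisClient.getStreamMessages(streamKey) ?? [];
  if (streamMessage === undefined) return;

  const indexerConfig = await redisClient.getStreamStorage(streamKey);
  const functions = {
    [`${indexerConfig.account_id}/${indexerConfig.function_name}`]: {
      account_id: indexerConfig.account_id,
      function_name: indexerConfig.function_name,
      code: indexerConfig.code,
      schema: indexerConfig.schema,
      provisioned: false,
    },
  };

  // Fetch (cache or S3) and execute, then acknowledge the stream message.
  const block = await lakeClient.fetchBlock(Number(streamMessage.message.block_height), isHistorical);
  await indexer.runFunctions(block, functions, isHistorical, { provision: true });
  await redisClient.deleteStreamMessage(streamKey, streamMessage.id);
}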