diff --git a/block-server/handler.js b/block-server/handler.js
index 7d4d2b2e7..70d6703e6 100644
--- a/block-server/handler.js
+++ b/block-server/handler.js
@@ -1,6 +1,6 @@
 'use strict';

-const AWS = require('aws-sdk');
-const S3= new AWS.S3();
+import { S3 as S3Client } from '@aws-sdk/client-s3';
+const S3 = new S3Client();

 const NETWORK = process.env.NETWORK || 'mainnet';
diff --git a/indexer/queryapi_coordinator/src/historical_block_processing.rs b/indexer/queryapi_coordinator/src/historical_block_processing.rs
index ca7125d7c..82b60b8c3 100644
--- a/indexer/queryapi_coordinator/src/historical_block_processing.rs
+++ b/indexer/queryapi_coordinator/src/historical_block_processing.rs
@@ -151,6 +151,7 @@ pub(crate) async fn process_historical_messages(
             redis_connection_manager,
             storage::generate_historical_storage_key(&indexer_function.get_full_name()),
             serde_json::to_string(&indexer_function)?,
+            None,
         )
         .await?;
     }
diff --git a/indexer/queryapi_coordinator/src/main.rs b/indexer/queryapi_coordinator/src/main.rs
index 28c2dc3f3..a67f5653a 100644
--- a/indexer/queryapi_coordinator/src/main.rs
+++ b/indexer/queryapi_coordinator/src/main.rs
@@ -6,11 +6,12 @@ use tokio::sync::{Mutex, MutexGuard};
 use indexer_rules_engine::types::indexer_rule_match::{ChainId, IndexerRuleMatch};
 use near_lake_framework::near_indexer_primitives::types::{AccountId, BlockHeight};
 use near_lake_framework::near_indexer_primitives::{types, StreamerMessage};
+use utils::serialize_to_camel_case_json_string;

 use crate::indexer_types::IndexerFunction;
 use indexer_types::{IndexerQueueMessage, IndexerRegistry};
 use opts::{Opts, Parser};
-use storage::{self, ConnectionManager};
+use storage::{self, generate_real_time_streamer_message_key, ConnectionManager};

 mod historical_block_processing;
 mod indexer_reducer;
@@ -146,6 +147,15 @@ async fn handle_streamer_message(
     let block_height: BlockHeight = context.streamer_message.block.header.height;

+    // Cache streamer message block and shards for use in real time processing
+    storage::set(
+        context.redis_connection_manager,
+        generate_real_time_streamer_message_key(block_height),
+        &serialize_to_camel_case_json_string(&context.streamer_message)?,
+        Some(60),
+    )
+    .await?;
+
     let spawned_indexers = indexer_registry::index_registry_changes(
         block_height,
         &mut indexer_registry_locked,
@@ -206,6 +216,7 @@ async fn handle_streamer_message(
                     context.redis_connection_manager,
                     storage::generate_real_time_storage_key(&indexer_function.get_full_name()),
                     serde_json::to_string(indexer_function)?,
+                    None,
                 )
                 .await?;
                 storage::xadd(
diff --git a/indexer/queryapi_coordinator/src/utils.rs b/indexer/queryapi_coordinator/src/utils.rs
index 2cf5207ed..9f8c2585a 100644
--- a/indexer/queryapi_coordinator/src/utils.rs
+++ b/indexer/queryapi_coordinator/src/utils.rs
@@ -1,3 +1,5 @@
+use serde_json::Value;
+
 pub(crate) async fn stats(redis_connection_manager: storage::ConnectionManager) {
     let interval_secs = 10;
     let mut previous_processed_blocks: u64 =
@@ -49,3 +51,52 @@ pub(crate) async fn stats(redis_connection_manager: storage::ConnectionManager)
         tokio::time::sleep(std::time::Duration::from_secs(interval_secs)).await;
     }
 }
+
+pub(crate) fn serialize_to_camel_case_json_string(
+    streamer_message: &near_lake_framework::near_indexer_primitives::StreamerMessage,
+) -> anyhow::Result<String, serde_json::Error> {
+    // Serialize the Message object to a JSON string
+    let json_str = serde_json::to_string(&streamer_message)?;
+
+    // Deserialize the JSON string to a Value Object
+    let mut message_value: Value = serde_json::from_str(&json_str)?;
+
+    // Convert keys to Camel Case
+    to_camel_case_keys(&mut message_value);
+
+    return serde_json::to_string(&message_value);
+}
+
+fn to_camel_case_keys(message_value: &mut Value) {
+    // Only process if subfield contains objects
+    match message_value {
+        Value::Object(map) => {
+            for key in map.keys().cloned().collect::<Vec<String>>() {
+                // Generate Camel Case Key
+                let new_key = key
+                    .split("_")
+                    .enumerate()
+                    .map(|(i, str)| {
+                        if i > 0 {
+                            return str[..1].to_uppercase() + &str[1..];
+                        }
+                        return str.to_owned();
+                    })
+                    .collect::<Vec<String>>()
+                    .join("");
+
+                // Recursively process inner fields and update map with new key
+                if let Some(mut val) = map.remove(&key) {
+                    to_camel_case_keys(&mut val);
+                    map.insert(new_key, val);
+                }
+            }
+        }
+        Value::Array(vec) => {
+            for val in vec {
+                to_camel_case_keys(val);
+            }
+        }
+        _ => {}
+    }
+}
diff --git a/indexer/storage/src/lib.rs b/indexer/storage/src/lib.rs
index 6c449b6b2..a2d20b850 100644
--- a/indexer/storage/src/lib.rs
+++ b/indexer/storage/src/lib.rs
@@ -2,6 +2,7 @@ pub use redis::{self, aio::ConnectionManager, FromRedisValue, ToRedisArgs};
 const STORAGE: &str = "storage_alertexer";

+pub const LAKE_BUCKET_PREFIX: &str = "near-lake-data-";
 pub const STREAMS_SET_KEY: &str = "streams";

 pub async fn get_redis_client(redis_connection_str: &str) -> redis::Client {
@@ -12,6 +13,10 @@ pub fn generate_real_time_stream_key(prefix: &str) -> String {
     format!("{}:real_time:stream", prefix)
 }

+pub fn generate_real_time_streamer_message_key(block_height: u64) -> String {
+    format!("streamer:message:{}", block_height)
+}
+
 pub fn generate_real_time_storage_key(prefix: &str) -> String {
     format!("{}:real_time:stream:storage", prefix)
 }
@@ -47,13 +52,19 @@ pub async fn set(
     redis_connection_manager: &ConnectionManager,
     key: impl ToRedisArgs + std::fmt::Debug,
     value: impl ToRedisArgs + std::fmt::Debug,
+    expiration_seconds: Option<u64>,
 ) -> anyhow::Result<()> {
-    redis::cmd("SET")
-        .arg(&key)
-        .arg(&value)
-        .query_async(&mut redis_connection_manager.clone())
+    let mut cmd = redis::cmd("SET");
+    cmd.arg(&key).arg(&value);
+
+    // Add expiration arguments if present
+    if let Some(expiration_seconds) = expiration_seconds {
+        cmd.arg("EX").arg(expiration_seconds);
+    }
+
+    cmd.query_async(&mut redis_connection_manager.clone())
         .await?;
-    tracing::debug!(target: STORAGE, "SET: {:?}: {:?}", key, value,);
+    tracing::debug!(target: STORAGE, "SET: {:?}: {:?} Ex: {:?}", key, value, expiration_seconds);
     Ok(())
 }

@@ -113,7 +124,7 @@ pub async fn push_receipt_to_watching_list(
     receipt_id: &str,
     cache_value: &[u8],
 ) -> anyhow::Result<()> {
-    set(redis_connection_manager, receipt_id, cache_value).await?;
+    set(redis_connection_manager, receipt_id, cache_value, None).await?;
     // redis::cmd("INCR")
     //     .arg(format!("receipts_{}", transaction_hash))
     //     .query_async(&mut redis_connection_manager.clone())
@@ -161,7 +172,13 @@ pub async fn update_last_indexed_block(
     redis_connection_manager: &ConnectionManager,
     block_height: u64,
 ) -> anyhow::Result<()> {
-    set(redis_connection_manager, "last_indexed_block", block_height).await?;
+    set(
+        redis_connection_manager,
+        "last_indexed_block",
+        block_height,
+        None,
+    )
+    .await?;
     redis::cmd("INCR")
         .arg("blocks_processed")
         .query_async(&mut redis_connection_manager.clone())
diff --git a/runner/package.json b/runner/package.json
index 6d14e3e5c..213cc4e53 100644
--- a/runner/package.json
+++ b/runner/package.json
@@ -42,8 +42,8 @@
     "typescript": "^5.1.6"
   },
   "dependencies": {
+    "@aws-sdk/client-s3": "^3.414.0",
     "@near-lake/primitives": "^0.1.0",
-    "aws-sdk": "^2.1402.0",
"express": "^4.18.2", "node-fetch": "^2.6.11", "node-sql-parser": "^4.10.0", diff --git a/runner/src/indexer/indexer.test.ts b/runner/src/indexer/indexer.test.ts index 37a828639..38f9ce77c 100644 --- a/runner/src/indexer/indexer.test.ts +++ b/runner/src/indexer/indexer.test.ts @@ -1,6 +1,7 @@ import { Block } from '@near-lake/primitives'; import type fetch from 'node-fetch'; -import type AWS from 'aws-sdk'; +import { type S3Client, GetObjectCommand } from '@aws-sdk/client-s3'; +import type RedisClient from '../redis-client'; import Indexer from './indexer'; import { VM } from 'vm2'; @@ -162,6 +163,10 @@ CREATE TABLE }), }); + const transparentRedis = { + getStreamerMessage: jest.fn() + } as unknown as RedisClient; + beforeEach(() => { process.env = { ...oldEnv, @@ -182,21 +187,24 @@ CREATE TABLE }), })); const blockHeight = 456; - const mockS3 = { - getObject: jest.fn(() => ({ - promise: async () => await Promise.resolve({ - Body: { - toString: () => JSON.stringify({ - chunks: [], - header: { - height: blockHeight - } - }) - } - }) - })), - } as unknown as AWS.S3; - const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, s3: mockS3 }); + const mockData = jest.fn().mockResolvedValue( + JSON.stringify( + { + block: { + chunks: [], + header: { + height: blockHeight + } + }, + shards: {} + } + ) + ); + const mockRedis = { + getStreamerMessage: mockData + } as unknown as RedisClient; + + const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, redisClient: mockRedis }); const functions: Record = {}; functions['buildnear.testnet/test'] = { @@ -211,95 +219,174 @@ CREATE TABLE expect(mockFetch.mock.calls).toMatchSnapshot(); }); - test('Indexer.fetchBlock() should fetch a block from the S3', async () => { + test('Indexer.fetchBlock() should fetch a block from S3', async () => { const author = 'dokiacapital.poolv1.near'; + const mockData = JSON.stringify({ + author + }); + const mockSend = jest.fn().mockResolvedValue({ + Body: { + transformToString: () => mockData + } + }); const mockS3 = { - getObject: jest.fn(() => ({ - promise: async () => await Promise.resolve({ - Body: { - toString: () => JSON.stringify({ - author - }) - } - }) - })), - } as unknown as AWS.S3; - const indexer = new Indexer('mainnet', { s3: mockS3 }); + send: mockSend, + } as unknown as S3Client; + + const indexer = new Indexer('mainnet', { s3: mockS3, redisClient: transparentRedis }); const blockHeight = 84333960; const block = await indexer.fetchBlockPromise(blockHeight); - - expect(mockS3.getObject).toHaveBeenCalledTimes(1); - expect(mockS3.getObject).toHaveBeenCalledWith({ + const params = { Bucket: 'near-lake-data-mainnet', Key: `${blockHeight.toString().padStart(12, '0')}/block.json` - }); + }; + + expect(mockS3.send).toHaveBeenCalledTimes(1); + expect(JSON.stringify(mockSend.mock.calls[0][0])).toMatch(JSON.stringify(new GetObjectCommand(params))); expect(block.author).toEqual(author); }); - test('Indexer.fetchShard() should fetch the steamer message from S3', async () => { + test('Indexer.fetchShard() should fetch a shard from S3', async () => { + const mockData = JSON.stringify({}); + const mockSend = jest.fn().mockResolvedValue({ + Body: { + transformToString: () => mockData + } + }); const mockS3 = { - getObject: jest.fn(() => ({ - promise: async () => await Promise.resolve({ - Body: { - toString: () => JSON.stringify({}) - } - }) - })), - } as unknown as AWS.S3; - const indexer = new Indexer('mainnet', { s3: mockS3 }); + send: mockSend, + } as unknown as 
+    const indexer = new Indexer('mainnet', { s3: mockS3, redisClient: transparentRedis });

     const blockHeight = 82699904;
     const shard = 0;
+    const params = {
+      Bucket: 'near-lake-data-mainnet',
+      Key: `${blockHeight.toString().padStart(12, '0')}/shard_${shard}.json`
+    };
     await indexer.fetchShardPromise(blockHeight, shard);

-    expect(mockS3.getObject).toHaveBeenCalledTimes(1);
-    expect(mockS3.getObject).toHaveBeenCalledWith({
-      Bucket: 'near-lake-data-mainnet',
-      Key: `${blockHeight.toString().padStart(12, '0')}/shard_${shard}.json`
-    });
+    expect(JSON.stringify(mockSend.mock.calls[0][0])).toMatch(JSON.stringify(new GetObjectCommand(params)));
   });

+  test('Indexer.fetchStreamerMessage() should fetch the message from cache and use it directly', async () => {
+    const blockHeight = 85233529;
+    const blockHash = 'xyz';
+    const getMessage = jest.fn()
+      .mockReturnValueOnce(JSON.stringify(
+        {
+          block: {
+            chunks: [0],
+            header: {
+              height: blockHeight,
+              hash: blockHash,
+            }
+          },
+          shards: {}
+        }
+      ));
+    const mockRedis = {
+      getStreamerMessage: getMessage
+    } as unknown as RedisClient;
+    const indexer = new Indexer('mainnet', { redisClient: mockRedis });
+
+    const streamerMessage = await indexer.fetchStreamerMessage(blockHeight, false);
+
+    expect(getMessage).toHaveBeenCalledTimes(1);
+    expect(JSON.stringify(getMessage.mock.calls[0])).toEqual(
+      `[${blockHeight}]`
+    );
+    const block = Block.fromStreamerMessage(streamerMessage);
+
+    expect(block.blockHeight).toEqual(blockHeight);
+    expect(block.blockHash).toEqual(blockHash);
+  });
+
+  test('Indexer.fetchStreamerMessage() should fetch the block and shards from S3 upon cache miss', async () => {
+    const blockHeight = 85233529;
+    const blockHash = 'xyz';
+    const mockSend = jest.fn()
+      .mockReturnValueOnce({ // block
+        Body: {
+          transformToString: () => JSON.stringify({
+            chunks: [0],
+            header: {
+              height: blockHeight,
+              hash: blockHash,
+            }
+          })
+        }
+      })
+      .mockReturnValue({ // shard
+        Body: {
+          transformToString: () => JSON.stringify({})
+        }
+      });
+    const mockS3 = {
+      send: mockSend,
+    } as unknown as S3Client;
+    const indexer = new Indexer('mainnet', { s3: mockS3, redisClient: transparentRedis });
+
+    const streamerMessage = await indexer.fetchStreamerMessage(blockHeight, false);
+
+    expect(mockSend).toHaveBeenCalledTimes(5);
+    expect(JSON.stringify(mockSend.mock.calls[0][0])).toStrictEqual(JSON.stringify(new GetObjectCommand({
+      Bucket: 'near-lake-data-mainnet',
+      Key: `${blockHeight.toString().padStart(12, '0')}/block.json`
+    })));
+    expect(JSON.stringify(mockSend.mock.calls[1][0])).toStrictEqual(JSON.stringify(new GetObjectCommand({
+      Bucket: 'near-lake-data-mainnet',
+      Key: `${blockHeight.toString().padStart(12, '0')}/shard_0.json`
+    })));
+    expect(transparentRedis.getStreamerMessage).toHaveBeenCalledTimes(1);
+
+    const block = Block.fromStreamerMessage(streamerMessage);
+
+    expect(block.blockHeight).toEqual(blockHeight);
+    expect(block.blockHash).toEqual(blockHash);
+  });

-  test('Indexer.fetchStreamerMessage() should fetch the block/shards and construct the streamer message', async () => {
+  test('Indexer.fetchStreamerMessage() should construct the streamer message from S3, skipping the cache, if historical', async () => {
     const blockHeight = 85233529;
     const blockHash = 'xyz';
-    const getObject = jest.fn()
+    const mockSend = jest.fn()
       .mockReturnValueOnce({ // block
-        promise: async () => await Promise.resolve({
-          Body: {
-            toString: () => JSON.stringify({
-              chunks: [0],
-              header: {
-                height: blockHeight,
-                hash: blockHash,
-              }
-            })
-          }
-        })
+        Body: {
+          transformToString: () => JSON.stringify({
+            chunks: [0],
+            header: {
+              height: blockHeight,
+              hash: blockHash,
+            }
+          })
+        }
       })
       .mockReturnValue({ // shard
-        promise: async () => await Promise.resolve({
-          Body: {
-            toString: () => JSON.stringify({})
-          }
-        })
+        Body: {
+          transformToString: () => JSON.stringify({})
+        }
       });
     const mockS3 = {
-      getObject,
-    } as unknown as AWS.S3;
-    const indexer = new Indexer('mainnet', { s3: mockS3 });
+      send: mockSend,
+    } as unknown as S3Client;
+    const mockRedis = {
+      getStreamerMessage: jest.fn()
+    } as unknown as RedisClient;
+    const indexer = new Indexer('mainnet', { s3: mockS3, redisClient: mockRedis });

-    const streamerMessage = await indexer.fetchStreamerMessage(blockHeight);
+    const streamerMessage = await indexer.fetchStreamerMessage(blockHeight, true);

-    expect(getObject).toHaveBeenCalledTimes(5);
-    expect(getObject.mock.calls[0][0]).toEqual({
+    expect(mockSend).toHaveBeenCalledTimes(5);
+    expect(JSON.stringify(mockSend.mock.calls[0][0])).toStrictEqual(JSON.stringify(new GetObjectCommand({
       Bucket: 'near-lake-data-mainnet',
       Key: `${blockHeight.toString().padStart(12, '0')}/block.json`
-    });
-    expect(getObject.mock.calls[1][0]).toEqual({
+    })));
+    expect(JSON.stringify(mockSend.mock.calls[1][0])).toStrictEqual(JSON.stringify(new GetObjectCommand({
       Bucket: 'near-lake-data-mainnet',
       Key: `${blockHeight.toString().padStart(12, '0')}/shard_0.json`
-    });
+    })));
+    expect(mockRedis.getStreamerMessage).toHaveBeenCalledTimes(0);

     const block = Block.fromStreamerMessage(streamerMessage);

@@ -308,7 +395,7 @@ CREATE TABLE
   });

   test('Indexer.transformIndexerFunction() applies the necessary transformations', () => {
-    const indexer = new Indexer('mainnet');
+    const indexer = new Indexer('mainnet', { redisClient: transparentRedis });

     const transformedFunction = indexer.transformIndexerFunction('console.log(\'hello\')');

@@ -340,7 +427,7 @@ CREATE TABLE
         }
       })
     });
-    const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch });
+    const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, redisClient: transparentRedis });

     const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, HASURA_ROLE);

@@ -392,7 +479,7 @@ CREATE TABLE
   test('Indexer.buildContext() can fetch from the near social api', async () => {
     const mockFetch = jest.fn();
-    const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch });
+    const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, redisClient: transparentRedis });

     const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, HASURA_ROLE);

@@ -421,7 +508,7 @@ CREATE TABLE
         errors: ['boom']
       })
     });
-    const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch });
+    const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, redisClient: transparentRedis });

     const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, INVALID_HASURA_ROLE);

@@ -436,7 +523,7 @@ CREATE TABLE
         data: 'mock',
       }),
     });
-    const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch });
+    const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, redisClient: transparentRedis });

     const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, HASURA_ROLE);

@@ -463,7 +550,7 @@ CREATE TABLE
   });

   test('GetTables works for a variety of input schemas', async () => {
-    const indexer = new Indexer('mainnet');
+    const indexer = new Indexer('mainnet', { redisClient: transparentRedis });

     const simpleSchemaTables = indexer.getTableNames(SIMPLE_SCHEMA);
     expect(simpleSchemaTables).toStrictEqual(['posts']);

@@ -503,7 +590,7 @@ CREATE TABLE
   });

   test('SanitizeTableName works properly on many test cases', async () => {
-    const indexer = new Indexer('mainnet');
+    const indexer = new Indexer('mainnet', { redisClient: transparentRedis });

     expect(indexer.sanitizeTableName('table_name')).toStrictEqual('TableName');
     expect(indexer.sanitizeTableName('tablename')).toStrictEqual('Tablename'); // name is not capitalized
@@ -518,7 +605,7 @@ CREATE TABLE
   });

   test('indexer fails to build context.db due to collision on sanitized table names', async () => {
-    const indexer = new Indexer('mainnet');
+    const indexer = new Indexer('mainnet', { redisClient: transparentRedis });

     const schemaWithDuplicateSanitizedTableNames = `CREATE TABLE
     "test table" (
@@ -540,7 +627,11 @@ CREATE TABLE
      })
    };

-    const indexer = new Indexer('mainnet', { fetch: genericMockFetch as unknown as typeof fetch, DmlHandler: mockDmlHandler });
+    const indexer = new Indexer('mainnet', {
+      fetch: genericMockFetch as unknown as typeof fetch,
+      redisClient: transparentRedis,
+      DmlHandler: mockDmlHandler
+    });

     const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres');
     const objToInsert = [{
@@ -576,7 +667,11 @@ CREATE TABLE
      })
    };

-    const indexer = new Indexer('mainnet', { fetch: genericMockFetch as unknown as typeof fetch, DmlHandler: mockDmlHandler });
+    const indexer = new Indexer('mainnet', {
+      fetch: genericMockFetch as unknown as typeof fetch,
+      redisClient: transparentRedis,
+      DmlHandler: mockDmlHandler
+    });

     const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres');
     const objToSelect = {
@@ -603,7 +698,11 @@ CREATE TABLE
      })
    };

-    const indexer = new Indexer('mainnet', { fetch: genericMockFetch as unknown as typeof fetch, DmlHandler: mockDmlHandler });
+    const indexer = new Indexer('mainnet', {
+      fetch: genericMockFetch as unknown as typeof fetch,
+      redisClient: transparentRedis,
+      DmlHandler: mockDmlHandler
+    });

     const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres');
     const whereObj = {
@@ -634,7 +733,11 @@ CREATE TABLE
      })
    };

-    const indexer = new Indexer('mainnet', { fetch: genericMockFetch as unknown as typeof fetch, DmlHandler: mockDmlHandler });
+    const indexer = new Indexer('mainnet', {
+      fetch: genericMockFetch as unknown as typeof fetch,
+      redisClient: transparentRedis,
+      DmlHandler: mockDmlHandler
+    });

     const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres');
     const objToInsert = [{
@@ -667,7 +770,11 @@ CREATE TABLE
      })
    };

-    const indexer = new Indexer('mainnet', { fetch: genericMockFetch as unknown as typeof fetch, DmlHandler: mockDmlHandler });
+    const indexer = new Indexer('mainnet', {
+      fetch: genericMockFetch as unknown as typeof fetch,
+      redisClient: transparentRedis,
+      DmlHandler: mockDmlHandler
+    });

     const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres');
     const deleteFilter = {
@@ -683,7 +790,11 @@ CREATE TABLE
     create: jest.fn()
   };

-    const indexer = new Indexer('mainnet', { fetch: genericMockFetch as unknown as typeof fetch, DmlHandler: mockDmlHandler });
+    const indexer = new Indexer('mainnet', {
+      fetch: genericMockFetch as unknown as typeof fetch,
+      redisClient: transparentRedis,
+      DmlHandler: mockDmlHandler
+    });

     const context = indexer.buildContext(STRESS_TEST_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres');

     expect(Object.keys(context.db)).toStrictEqual([
@@ -722,7 +833,11 @@ CREATE TABLE
     create: jest.fn()
   };

-    const indexer = new Indexer('mainnet', { fetch: genericMockFetch as unknown as typeof fetch, DmlHandler: mockDmlHandler });
+    const indexer = new Indexer('mainnet', {
+      fetch: genericMockFetch as unknown as typeof fetch,
+      redisClient: transparentRedis,
+      DmlHandler: mockDmlHandler
+    });

     const context = indexer.buildContext('', 'morgs.near/social_feed1', 1, 'postgres');

     expect(Object.keys(context.db)).toStrictEqual([]);
@@ -783,28 +898,24 @@ CREATE TABLE
   });

     const mockS3 = {
-      getObject: jest.fn()
-        .mockReturnValueOnce({ // block
-          promise: async () => await Promise.resolve({
-            Body: {
-              toString: () => JSON.stringify({
-                chunks: [0],
-                header: {
-                  height: blockHeight,
-                },
-              }),
-            },
-          }),
-        })
-        .mockReturnValue({ // shard
-          promise: async () => await Promise.resolve({
-            Body: {
-              toString: () => JSON.stringify({})
-            },
-          }),
-        }),
-    } as unknown as AWS.S3;
-    const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, s3: mockS3 });
+      send: jest.fn()
+        .mockResolvedValueOnce({ // block
+          Body: {
+            transformToString: () => JSON.stringify({
+              chunks: [0],
+              header: {
+                height: blockHeight,
+              },
+            }),
+          },
+        })
+        .mockResolvedValue({ // shard
+          Body: {
+            transformToString: () => JSON.stringify({})
+          },
+        }),
+    } as unknown as S3Client;
+    const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, s3: mockS3, redisClient: transparentRedis });

     const functions: Record<string, any> = {};
     functions['buildnear.testnet/test'] = {
@@ -875,20 +986,18 @@ CREATE TABLE
     }));
     const blockHeight = 456;
     const mockS3 = {
-      getObject: jest.fn(() => ({
-        promise: async () => await Promise.resolve({
-          Body: {
-            toString: () => JSON.stringify({
-              chunks: [],
-              header: {
-                height: blockHeight
-              }
-            })
-          }
-        })
-      })),
-    } as unknown as AWS.S3;
-    const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, s3: mockS3 });
+      send: jest.fn().mockResolvedValue({
+        Body: {
+          transformToString: () => JSON.stringify({
+            chunks: [],
+            header: {
+              height: blockHeight
+            }
+          })
+        }
+      }),
+    } as unknown as S3Client;
+    const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, s3: mockS3, redisClient: transparentRedis });

     const functions: Record<string, any> = {};
     functions['buildnear.testnet/test'] = {
@@ -911,33 +1020,29 @@ CREATE TABLE
      }),
    }));
    const mockS3 = {
-      getObject: jest
+      send: jest
        .fn()
-        .mockReturnValueOnce({ // block
-          promise: async () => await Promise.resolve({
-            Body: {
-              toString: () => JSON.stringify({
-                chunks: [0],
-                header: {
-                  height: blockHeight,
-                },
-              }),
-            },
-          }),
-        })
-        .mockReturnValue({ // shard
-          promise: async () => await Promise.resolve({
-            Body: {
-              toString: () => JSON.stringify({})
-            },
-          }),
-        }),
-    } as unknown as AWS.S3;
+        .mockResolvedValueOnce({ // block
+          Body: {
+            transformToString: () => JSON.stringify({
+              chunks: [0],
+              header: {
+                height: blockHeight,
+              },
+            }),
+          },
+        })
+        .mockResolvedValue({ // shard
+          Body: {
+            transformToString: () => JSON.stringify({})
+          },
+        }),
+    } as unknown as S3Client;
     const provisioner: any = {
       isUserApiProvisioned: jest.fn().mockReturnValue(false),
       provisionUserApi: jest.fn(),
     };
-    const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, s3: mockS3, provisioner });
+    const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, s3: mockS3, redisClient: transparentRedis, provisioner });

     const functions = {
       'morgs.near/test': {
@@ -967,33 +1072,29 @@ CREATE TABLE
      }),
    }));
    const mockS3 = {
-      getObject: jest
+      send: jest
        .fn()
-        .mockReturnValueOnce({ // block
-          promise: async () => await Promise.resolve({
-            Body: {
-              toString: () => JSON.stringify({
-                chunks: [0],
-                header: {
-                  height: blockHeight,
-                },
-              }),
-            },
-          }),
-        })
-        .mockReturnValue({ // shard
-          promise: async () => await Promise.resolve({
-            Body: {
-              toString: () => JSON.stringify({})
-            },
-          }),
-        }),
-    } as unknown as AWS.S3;
+        .mockResolvedValueOnce({ // block
+          Body: {
+            transformToString: () => JSON.stringify({
+              chunks: [0],
+              header: {
+                height: blockHeight,
+              },
+            }),
+          },
+        })
+        .mockResolvedValue({ // shard
+          Body: {
+            transformToString: () => JSON.stringify({})
+          },
+        }),
+    } as unknown as S3Client;
     const provisioner: any = {
       isUserApiProvisioned: jest.fn().mockReturnValue(true),
       provisionUserApi: jest.fn(),
     };
-    const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, s3: mockS3, provisioner });
+    const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, s3: mockS3, redisClient: transparentRedis, provisioner });

     const functions: Record<string, any> = {
       'morgs.near/test': {
@@ -1015,33 +1116,29 @@ CREATE TABLE
      }),
    }));
    const mockS3 = {
-      getObject: jest
+      send: jest
        .fn()
-        .mockReturnValueOnce({ // block
-          promise: async () => await Promise.resolve({
-            Body: {
-              toString: () => JSON.stringify({
-                chunks: [0],
-                header: {
-                  height: blockHeight,
-                },
-              }),
-            },
-          }),
-        })
-        .mockReturnValue({ // shard
-          promise: async () => await Promise.resolve({
-            Body: {
-              toString: () => JSON.stringify({})
-            },
-          }),
-        }),
-    } as unknown as AWS.S3;
+        .mockResolvedValueOnce({ // block
+          Body: {
+            transformToString: () => JSON.stringify({
+              chunks: [0],
+              header: {
+                height: blockHeight,
+              },
+            }),
+          },
+        })
+        .mockResolvedValue({ // shard
+          Body: {
+            transformToString: () => JSON.stringify({})
+          },
+        }),
+    } as unknown as S3Client;
     const provisioner: any = {
       isUserApiProvisioned: jest.fn().mockReturnValue(true),
       provisionUserApi: jest.fn(),
     };
-    const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, s3: mockS3, provisioner });
+    const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, s3: mockS3, redisClient: transparentRedis, provisioner });

     const functions: Record<string, any> = {
       'morgs.near/test': {
@@ -1066,34 +1163,30 @@ CREATE TABLE
      }),
    }));
    const mockS3 = {
-      getObject: jest
+      send: jest
        .fn()
-        .mockReturnValueOnce({ // block
-          promise: async () => await Promise.resolve({
-            Body: {
-              toString: () => JSON.stringify({
-                chunks: [0],
-                header: {
-                  height: blockHeight,
-                },
-              }),
-            },
-          }),
-        })
-        .mockReturnValue({ // shard
-          promise: async () => await Promise.resolve({
-            Body: {
-              toString: () => JSON.stringify({})
-            },
-          }),
-        }),
-    } as unknown as AWS.S3;
+        .mockResolvedValueOnce({ // block
+          Body: {
+            transformToString: () => JSON.stringify({
+              chunks: [0],
+              header: {
+                height: blockHeight,
+              },
+            }),
+          },
+        })
+        .mockResolvedValue({ // shard
+          Body: {
+            transformToString: () => JSON.stringify({})
+          },
+        }),
+    } as unknown as S3Client;
     const error = new Error('something went wrong with provisioning');
     const provisioner: any = {
       isUserApiProvisioned: jest.fn().mockReturnValue(false),
       provisionUserApi: jest.fn().mockRejectedValue(error),
     };
-    const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, s3: mockS3, provisioner });
+    const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, s3: mockS3, redisClient: transparentRedis, provisioner });

     const functions: Record<string, any> = {
       'morgs.near/test': {
@@ -1116,7 +1209,7 @@ CREATE TABLE
         data: {}
       })
     });
-    const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch });
+    const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, redisClient: transparentRedis });

     // @ts-expect-error legacy test
     const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, null);
@@ -1152,7 +1245,7 @@ CREATE TABLE
       })
     });
     const role = 'morgs_near';
-    const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch });
+    const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, redisClient: transparentRedis });

     const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, HASURA_ROLE);

     const mutation = `
diff --git a/runner/src/indexer/indexer.ts b/runner/src/indexer/indexer.ts
index 3a2858cd5..e866652b2 100644
--- a/runner/src/indexer/indexer.ts
+++ b/runner/src/indexer/indexer.ts
@@ -1,18 +1,21 @@
 import fetch, { type Response } from 'node-fetch';
 import { VM } from 'vm2';
-import AWS from 'aws-sdk';
+import { S3Client, GetObjectCommand } from '@aws-sdk/client-s3';
 import { Block } from '@near-lake/primitives';
 import { Parser } from 'node-sql-parser';

+import { METRICS } from '../metrics';
 import Provisioner from '../provisioner';
 import DmlHandler from '../dml-handler/dml-handler';
+import RedisClient from '../redis-client';

 interface Dependencies {
   fetch: typeof fetch
-  s3: AWS.S3
+  s3: S3Client
   provisioner: Provisioner
   DmlHandler: typeof DmlHandler
   parser: Parser
+  redisClient: RedisClient
 };

 interface Context {
@@ -44,10 +47,11 @@ export default class Indexer {
     this.network = network;
     this.deps = {
       fetch,
-      s3: new AWS.S3(),
+      s3: new S3Client(),
       provisioner: new Provisioner(),
       DmlHandler,
       parser: new Parser(),
+      redisClient: deps?.redisClient ?? new RedisClient(),
       ...deps,
     };
   }
@@ -58,7 +62,7 @@ export default class Indexer {
     isHistorical: boolean,
     options: { provision?: boolean } = { provision: false }
   ): Promise<void> {
-    const blockWithHelpers = Block.fromStreamerMessage(await this.fetchStreamerMessage(blockHeight));
+    const blockWithHelpers = Block.fromStreamerMessage(await this.fetchStreamerMessage(blockHeight, isHistorical));

     const lag = Date.now() - Math.floor(Number(blockWithHelpers.header().timestampNanosec) / 1000000);

@@ -132,7 +136,17 @@ export default class Indexer {
     return blockHeight.toString().padStart(12, '0');
   }

-  async fetchStreamerMessage (blockHeight: number): Promise<{ block: any, shards: any[] }> {
+  async fetchStreamerMessage (blockHeight: number, isHistorical: boolean): Promise<{ block: any, shards: any[] }> {
+    if (!isHistorical) {
+      const cachedMessage = await this.deps.redisClient.getStreamerMessage(blockHeight);
+      if (cachedMessage) {
+        METRICS.CACHE_HIT.labels(isHistorical ? 'historical' : 'real-time', 'streamer_message').inc();
+        const parsedMessage = JSON.parse(cachedMessage);
+        return parsedMessage;
+      } else {
+        METRICS.CACHE_MISS.labels(isHistorical ? 'historical' : 'real-time', 'streamer_message').inc();
+      }
+    }
     const blockPromise = this.fetchBlockPromise(blockHeight);
     const shardsPromises = await this.fetchShardsPromises(blockHeight, 4);

@@ -156,9 +170,9 @@ export default class Indexer {
       Bucket: `near-lake-data-${this.network}`,
       Key: `${this.normalizeBlockHeight(blockHeight)}/shard_${shardId}.json`,
     };
-    return await this.deps.s3.getObject(params).promise().then((response) => {
-      return JSON.parse(response.Body?.toString() ?? '{}', (_key, value) => this.renameUnderscoreFieldsToCamelCase(value));
-    });
+    const response = await this.deps.s3.send(new GetObjectCommand(params));
+    const shardData = await response.Body?.transformToString() ?? '{}';
+    return JSON.parse(shardData, (_key, value) => this.renameUnderscoreFieldsToCamelCase(value));
   }

   async fetchBlockPromise (blockHeight: number): Promise<any> {
@@ -168,10 +182,9 @@ export default class Indexer {
       Bucket: 'near-lake-data-' + this.network,
       Key: `${folder}/${file}`,
     };
-    return await this.deps.s3.getObject(params).promise().then((response) => {
-      const block = JSON.parse(response.Body?.toString() ?? '{}', (_key, value) => this.renameUnderscoreFieldsToCamelCase(value));
-      return block;
-    });
+    const response = await this.deps.s3.send(new GetObjectCommand(params));
+    const blockData = await response.Body?.transformToString() ?? '{}';
+    return JSON.parse(blockData, (_key, value) => this.renameUnderscoreFieldsToCamelCase(value));
   }

   enableAwaitTransform (indexerFunction: string): string {
@@ -479,7 +492,7 @@ export default class Indexer {
   }

   renameUnderscoreFieldsToCamelCase (value: Record<string, any>): Record<string, any> {
-    if (typeof value === 'object' && !Array.isArray(value)) {
+    if (value !== null && typeof value === 'object' && !Array.isArray(value)) {
       // It's a non-null, non-array object, create a replacement with the keys initially-capped
       const newValue: any = {};
       for (const key in value) {
diff --git a/runner/src/metrics.ts b/runner/src/metrics.ts
index 65ef6b990..ee757adc7 100644
--- a/runner/src/metrics.ts
+++ b/runner/src/metrics.ts
@@ -13,9 +13,23 @@ const EXECUTION_DURATION = new promClient.Gauge({
   labelNames: ['indexer', 'type'],
 });

+const CACHE_HIT = new promClient.Counter({
+  name: 'queryapi_runner_cache_hit',
+  help: 'The number of times cache was hit successfully',
+  labelNames: ['type', 'key']
+});
+
+const CACHE_MISS = new promClient.Counter({
+  name: 'queryapi_runner_cache_miss',
+  help: 'The number of times cache was missed',
+  labelNames: ['type', 'key']
+});
+
 export const METRICS = {
   EXECUTION_DURATION,
   UNPROCESSED_STREAM_MESSAGES,
+  CACHE_HIT,
+  CACHE_MISS
 };

 export const startServer = async (): Promise<void> => {
diff --git a/runner/src/redis-client/redis-client.test.ts b/runner/src/redis-client/redis-client.test.ts
index 85588ccab..26030f249 100644
--- a/runner/src/redis-client/redis-client.test.ts
+++ b/runner/src/redis-client/redis-client.test.ts
@@ -81,4 +81,17 @@ describe('RedisClient', () => {
     expect(mockClient.sMembers).toHaveBeenCalledWith('streams');
     expect(streams).toEqual(['streamKey1', 'streamKey2']);
   });
+
+  it('returns streamer message', async () => {
+    const mockClient = {
+      on: jest.fn(),
+      connect: jest.fn().mockResolvedValue(null),
+      get: jest.fn(),
+    } as any;
+
+    const client = new RedisClient(mockClient);
+    await client.getStreamerMessage(1000);
+
+    expect(mockClient.get).toHaveBeenCalledWith('streamer:message:1000');
+  });
 });
diff --git a/runner/src/redis-client/redis-client.ts b/runner/src/redis-client/redis-client.ts
index 9caa66226..18e11b854 100644
--- a/runner/src/redis-client/redis-client.ts
+++ b/runner/src/redis-client/redis-client.ts
@@ -20,6 +20,7 @@ export default class RedisClient {
   SMALLEST_STREAM_ID = '0';
   LARGEST_STREAM_ID = '+';
   STREAMS_SET_KEY = 'streams';
+  STREAMER_MESSAGE_HASH_KEY_BASE = 'streamer:message:';

   constructor (
     private readonly client: RedisClientType = createClient({ url: process.env.REDIS_CONNECTION_STRING })
@@ -83,4 +84,8 @@ export default class RedisClient {

   async getStreams (): Promise<string[]> {
     return await this.client.sMembers(this.STREAMS_SET_KEY);
   }
+
+  async getStreamerMessage (blockHeight: number): Promise<string | null> {
+    return await this.client.get(`${this.STREAMER_MESSAGE_HASH_KEY_BASE}${blockHeight}`);
+  }
 }
diff --git a/runner/src/stream-handler/stream-handler.ts b/runner/src/stream-handler/stream-handler.ts
index 7e1fe2237..28c24128b 100644
--- a/runner/src/stream-handler/stream-handler.ts
+++ b/runner/src/stream-handler/stream-handler.ts
@@ -3,6 +3,7 @@ import { Worker, isMainThread } from 'worker_threads';
 import { type Message } from './types';

 import { METRICS } from '../metrics';
+import { Gauge } from 'prom-client';

 export default class StreamHandler {
   private readonly worker?: Worker;
@@ -24,6 +25,8 @@ export default class StreamHandler {
   }

   private handleMessage (message: Message): void {
-    METRICS[message.type].labels(message.labels).set(message.value);
+    if (METRICS[message.type] instanceof Gauge) {
+      (METRICS[message.type] as Gauge).labels(message.labels).set(message.value);
+    }
   }
 }
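Taken together, the two halves of this change form a read-through cache for streamer messages: the coordinator writes each real-time message to Redis under `streamer:message:<block_height>` with a 60-second TTL (`storage::set(..., Some(60))`), and the runner's `Indexer.fetchStreamerMessage()` checks that key before falling back to rebuilding the message from the near-lake S3 bucket. The sketch below traces that flow as a standalone TypeScript function. The key prefix, bucket naming, and block/shard object layout mirror the diff; the freestanding harness itself (client construction, hardcoded `mainnet` bucket, fixed 4-shard fan-out) is illustrative, not the PR's actual `Indexer` wiring.

```typescript
import { createClient } from 'redis';
import { S3Client, GetObjectCommand } from '@aws-sdk/client-s3';

// Mirrors STREAMER_MESSAGE_HASH_KEY_BASE in runner/src/redis-client/redis-client.ts.
const STREAMER_MESSAGE_KEY_BASE = 'streamer:message:';

// Read-through flow: real-time blocks try Redis first (the coordinator cached them
// with a 60s TTL); historical blocks go straight to S3. Standalone sketch only.
async function fetchStreamerMessage (blockHeight: number, isHistorical: boolean): Promise<{ block: any, shards: any[] }> {
  const redis = createClient({ url: process.env.REDIS_CONNECTION_STRING });
  await redis.connect();

  try {
    if (!isHistorical) {
      // Only blocks cached within the last 60 seconds can hit.
      const cached = await redis.get(`${STREAMER_MESSAGE_KEY_BASE}${blockHeight}`);
      if (cached) {
        // The coordinator serialized the message with camelCase keys,
        // so the cached payload is usable as-is.
        return JSON.parse(cached);
      }
    }

    // Cache miss (or historical): rebuild the message from near-lake objects,
    // one block.json plus one shard_N.json per shard (4 in the diff's tests).
    const s3 = new S3Client();
    const height = blockHeight.toString().padStart(12, '0');
    const getJson = async (key: string): Promise<any> => {
      const response = await s3.send(new GetObjectCommand({ Bucket: 'near-lake-data-mainnet', Key: key }));
      return JSON.parse(await response.Body?.transformToString() ?? '{}');
    };

    const block = await getJson(`${height}/block.json`);
    const shards = await Promise.all(
      [0, 1, 2, 3].map(async (shardId) => await getJson(`${height}/shard_${shardId}.json`))
    );
    return { block, shards };
  } finally {
    await redis.disconnect();
  }
}
```

One caveat the real implementation handles via `renameUnderscoreFieldsToCamelCase()`: the S3 objects use snake_case keys, so the fallback path must rename keys before `Block.fromStreamerMessage()` can consume the result, whereas the Redis path stores the already-camelCased form produced by the coordinator's `serialize_to_camel_case_json_string()`.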