diff --git a/block-server/handler.js b/block-server/handler.js index 7d4d2b2e7..70d6703e6 100644 --- a/block-server/handler.js +++ b/block-server/handler.js @@ -1,6 +1,6 @@ 'use strict'; -const AWS = require('aws-sdk'); -const S3= new AWS.S3(); +import { S3 as S3Client } from '@aws-sdk/client-s3'; +const S3 = new S3Client(); const NETWORK = process.env.NETWORK || 'mainnet'; diff --git a/frontend/replacement.dev.json b/frontend/replacement.dev.json index 1a233cb9d..2ce60ce37 100644 --- a/frontend/replacement.dev.json +++ b/frontend/replacement.dev.json @@ -1,6 +1,7 @@ { "REPL_ACCOUNT_ID": "dev-queryapi.dataplatform.near", "REPL_GRAPHQL_ENDPOINT": "https://near-queryapi.dev.api.pagoda.co", + "REPL_GRAPHQL_ENDPOINT_V2": "https://queryapi-hasura-graphql-mainnet-vcqilefdcq-ew.a.run.app", "REPL_EXTERNAL_APP_URL": "https://queryapi-frontend-vcqilefdcq-ew.a.run.app", "REPL_REGISTRY_CONTRACT_ID": "dev-queryapi.dataplatform.near" } diff --git a/frontend/replacement.local.json b/frontend/replacement.local.json index 9b8fa584e..5e113b5a6 100644 --- a/frontend/replacement.local.json +++ b/frontend/replacement.local.json @@ -1,6 +1,7 @@ { "REPL_ACCOUNT_ID": "dataplatform.near", "REPL_GRAPHQL_ENDPOINT": "https://near-queryapi.api.pagoda.co", + "REPL_GRAPHQL_ENDPOINT_V2": "https://queryapi-hasura-graphql-mainnet-vcqilefdcq-ew.a.run.app", "REPL_EXTERNAL_APP_URL": "http://localhost:3000", "REPL_REGISTRY_CONTRACT_ID": "queryapi.dataplatform.near" } diff --git a/frontend/replacement.mainnet.json b/frontend/replacement.mainnet.json index 7a98e12d4..fc1bd5ae2 100644 --- a/frontend/replacement.mainnet.json +++ b/frontend/replacement.mainnet.json @@ -1,6 +1,7 @@ { "REPL_ACCOUNT_ID": "dataplatform.near", "REPL_GRAPHQL_ENDPOINT": "https://near-queryapi.api.pagoda.co", + "REPL_GRAPHQL_ENDPOINT_V2": "https://queryapi-hasura-graphql-mainnet-24ktefolwq-ew.a.run.app", "REPL_EXTERNAL_APP_URL": "https://queryapi-frontend-24ktefolwq-ew.a.run.app", "REPL_REGISTRY_CONTRACT_ID": "queryapi.dataplatform.near" } diff --git a/frontend/widgets/src/QueryApi.IndexerStatus.jsx b/frontend/widgets/src/QueryApi.IndexerStatus.jsx index 4def76129..ba61af987 100644 --- a/frontend/widgets/src/QueryApi.IndexerStatus.jsx +++ b/frontend/widgets/src/QueryApi.IndexerStatus.jsx @@ -95,6 +95,9 @@ const TextLink = styled.a` if (!indexer_name) return "missing indexer_name"; +let v1_endpoint = `${REPL_GRAPHQL_ENDPOINT}`; +let v2_endpoint = `${REPL_GRAPHQL_ENDPOINT_V2}`; + State.init({ logs: [], state: [], @@ -105,11 +108,17 @@ State.init({ indexer_resPage: 0, logsPage: 0, statePage: 0, + v2Toggle: Storage.privateGet("QueryApiV2Toggle") || false, }); +let graphQLEndpoint = state.v2Toggle ? 
v2_endpoint : v1_endpoint; + function fetchGraphQL(operationsDoc, operationName, variables) { - return asyncFetch(`${REPL_GRAPHQL_ENDPOINT}/v1/graphql`, { + return asyncFetch(`${graphQLEndpoint}/v1/graphql`, { method: "POST", + headers: { + "x-hasura-role": "append" + }, body: JSON.stringify({ query: operationsDoc, variables: variables, @@ -119,7 +128,7 @@ function fetchGraphQL(operationsDoc, operationName, variables) { } const createGraphQLLink = () => { - const queryLink = `https://cloud.hasura.io/public/graphiql?endpoint=${REPL_GRAPHQL_ENDPOINT}/v1/graphql&query=query+IndexerQuery+%7B%0A++indexer_state%28where%3A+%7Bfunction_name%3A+%7B_eq%3A+%22function_placeholder%22%7D%7D%29+%7B%0A++++function_name%0A++++current_block_height%0A++%7D%0A++indexer_log_entries%28%0A++++where%3A+%7Bfunction_name%3A+%7B_eq%3A+%22function_placeholder%22%7D%7D%0A++++order_by%3A+%7B+timestamp%3A+desc%7D%0A++%29+%7B%0A++++function_name%0A++++id%0A++++message%0A++++timestamp%0A++%7D%0A%7D%0A`; + const queryLink = `https://cloud.hasura.io/public/graphiql?endpoint=${graphQLEndpoint}/v1/graphql&query=query+IndexerQuery+%7B%0A++indexer_state%28where%3A+%7Bfunction_name%3A+%7B_eq%3A+%22function_placeholder%22%7D%7D%29+%7B%0A++++function_name%0A++++current_block_height%0A++%7D%0A++indexer_log_entries%28%0A++++where%3A+%7Bfunction_name%3A+%7B_eq%3A+%22function_placeholder%22%7D%7D%0A++++order_by%3A+%7B+timestamp%3A+desc%7D%0A++%29+%7B%0A++++function_name%0A++++id%0A++++message%0A++++timestamp%0A++%7D%0A%7D%0A`; return queryLink.replaceAll( "function_placeholder", `${accountId}/${indexer_name}` @@ -152,14 +161,15 @@ const indexerStateDoc = ` current_block_height current_historical_block_height } - indexer_state_aggregate(where: {function_name: {_eq: "${accountId}/${indexer_name}"}}) { - aggregate { - count - } - } } `; -if (!state.initialFetch) { + +const prevV2ToggleSelected = Storage.privateGet("QueryApiV2Toggle"); +if ( + !state.initialFetch || + (prevV2ToggleSelected !== state.v2Toggle) +) { + Storage.privateSet("QueryApiV2Toggle", state.v2Toggle); fetchGraphQL(logsDoc, "QueryLogs", { offset: state.logsPage * LIMIT, }).then((result) => { @@ -235,6 +245,44 @@ const onIndexerResPageChange = (page) => { State.update({ indexer_resPage: page, currentPage: page }); }; +const DisclaimerContainer = styled.div` + padding: 10px; + margin: 0.5px; + color: black; + display: flex; + width: 50%; + border: 2px solid rgb(240, 240, 240); + border-radius: 8px; + align-items: center; + margin-bottom: 5px; +`; + +const Notice = styled.div` + font-weight: 900; + font-size: 22px; + align-self: flex-start; + margin: 10px 0px 30px; + text-align: center; + padding-bottom: 5px; + border-bottom: 1px solid rgb(240, 240, 241); + color: rgb(36, 39, 42); +`; + +const DisclaimerText = styled.p` + font-size: 14px; + line-height: 20px; + font-weight: 400; + color: rgb(17, 24, 28); + word-break: break-word; + width: 700px; + text-align: start; + padding-left: 10px; + + @media (max-width: 1024px) { + width: 80%; + } +`; + return ( <> @@ -244,6 +292,39 @@ return ( GraphQL Playground +
+ +
+ V2 Testing Notice +
+ + QueryAPI is still in beta. We are working on a QueryAPI V2 + with faster historical processing, easier access to the DB and + more control over your indexer. V2 is running in parallel and + you can see the logs from this new version by toggling this + button. + + { + State.update({ v2Toggle: !state.v2Toggle }); + }, + }} + /> +
+
+
+
@@ -270,7 +351,9 @@ return ( {x.function_name} {x.current_block_height} - {x.current_historical_block_height} + + {x.current_historical_block_height} + {x.status} ))} diff --git a/frontend/widgets/src/components/toggle.jsx b/frontend/widgets/src/components/toggle.jsx new file mode 100644 index 000000000..a81b7d9bf --- /dev/null +++ b/frontend/widgets/src/components/toggle.jsx @@ -0,0 +1,78 @@ +const ToggleRoot = styled.div` + justify-content: space-between; + width: fit-content; + max-width: 100%; +`; + +const ToggleSwitchRoot = styled("Switch.Root")` + all: unset; + display: block; + width: 42px; + height: 25px; + background-color: #d1d1d1; + border-radius: 9999px; + position: relative; + box-shadow: 0 2px 10px var(--blackA7); + + &[data-state="checked"] { + background-color: #00d084; + } + + &[data-disabled=""] { + opacity: 0.7; + } +`; + +const ToggleSwitchThumb = styled("Switch.Thumb")` + all: unset; + display: block; + width: 21px; + height: 21px; + border-radius: 9999px; + transition: transform 100ms; + transform: translateX(2px); + will-change: transform; + + &[data-state="checked"] { + transform: translateX(19px); + } +`; + +const ToggleLabel = styled.label` + white-space: nowrap; +`; + +const Toggle = ({ + active, + className, + direction, + disabled, + key, + label, + onSwitch, + ...rest +}) => ( + + {label} + + + {!disabled && } + + +); + +return Toggle(props); diff --git a/hasura/metadata/databases/default/tables/public_indexer_log_entries.yaml b/hasura/metadata/databases/default/tables/public_indexer_log_entries.yaml index c95174150..50b671595 100644 --- a/hasura/metadata/databases/default/tables/public_indexer_log_entries.yaml +++ b/hasura/metadata/databases/default/tables/public_indexer_log_entries.yaml @@ -19,5 +19,6 @@ select_permissions: - message - timestamp - id + allow_aggregations: true filter: {} role: append diff --git a/hasura/metadata/databases/default/tables/public_indexer_state.yaml b/hasura/metadata/databases/default/tables/public_indexer_state.yaml index 51084d5a3..450b822e0 100644 --- a/hasura/metadata/databases/default/tables/public_indexer_state.yaml +++ b/hasura/metadata/databases/default/tables/public_indexer_state.yaml @@ -17,6 +17,7 @@ select_permissions: - current_block_height - current_historical_block_height - status + allow_aggregations: true filter: {} role: append update_permissions: diff --git a/indexer/queryapi_coordinator/src/historical_block_processing.rs b/indexer/queryapi_coordinator/src/historical_block_processing.rs index ca7125d7c..82b60b8c3 100644 --- a/indexer/queryapi_coordinator/src/historical_block_processing.rs +++ b/indexer/queryapi_coordinator/src/historical_block_processing.rs @@ -151,6 +151,7 @@ pub(crate) async fn process_historical_messages( redis_connection_manager, storage::generate_historical_storage_key(&indexer_function.get_full_name()), serde_json::to_string(&indexer_function)?, + None, ) .await?; } diff --git a/indexer/queryapi_coordinator/src/main.rs b/indexer/queryapi_coordinator/src/main.rs index 28c2dc3f3..a67f5653a 100644 --- a/indexer/queryapi_coordinator/src/main.rs +++ b/indexer/queryapi_coordinator/src/main.rs @@ -6,11 +6,12 @@ use tokio::sync::{Mutex, MutexGuard}; use indexer_rules_engine::types::indexer_rule_match::{ChainId, IndexerRuleMatch}; use near_lake_framework::near_indexer_primitives::types::{AccountId, BlockHeight}; use near_lake_framework::near_indexer_primitives::{types, StreamerMessage}; +use utils::serialize_to_camel_case_json_string; use crate::indexer_types::IndexerFunction; use 
indexer_types::{IndexerQueueMessage, IndexerRegistry}; use opts::{Opts, Parser}; -use storage::{self, ConnectionManager}; +use storage::{self, generate_real_time_streamer_message_key, ConnectionManager}; mod historical_block_processing; mod indexer_reducer; @@ -146,6 +147,15 @@ async fn handle_streamer_message( let block_height: BlockHeight = context.streamer_message.block.header.height; + // Cache streamer message block and shards for use in real time processing + storage::set( + context.redis_connection_manager, + generate_real_time_streamer_message_key(block_height), + &serialize_to_camel_case_json_string(&context.streamer_message)?, + Some(60), + ) + .await?; + let spawned_indexers = indexer_registry::index_registry_changes( block_height, &mut indexer_registry_locked, @@ -206,6 +216,7 @@ async fn handle_streamer_message( context.redis_connection_manager, storage::generate_real_time_storage_key(&indexer_function.get_full_name()), serde_json::to_string(indexer_function)?, + None, ) .await?; storage::xadd( diff --git a/indexer/queryapi_coordinator/src/utils.rs b/indexer/queryapi_coordinator/src/utils.rs index 2cf5207ed..9f8c2585a 100644 --- a/indexer/queryapi_coordinator/src/utils.rs +++ b/indexer/queryapi_coordinator/src/utils.rs @@ -1,3 +1,5 @@ +use serde_json::Value; + pub(crate) async fn stats(redis_connection_manager: storage::ConnectionManager) { let interval_secs = 10; let mut previous_processed_blocks: u64 = @@ -49,3 +51,52 @@ pub(crate) async fn stats(redis_connection_manager: storage::ConnectionManager) tokio::time::sleep(std::time::Duration::from_secs(interval_secs)).await; } } + +pub(crate) fn serialize_to_camel_case_json_string( + streamer_message: &near_lake_framework::near_indexer_primitives::StreamerMessage, +) -> anyhow::Result<String> { + // Serialize the Message object to a JSON string + let json_str = serde_json::to_string(&streamer_message)?; + + // Deserialize the JSON string to a Value Object + let mut message_value: Value = serde_json::from_str(&json_str)?; + + // Convert keys to Camel Case + to_camel_case_keys(&mut message_value); + + return Ok(serde_json::to_string(&message_value)?); +} + +fn to_camel_case_keys(message_value: &mut Value) { + // Only process if subfield contains objects + match message_value { + Value::Object(map) => { + for key in map.keys().cloned().collect::<Vec<String>>() { + // Generate Camel Case Key + let new_key = key + .split("_") + .enumerate() + .map(|(i, str)| { + if i > 0 { + return str[..1].to_uppercase() + &str[1..]; + } + return str.to_owned(); + }) + .collect::<Vec<String>>() + .join(""); + + // Recursively process inner fields and update map with new key + if let Some(mut val) = map.remove(&key) { + to_camel_case_keys(&mut val); + map.insert(new_key, val); + } + } + } + Value::Array(vec) => { + for val in vec { + to_camel_case_keys(val); + } + } + _ => {} + } +}
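Note on the conversion above: `to_camel_case_keys` rewrites every snake_case key to camelCase before the coordinator caches the message, so a cache hit can be `JSON.parse`d by the runner and handed straight to `Block.fromStreamerMessage` with no further key mapping (the runner's S3 fallback still applies its own `renameUnderscoreFieldsToCamelCase` reviver). A minimal TypeScript sketch of the same rewrite, for comparison only; it is not part of this change:

```ts
// Sketch: the snake_case -> camelCase key rewrite performed by
// to_camel_case_keys, expressed in TypeScript for comparison.
type Json = null | boolean | number | string | Json[] | { [key: string]: Json };

function toCamelCaseKeys(value: Json): Json {
  if (Array.isArray(value)) {
    return value.map(toCamelCaseKeys); // recurse into array elements
  }
  if (value !== null && typeof value === 'object') {
    return Object.fromEntries(
      Object.entries(value).map(([key, inner]) => [
        // e.g. "current_block_height" -> "currentBlockHeight"
        key.replace(/_([a-z0-9])/g, (_, c: string) => c.toUpperCase()),
        toCamelCaseKeys(inner),
      ])
    );
  }
  return value; // primitives pass through unchanged
}
```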
diff --git a/indexer/storage/src/lib.rs b/indexer/storage/src/lib.rs index 6c449b6b2..a2d20b850 100644 --- a/indexer/storage/src/lib.rs +++ b/indexer/storage/src/lib.rs @@ -2,6 +2,7 @@ pub use redis::{self, aio::ConnectionManager, FromRedisValue, ToRedisArgs}; const STORAGE: &str = "storage_alertexer"; +pub const LAKE_BUCKET_PREFIX: &str = "near-lake-data-"; pub const STREAMS_SET_KEY: &str = "streams"; pub async fn get_redis_client(redis_connection_str: &str) -> redis::Client { @@ -12,6 +13,10 @@ pub fn generate_real_time_stream_key(prefix: &str) -> String { format!("{}:real_time:stream", prefix) } +pub fn generate_real_time_streamer_message_key(block_height: u64) -> String { + format!("streamer:message:{}", block_height) +} + pub fn generate_real_time_storage_key(prefix: &str) -> String { format!("{}:real_time:stream:storage", prefix) } @@ -47,13 +52,19 @@ pub async fn set( redis_connection_manager: &ConnectionManager, key: impl ToRedisArgs + std::fmt::Debug, value: impl ToRedisArgs + std::fmt::Debug, + expiration_seconds: Option<u64>, ) -> anyhow::Result<()> { - redis::cmd("SET") - .arg(&key) - .arg(&value) - .query_async(&mut redis_connection_manager.clone()) + let mut cmd = redis::cmd("SET"); + cmd.arg(&key).arg(&value); + + // Add expiration arguments if present + if let Some(expiration_seconds) = expiration_seconds { + cmd.arg("EX").arg(expiration_seconds); + } + + cmd.query_async(&mut redis_connection_manager.clone()) .await?; - tracing::debug!(target: STORAGE, "SET: {:?}: {:?}", key, value,); + tracing::debug!(target: STORAGE, "SET: {:?}: {:?} Ex: {:?}", key, value, expiration_seconds); Ok(()) } @@ -113,7 +124,7 @@ pub async fn push_receipt_to_watching_list( receipt_id: &str, cache_value: &[u8], ) -> anyhow::Result<()> { - set(redis_connection_manager, receipt_id, cache_value).await?; + set(redis_connection_manager, receipt_id, cache_value, None).await?; // redis::cmd("INCR") // .arg(format!("receipts_{}", transaction_hash)) // .query_async(&mut redis_connection_manager.clone()) @@ -161,7 +172,13 @@ pub async fn update_last_indexed_block( redis_connection_manager: &ConnectionManager, block_height: u64, ) -> anyhow::Result<()> { - set(redis_connection_manager, "last_indexed_block", block_height).await?; + set( + redis_connection_manager, + "last_indexed_block", + block_height, + None, + ) + .await?; redis::cmd("INCR") .arg("blocks_processed") .query_async(&mut redis_connection_manager.clone()) diff --git a/runner/package.json b/runner/package.json index 6d14e3e5c..213cc4e53 100644 --- a/runner/package.json +++ b/runner/package.json @@ -42,8 +42,8 @@ "typescript": "^5.1.6" }, "dependencies": { + "@aws-sdk/client-s3": "^3.414.0", "@near-lake/primitives": "^0.1.0", - "aws-sdk": "^2.1402.0", "express": "^4.18.2", "node-fetch": "^2.6.11", "node-sql-parser": "^4.10.0", diff --git a/runner/src/indexer/indexer.test.ts b/runner/src/indexer/indexer.test.ts index 37a828639..38f9ce77c 100644 --- a/runner/src/indexer/indexer.test.ts +++ b/runner/src/indexer/indexer.test.ts @@ -1,6 +1,7 @@ import { Block } from '@near-lake/primitives'; import type fetch from 'node-fetch'; -import type AWS from 'aws-sdk'; +import { type S3Client, GetObjectCommand } from '@aws-sdk/client-s3'; +import type RedisClient from '../redis-client'; import Indexer from './indexer'; import { VM } from 'vm2'; @@ -162,6 +163,10 @@ CREATE TABLE }), }); + const transparentRedis = { + getStreamerMessage: jest.fn() + } as unknown as RedisClient; + beforeEach(() => { process.env = { ...oldEnv, @@ -182,21 +187,24 @@ CREATE TABLE }), })); const blockHeight = 456; - const mockS3 = { - getObject: jest.fn(() => ({ - promise: async () => await Promise.resolve({ - Body: { - toString: () => JSON.stringify({ - chunks: [], - header: { - height: blockHeight - } - }) - } - }) - })), - } as unknown as AWS.S3; - const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, s3: mockS3 }); + const mockData = jest.fn().mockResolvedValue( + JSON.stringify( + { + block: { + chunks: [], + header: { + height: blockHeight + } + }, + shards: {} + } + ) + ); + const mockRedis = { + getStreamerMessage: mockData + } as unknown as RedisClient; + + const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, 
redisClient: mockRedis }); const functions: Record = {}; functions['buildnear.testnet/test'] = { @@ -211,95 +219,174 @@ CREATE TABLE expect(mockFetch.mock.calls).toMatchSnapshot(); }); - test('Indexer.fetchBlock() should fetch a block from the S3', async () => { + test('Indexer.fetchBlock() should fetch a block from S3', async () => { const author = 'dokiacapital.poolv1.near'; + const mockData = JSON.stringify({ + author + }); + const mockSend = jest.fn().mockResolvedValue({ + Body: { + transformToString: () => mockData + } + }); const mockS3 = { - getObject: jest.fn(() => ({ - promise: async () => await Promise.resolve({ - Body: { - toString: () => JSON.stringify({ - author - }) - } - }) - })), - } as unknown as AWS.S3; - const indexer = new Indexer('mainnet', { s3: mockS3 }); + send: mockSend, + } as unknown as S3Client; + + const indexer = new Indexer('mainnet', { s3: mockS3, redisClient: transparentRedis }); const blockHeight = 84333960; const block = await indexer.fetchBlockPromise(blockHeight); - - expect(mockS3.getObject).toHaveBeenCalledTimes(1); - expect(mockS3.getObject).toHaveBeenCalledWith({ + const params = { Bucket: 'near-lake-data-mainnet', Key: `${blockHeight.toString().padStart(12, '0')}/block.json` - }); + }; + + expect(mockS3.send).toHaveBeenCalledTimes(1); + expect(JSON.stringify(mockSend.mock.calls[0][0])).toMatch(JSON.stringify(new GetObjectCommand(params))); expect(block.author).toEqual(author); }); - test('Indexer.fetchShard() should fetch the steamer message from S3', async () => { + test('Indexer.fetchShard() should fetch a shard from S3', async () => { + const mockData = JSON.stringify({}); + const mockSend = jest.fn().mockResolvedValue({ + Body: { + transformToString: () => mockData + } + }); const mockS3 = { - getObject: jest.fn(() => ({ - promise: async () => await Promise.resolve({ - Body: { - toString: () => JSON.stringify({}) - } - }) - })), - } as unknown as AWS.S3; - const indexer = new Indexer('mainnet', { s3: mockS3 }); + send: mockSend, + } as unknown as S3Client; + const indexer = new Indexer('mainnet', { s3: mockS3, redisClient: transparentRedis }); const blockHeight = 82699904; const shard = 0; + const params = { + Bucket: 'near-lake-data-mainnet', + Key: `${blockHeight.toString().padStart(12, '0')}/shard_${shard}.json` + }; await indexer.fetchShardPromise(blockHeight, shard); - expect(mockS3.getObject).toHaveBeenCalledTimes(1); - expect(mockS3.getObject).toHaveBeenCalledWith({ + expect(JSON.stringify(mockSend.mock.calls[0][0])).toMatch(JSON.stringify(new GetObjectCommand(params))); + }); + + test('Indexer.fetchStreamerMessage() should fetch the message from cache and use it directly', async () => { + const blockHeight = 85233529; + const blockHash = 'xyz'; + const getMessage = jest.fn() + .mockReturnValueOnce(JSON.stringify( + { + block: { + chunks: [0], + header: { + height: blockHeight, + hash: blockHash, + } + }, + shards: {} + } + )); + const mockRedis = { + getStreamerMessage: getMessage + } as unknown as RedisClient; + const indexer = new Indexer('mainnet', { redisClient: mockRedis }); + + const streamerMessage = await indexer.fetchStreamerMessage(blockHeight, false); + + expect(getMessage).toHaveBeenCalledTimes(1); + expect(JSON.stringify(getMessage.mock.calls[0])).toEqual( + `[${blockHeight}]` + ); + const block = Block.fromStreamerMessage(streamerMessage); + + expect(block.blockHeight).toEqual(blockHeight); + expect(block.blockHash).toEqual(blockHash); + }); + + test('Indexer.fetchStreamerMessage() should fetch the block and shards from S3 
upon cache miss', async () => { + const blockHeight = 85233529; + const blockHash = 'xyz'; + const mockSend = jest.fn() + .mockReturnValueOnce({ // block + Body: { + transformToString: () => JSON.stringify({ + chunks: [0], + header: { + height: blockHeight, + hash: blockHash, + } + }) + } + }) + .mockReturnValue({ // shard + Body: { + transformToString: () => JSON.stringify({}) + } + }); + const mockS3 = { + send: mockSend, + } as unknown as S3Client; + const indexer = new Indexer('mainnet', { s3: mockS3, redisClient: transparentRedis }); + + const streamerMessage = await indexer.fetchStreamerMessage(blockHeight, false); + + expect(mockSend).toHaveBeenCalledTimes(5); + expect(JSON.stringify(mockSend.mock.calls[0][0])).toStrictEqual(JSON.stringify(new GetObjectCommand({ Bucket: 'near-lake-data-mainnet', - Key: `${blockHeight.toString().padStart(12, '0')}/shard_${shard}.json` - }); + Key: `${blockHeight.toString().padStart(12, '0')}/block.json` + }))); + expect(JSON.stringify(mockSend.mock.calls[1][0])).toStrictEqual(JSON.stringify(new GetObjectCommand({ + Bucket: 'near-lake-data-mainnet', + Key: `${blockHeight.toString().padStart(12, '0')}/shard_0.json` + }))); + expect(transparentRedis.getStreamerMessage).toHaveBeenCalledTimes(1); + + const block = Block.fromStreamerMessage(streamerMessage); + + expect(block.blockHeight).toEqual(blockHeight); + expect(block.blockHash).toEqual(blockHash); }); - test('Indexer.fetchStreamerMessage() should fetch the block/shards and construct the streamer message', async () => { + test('Indexer.fetchStreamerMessage() should fetch the block and shards from S3 and not cache and construct the streamer message if historical', async () => { const blockHeight = 85233529; const blockHash = 'xyz'; - const getObject = jest.fn() + const mockSend = jest.fn() .mockReturnValueOnce({ // block - promise: async () => await Promise.resolve({ - Body: { - toString: () => JSON.stringify({ - chunks: [0], - header: { - height: blockHeight, - hash: blockHash, - } - }) - } - }) + Body: { + transformToString: () => JSON.stringify({ + chunks: [0], + header: { + height: blockHeight, + hash: blockHash, + } + }) + } }) .mockReturnValue({ // shard - promise: async () => await Promise.resolve({ - Body: { - toString: () => JSON.stringify({}) - } - }) + Body: { + transformToString: () => JSON.stringify({}) + } }); const mockS3 = { - getObject, - } as unknown as AWS.S3; - const indexer = new Indexer('mainnet', { s3: mockS3 }); + send: mockSend, + } as unknown as S3Client; + const mockRedis = { + getStreamerMessage: jest.fn() + } as unknown as RedisClient; + const indexer = new Indexer('mainnet', { s3: mockS3, redisClient: mockRedis }); - const streamerMessage = await indexer.fetchStreamerMessage(blockHeight); + const streamerMessage = await indexer.fetchStreamerMessage(blockHeight, true); - expect(getObject).toHaveBeenCalledTimes(5); - expect(getObject.mock.calls[0][0]).toEqual({ + expect(mockSend).toHaveBeenCalledTimes(5); + expect(JSON.stringify(mockSend.mock.calls[0][0])).toStrictEqual(JSON.stringify(new GetObjectCommand({ Bucket: 'near-lake-data-mainnet', Key: `${blockHeight.toString().padStart(12, '0')}/block.json` - }); - expect(getObject.mock.calls[1][0]).toEqual({ + }))); + expect(JSON.stringify(mockSend.mock.calls[1][0])).toStrictEqual(JSON.stringify(new GetObjectCommand({ Bucket: 'near-lake-data-mainnet', Key: `${blockHeight.toString().padStart(12, '0')}/shard_0.json` - }); + }))); + expect(mockRedis.getStreamerMessage).toHaveBeenCalledTimes(0); const block = 
Block.fromStreamerMessage(streamerMessage); @@ -308,7 +395,7 @@ CREATE TABLE }); test('Indexer.transformIndexerFunction() applies the necessary transformations', () => { - const indexer = new Indexer('mainnet'); + const indexer = new Indexer('mainnet', { redisClient: transparentRedis }); const transformedFunction = indexer.transformIndexerFunction('console.log(\'hello\')'); @@ -340,7 +427,7 @@ CREATE TABLE } }) }); - const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch }); + const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, redisClient: transparentRedis }); const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, HASURA_ROLE); @@ -392,7 +479,7 @@ CREATE TABLE test('Indexer.buildContext() can fetch from the near social api', async () => { const mockFetch = jest.fn(); - const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch }); + const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, redisClient: transparentRedis }); const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, HASURA_ROLE); @@ -421,7 +508,7 @@ CREATE TABLE errors: ['boom'] }) }); - const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch }); + const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, redisClient: transparentRedis }); const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, INVALID_HASURA_ROLE); @@ -436,7 +523,7 @@ CREATE TABLE data: 'mock', }), }); - const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch }); + const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, redisClient: transparentRedis }); const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, HASURA_ROLE); @@ -463,7 +550,7 @@ CREATE TABLE }); test('GetTables works for a variety of input schemas', async () => { - const indexer = new Indexer('mainnet'); + const indexer = new Indexer('mainnet', { redisClient: transparentRedis }); const simpleSchemaTables = indexer.getTableNames(SIMPLE_SCHEMA); expect(simpleSchemaTables).toStrictEqual(['posts']); @@ -503,7 +590,7 @@ CREATE TABLE }); test('SanitizeTableName works properly on many test cases', async () => { - const indexer = new Indexer('mainnet'); + const indexer = new Indexer('mainnet', { redisClient: transparentRedis }); expect(indexer.sanitizeTableName('table_name')).toStrictEqual('TableName'); expect(indexer.sanitizeTableName('tablename')).toStrictEqual('Tablename'); // name is not capitalized @@ -518,7 +605,7 @@ CREATE TABLE }); test('indexer fails to build context.db due to collision on sanitized table names', async () => { - const indexer = new Indexer('mainnet'); + const indexer = new Indexer('mainnet', { redisClient: transparentRedis }); const schemaWithDuplicateSanitizedTableNames = `CREATE TABLE "test table" ( @@ -540,7 +627,11 @@ CREATE TABLE }) }; - const indexer = new Indexer('mainnet', { fetch: genericMockFetch as unknown as typeof fetch, DmlHandler: mockDmlHandler }); + const indexer = new Indexer('mainnet', { + fetch: genericMockFetch as unknown as typeof fetch, + redisClient: transparentRedis, + DmlHandler: mockDmlHandler + }); const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); const objToInsert = [{ @@ -576,7 +667,11 @@ CREATE TABLE }) }; - const indexer = new Indexer('mainnet', { fetch: genericMockFetch as unknown as typeof fetch, DmlHandler: 
mockDmlHandler }); + const indexer = new Indexer('mainnet', { + fetch: genericMockFetch as unknown as typeof fetch, + redisClient: transparentRedis, + DmlHandler: mockDmlHandler + }); const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); const objToSelect = { @@ -603,7 +698,11 @@ CREATE TABLE }) }; - const indexer = new Indexer('mainnet', { fetch: genericMockFetch as unknown as typeof fetch, DmlHandler: mockDmlHandler }); + const indexer = new Indexer('mainnet', { + fetch: genericMockFetch as unknown as typeof fetch, + redisClient: transparentRedis, + DmlHandler: mockDmlHandler + }); const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); const whereObj = { @@ -634,7 +733,11 @@ CREATE TABLE }) }; - const indexer = new Indexer('mainnet', { fetch: genericMockFetch as unknown as typeof fetch, DmlHandler: mockDmlHandler }); + const indexer = new Indexer('mainnet', { + fetch: genericMockFetch as unknown as typeof fetch, + redisClient: transparentRedis, + DmlHandler: mockDmlHandler + }); const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); const objToInsert = [{ @@ -667,7 +770,11 @@ CREATE TABLE }) }; - const indexer = new Indexer('mainnet', { fetch: genericMockFetch as unknown as typeof fetch, DmlHandler: mockDmlHandler }); + const indexer = new Indexer('mainnet', { + fetch: genericMockFetch as unknown as typeof fetch, + redisClient: transparentRedis, + DmlHandler: mockDmlHandler + }); const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); const deleteFilter = { @@ -683,7 +790,11 @@ CREATE TABLE create: jest.fn() }; - const indexer = new Indexer('mainnet', { fetch: genericMockFetch as unknown as typeof fetch, DmlHandler: mockDmlHandler }); + const indexer = new Indexer('mainnet', { + fetch: genericMockFetch as unknown as typeof fetch, + redisClient: transparentRedis, + DmlHandler: mockDmlHandler + }); const context = indexer.buildContext(STRESS_TEST_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); expect(Object.keys(context.db)).toStrictEqual([ @@ -722,7 +833,11 @@ CREATE TABLE create: jest.fn() }; - const indexer = new Indexer('mainnet', { fetch: genericMockFetch as unknown as typeof fetch, DmlHandler: mockDmlHandler }); + const indexer = new Indexer('mainnet', { + fetch: genericMockFetch as unknown as typeof fetch, + redisClient: transparentRedis, + DmlHandler: mockDmlHandler + }); const context = indexer.buildContext('', 'morgs.near/social_feed1', 1, 'postgres'); expect(Object.keys(context.db)).toStrictEqual([]); @@ -783,28 +898,24 @@ CREATE TABLE }); const mockS3 = { - getObject: jest.fn() - .mockReturnValueOnce({ // block - promise: async () => await Promise.resolve({ - Body: { - toString: () => JSON.stringify({ - chunks: [0], - header: { - height: blockHeight, - }, - }), - }, - }), + send: jest.fn() + .mockResolvedValueOnce({ // block + Body: { + transformToString: () => JSON.stringify({ + chunks: [0], + header: { + height: blockHeight, + }, + }), + }, }) - .mockReturnValue({ // shard - promise: async () => await Promise.resolve({ - Body: { - toString: () => JSON.stringify({}) - }, - }), + .mockResolvedValue({ // shard + Body: { + transformToString: () => JSON.stringify({}) + }, }), - } as unknown as AWS.S3; - const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, s3: mockS3 }); + } as unknown as S3Client; + const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, s3: 
mockS3, redisClient: transparentRedis }); const functions: Record = {}; functions['buildnear.testnet/test'] = { @@ -875,20 +986,18 @@ CREATE TABLE })); const blockHeight = 456; const mockS3 = { - getObject: jest.fn(() => ({ - promise: async () => await Promise.resolve({ - Body: { - toString: () => JSON.stringify({ - chunks: [], - header: { - height: blockHeight - } - }) - } - }) - })), - } as unknown as AWS.S3; - const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, s3: mockS3 }); + send: jest.fn().mockResolvedValue({ + Body: { + transformToString: () => JSON.stringify({ + chunks: [], + header: { + height: blockHeight + } + }) + } + }), + } as unknown as S3Client; + const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, s3: mockS3, redisClient: transparentRedis }); const functions: Record = {}; functions['buildnear.testnet/test'] = { @@ -911,33 +1020,29 @@ CREATE TABLE }), })); const mockS3 = { - getObject: jest + send: jest .fn() - .mockReturnValueOnce({ // block - promise: async () => await Promise.resolve({ - Body: { - toString: () => JSON.stringify({ - chunks: [0], - header: { - height: blockHeight, - }, - }), - }, - }), + .mockResolvedValueOnce({ // block + Body: { + transformToString: () => JSON.stringify({ + chunks: [0], + header: { + height: blockHeight, + }, + }), + }, }) - .mockReturnValue({ // shard - promise: async () => await Promise.resolve({ - Body: { - toString: () => JSON.stringify({}) - }, - }), + .mockResolvedValue({ // shard + Body: { + transformToString: () => JSON.stringify({}) + }, }), - } as unknown as AWS.S3; + } as unknown as S3Client; const provisioner: any = { isUserApiProvisioned: jest.fn().mockReturnValue(false), provisionUserApi: jest.fn(), }; - const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, s3: mockS3, provisioner }); + const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, s3: mockS3, redisClient: transparentRedis, provisioner }); const functions = { 'morgs.near/test': { @@ -967,33 +1072,29 @@ CREATE TABLE }), })); const mockS3 = { - getObject: jest + send: jest .fn() - .mockReturnValueOnce({ // block - promise: async () => await Promise.resolve({ - Body: { - toString: () => JSON.stringify({ - chunks: [0], - header: { - height: blockHeight, - }, - }), - }, - }), + .mockResolvedValueOnce({ // block + Body: { + transformToString: () => JSON.stringify({ + chunks: [0], + header: { + height: blockHeight, + }, + }), + }, }) - .mockReturnValue({ // shard - promise: async () => await Promise.resolve({ - Body: { - toString: () => JSON.stringify({}) - }, - }), + .mockResolvedValue({ // shard + Body: { + transformToString: () => JSON.stringify({}) + }, }), - } as unknown as AWS.S3; + } as unknown as S3Client; const provisioner: any = { isUserApiProvisioned: jest.fn().mockReturnValue(true), provisionUserApi: jest.fn(), }; - const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, s3: mockS3, provisioner }); + const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, s3: mockS3, redisClient: transparentRedis, provisioner }); const functions: Record = { 'morgs.near/test': { @@ -1015,33 +1116,29 @@ CREATE TABLE }), })); const mockS3 = { - getObject: jest + send: jest .fn() - .mockReturnValueOnce({ // block - promise: async () => await Promise.resolve({ - Body: { - toString: () => JSON.stringify({ - chunks: [0], - header: { - height: blockHeight, - }, - }), - }, - }), + 
.mockResolvedValueOnce({ // block + Body: { + transformToString: () => JSON.stringify({ + chunks: [0], + header: { + height: blockHeight, + }, + }), + }, }) - .mockReturnValue({ // shard - promise: async () => await Promise.resolve({ - Body: { - toString: () => JSON.stringify({}) - }, - }), + .mockResolvedValue({ // shard + Body: { + transformToString: () => JSON.stringify({}) + }, }), - } as unknown as AWS.S3; + } as unknown as S3Client; const provisioner: any = { isUserApiProvisioned: jest.fn().mockReturnValue(true), provisionUserApi: jest.fn(), }; - const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, s3: mockS3, provisioner }); + const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, s3: mockS3, redisClient: transparentRedis, provisioner }); const functions: Record = { 'morgs.near/test': { @@ -1066,34 +1163,30 @@ CREATE TABLE }), })); const mockS3 = { - getObject: jest + send: jest .fn() - .mockReturnValueOnce({ // block - promise: async () => await Promise.resolve({ - Body: { - toString: () => JSON.stringify({ - chunks: [0], - header: { - height: blockHeight, - }, - }), - }, - }), + .mockResolvedValueOnce({ // block + Body: { + transformToString: () => JSON.stringify({ + chunks: [0], + header: { + height: blockHeight, + }, + }), + }, }) - .mockReturnValue({ // shard - promise: async () => await Promise.resolve({ - Body: { - toString: () => JSON.stringify({}) - }, - }), + .mockResolvedValue({ // shard + Body: { + transformToString: () => JSON.stringify({}) + }, }), - } as unknown as AWS.S3; + } as unknown as S3Client; const error = new Error('something went wrong with provisioning'); const provisioner: any = { isUserApiProvisioned: jest.fn().mockReturnValue(false), provisionUserApi: jest.fn().mockRejectedValue(error), }; - const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, s3: mockS3, provisioner }); + const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, s3: mockS3, redisClient: transparentRedis, provisioner }); const functions: Record = { 'morgs.near/test': { @@ -1116,7 +1209,7 @@ CREATE TABLE data: {} }) }); - const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch }); + const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, redisClient: transparentRedis }); // @ts-expect-error legacy test const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, null); @@ -1152,7 +1245,7 @@ CREATE TABLE }) }); const role = 'morgs_near'; - const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch }); + const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, redisClient: transparentRedis }); const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, HASURA_ROLE); const mutation = ` diff --git a/runner/src/indexer/indexer.ts b/runner/src/indexer/indexer.ts index 3a2858cd5..e866652b2 100644 --- a/runner/src/indexer/indexer.ts +++ b/runner/src/indexer/indexer.ts @@ -1,18 +1,21 @@ import fetch, { type Response } from 'node-fetch'; import { VM } from 'vm2'; -import AWS from 'aws-sdk'; +import { S3Client, GetObjectCommand } from '@aws-sdk/client-s3'; import { Block } from '@near-lake/primitives'; import { Parser } from 'node-sql-parser'; +import { METRICS } from '../metrics'; import Provisioner from '../provisioner'; import DmlHandler from '../dml-handler/dml-handler'; +import RedisClient from '../redis-client'; interface Dependencies { 
fetch: typeof fetch - s3: AWS.S3 + s3: S3Client provisioner: Provisioner DmlHandler: typeof DmlHandler parser: Parser + redisClient: RedisClient }; interface Context { @@ -44,10 +47,11 @@ export default class Indexer { this.network = network; this.deps = { fetch, - s3: new AWS.S3(), + s3: new S3Client(), provisioner: new Provisioner(), DmlHandler, parser: new Parser(), + redisClient: deps?.redisClient ?? new RedisClient(), ...deps, }; } @@ -58,7 +62,7 @@ export default class Indexer { isHistorical: boolean, options: { provision?: boolean } = { provision: false } ): Promise<void> { - const blockWithHelpers = Block.fromStreamerMessage(await this.fetchStreamerMessage(blockHeight)); + const blockWithHelpers = Block.fromStreamerMessage(await this.fetchStreamerMessage(blockHeight, isHistorical)); const lag = Date.now() - Math.floor(Number(blockWithHelpers.header().timestampNanosec) / 1000000); @@ -132,7 +136,17 @@ export default class Indexer { return blockHeight.toString().padStart(12, '0'); } - async fetchStreamerMessage (blockHeight: number): Promise<{ block: any, shards: any[] }> { + async fetchStreamerMessage (blockHeight: number, isHistorical: boolean): Promise<{ block: any, shards: any[] }> { + if (!isHistorical) { + const cachedMessage = await this.deps.redisClient.getStreamerMessage(blockHeight); + if (cachedMessage) { + METRICS.CACHE_HIT.labels(isHistorical ? 'historical' : 'real-time', 'streamer_message').inc(); + const parsedMessage = JSON.parse(cachedMessage); + return parsedMessage; + } else { + METRICS.CACHE_MISS.labels(isHistorical ? 'historical' : 'real-time', 'streamer_message').inc(); + } + } const blockPromise = this.fetchBlockPromise(blockHeight); const shardsPromises = await this.fetchShardsPromises(blockHeight, 4); @@ -156,9 +170,9 @@ export default class Indexer { Bucket: `near-lake-data-${this.network}`, Key: `${this.normalizeBlockHeight(blockHeight)}/shard_${shardId}.json`, }; - return await this.deps.s3.getObject(params).promise().then((response) => { - return JSON.parse(response.Body?.toString() ?? '{}', (_key, value) => this.renameUnderscoreFieldsToCamelCase(value)); - }); + const response = await this.deps.s3.send(new GetObjectCommand(params)); + const shardData = await response.Body?.transformToString() ?? '{}'; + return JSON.parse(shardData, (_key, value) => this.renameUnderscoreFieldsToCamelCase(value)); } async fetchBlockPromise (blockHeight: number): Promise<any> { @@ -168,10 +182,9 @@ export default class Indexer { const folder = this.normalizeBlockHeight(blockHeight); const file = 'block.json'; const params = { Bucket: 'near-lake-data-' + this.network, Key: `${folder}/${file}`, }; - return await this.deps.s3.getObject(params).promise().then((response) => { - const block = JSON.parse(response.Body?.toString() ?? '{}', (_key, value) => this.renameUnderscoreFieldsToCamelCase(value)); - return block; - }); + const response = await this.deps.s3.send(new GetObjectCommand(params)); + const blockData = await response.Body?.transformToString() ?? '{}'; + return JSON.parse(blockData, (_key, value) => this.renameUnderscoreFieldsToCamelCase(value)); } enableAwaitTransform (indexerFunction: string): string { @@ -479,7 +492,7 @@ export default class Indexer { } renameUnderscoreFieldsToCamelCase (value: Record<string, any>): Record<string, any> { - if (typeof value === 'object' && !Array.isArray(value)) { + if (value !== null && typeof value === 'object' && !Array.isArray(value)) { // It's a non-null, non-array object, create a replacement with the keys initially-capped const newValue: any = {}; for (const key in value) {
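Taken together, `fetchStreamerMessage` is now a read-through cache: real-time invocations consult Redis first and count a CACHE_HIT or CACHE_MISS, while historical invocations go straight to S3, since backfilled heights are long past the 60-second TTL the coordinator writes with. A usage sketch under those assumptions (the setup code is illustrative; the method signature is the one introduced above):

```ts
// Usage sketch for the new read path; the import and block heights are
// illustrative, the signature is the one from the diff.
import Indexer from './indexer';

async function demo(): Promise<void> {
  const indexer = new Indexer('mainnet'); // deps default to S3Client + RedisClient

  // Real-time: tries redisClient.getStreamerMessage(height) first and only
  // falls back to fetching block.json + shard_*.json from S3 on a miss.
  const realTime = await indexer.fetchStreamerMessage(85233529, false);

  // Historical: always rebuilds the message from S3 and never touches the
  // cache, so backfills do not skew the hit/miss counters.
  const historical = await indexer.fetchStreamerMessage(85233529, true);

  console.log(realTime.block.header.height, historical.shards.length);
}
```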
diff --git a/runner/src/metrics.ts b/runner/src/metrics.ts index 65ef6b990..ee757adc7 100644 --- a/runner/src/metrics.ts +++ b/runner/src/metrics.ts @@ -13,9 +13,23 @@ const EXECUTION_DURATION = new promClient.Gauge({ labelNames: ['indexer', 'type'], }); +const CACHE_HIT = new promClient.Counter({ + name: 'queryapi_runner_cache_hit', + help: 'The number of times cache was hit successfully', + labelNames: ['type', 'key'] +}); + +const CACHE_MISS = new promClient.Counter({ + name: 'queryapi_runner_cache_miss', + help: 'The number of times cache was missed', + labelNames: ['type', 'key'] +}); + export const METRICS = { EXECUTION_DURATION, UNPROCESSED_STREAM_MESSAGES, + CACHE_HIT, + CACHE_MISS }; export const startServer = async (): Promise<void> => { diff --git a/runner/src/redis-client/redis-client.test.ts b/runner/src/redis-client/redis-client.test.ts index 85588ccab..26030f249 100644 --- a/runner/src/redis-client/redis-client.test.ts +++ b/runner/src/redis-client/redis-client.test.ts @@ -81,4 +81,17 @@ describe('RedisClient', () => { expect(mockClient.sMembers).toHaveBeenCalledWith('streams'); expect(streams).toEqual(['streamKey1', 'streamKey2']); }); + + it('returns streamer message', async () => { + const mockClient = { + on: jest.fn(), + connect: jest.fn().mockResolvedValue(null), + get: jest.fn(), + } as any; + + const client = new RedisClient(mockClient); + await client.getStreamerMessage(1000); + + expect(mockClient.get).toHaveBeenCalledWith('streamer:message:1000'); + }); }); diff --git a/runner/src/redis-client/redis-client.ts b/runner/src/redis-client/redis-client.ts index 9caa66226..18e11b854 100644 --- a/runner/src/redis-client/redis-client.ts +++ b/runner/src/redis-client/redis-client.ts @@ -20,6 +20,7 @@ export default class RedisClient { SMALLEST_STREAM_ID = '0'; LARGEST_STREAM_ID = '+'; STREAMS_SET_KEY = 'streams'; + STREAMER_MESSAGE_HASH_KEY_BASE = 'streamer:message:'; constructor ( private readonly client: RedisClientType = createClient({ url: process.env.REDIS_CONNECTION_STRING }) @@ -83,4 +84,8 @@ export default class RedisClient { async getStreams (): Promise<string[]> { return await this.client.sMembers(this.STREAMS_SET_KEY); } + + async getStreamerMessage (blockHeight: number): Promise<string | null> { + return await this.client.get(`${this.STREAMER_MESSAGE_HASH_KEY_BASE}${blockHeight}`); + } }
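`getStreamerMessage` is the read half of a cross-service contract: the key prefix in `STREAMER_MESSAGE_HASH_KEY_BASE` must stay in sync with the coordinator's `generate_real_time_streamer_message_key`, and the writer's `EX 60` means only blocks indexed in the last minute can hit. A standalone sketch of the round trip from the runner's side (illustrative only; assumes a reachable Redis):

```ts
// Sketch: the Redis contract between the coordinator (writer) and the
// runner (reader). Key shape and TTL come from this diff; the snippet
// itself is illustrative.
import { createClient } from 'redis';

async function peekStreamerMessage(blockHeight: number): Promise<void> {
  const client = createClient({ url: process.env.REDIS_CONNECTION_STRING });
  await client.connect();

  // Writer side (Rust): SET streamer:message:<height> <camelCase JSON> EX 60
  const cached = await client.get(`streamer:message:${blockHeight}`);
  if (cached !== null) {
    const { block, shards } = JSON.parse(cached); // keys are already camelCase
    console.log(block.header.height, shards.length);
  } else {
    console.log('cache miss: height older than the 60s TTL, fall back to S3');
  }

  await client.disconnect();
}
```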
diff --git a/runner/src/stream-handler/stream-handler.ts b/runner/src/stream-handler/stream-handler.ts index 7e1fe2237..e4cfe63ae 100644 --- a/runner/src/stream-handler/stream-handler.ts +++ b/runner/src/stream-handler/stream-handler.ts @@ -3,12 +3,13 @@ import { Worker, isMainThread } from 'worker_threads'; import { type Message } from './types'; import { METRICS } from '../metrics'; +import { Gauge } from 'prom-client'; export default class StreamHandler { private readonly worker?: Worker; constructor ( - streamKey: string + public readonly streamKey: string ) { if (isMainThread) { this.worker = new Worker(path.join(__dirname, 'worker.js'), { @@ -18,12 +19,22 @@ export default class StreamHandler { }); this.worker.on('message', this.handleMessage); + this.worker.on('error', this.handleError.bind(this)); } else { throw new Error('StreamHandler should not be instantiated in a worker thread'); } } + private handleError (error: Error): void { + console.log(`Encountered error processing stream: ${this.streamKey}, terminating thread`, error); + this.worker?.terminate().catch(() => { + console.log(`Failed to terminate thread for stream: ${this.streamKey}`); + }); + } + private handleMessage (message: Message): void { - METRICS[message.type].labels(message.labels).set(message.value); + if (METRICS[message.type] instanceof Gauge) { + (METRICS[message.type] as Gauge).labels(message.labels).set(message.value); + } } }
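A note on the `instanceof Gauge` guard above: with CACHE_HIT and CACHE_MISS registered as Counters alongside the existing Gauges, the worker-to-main metrics bridge can no longer assume every METRICS entry supports `.set()`. A sketch of a bridge that forwards both kinds; the Counter branch is hypothetical, since worker.ts is not part of this diff:

```ts
// Sketch: forwarding either metric kind across the worker boundary. The
// Message shape follows ./types; the Counter branch is hypothetical since
// worker.ts is not shown in this change.
import { Counter, Gauge } from 'prom-client';
import { METRICS } from '../metrics';
import { type Message } from './types';

function applyMetric(message: Message): void {
  const metric = METRICS[message.type];
  if (metric instanceof Gauge) {
    metric.labels(message.labels).set(message.value); // gauges carry a level
  } else if (metric instanceof Counter) {
    metric.labels(message.labels).inc(message.value); // counters only accumulate
  }
}
```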