diff --git a/.github/workflows/deploy-lambdas.yml b/.github/workflows/deploy-lambdas.yml index 5a8879a12..29c53bc29 100644 --- a/.github/workflows/deploy-lambdas.yml +++ b/.github/workflows/deploy-lambdas.yml @@ -36,6 +36,7 @@ jobs: AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }} HASURA_ENDPOINT: ${{ vars.HASURA_ENDPOINT }} + HASURA_ENDPOINT_V2: ${{ vars.HASURA_ENDPOINT_V2 }} HASURA_ADMIN_SECRET: ${{ secrets.HASURA_ADMIN_SECRET }} PG_ADMIN_USER: ${{ secrets.PG_ADMIN_USER }} PG_ADMIN_PASSWORD: ${{ secrets.PG_ADMIN_PASSWORD }} diff --git a/frontend/replacement.dev.json b/frontend/replacement.dev.json index 2ce60ce37..1a233cb9d 100644 --- a/frontend/replacement.dev.json +++ b/frontend/replacement.dev.json @@ -1,7 +1,6 @@ { "REPL_ACCOUNT_ID": "dev-queryapi.dataplatform.near", "REPL_GRAPHQL_ENDPOINT": "https://near-queryapi.dev.api.pagoda.co", - "REPL_GRAPHQL_ENDPOINT_V2": "https://queryapi-hasura-graphql-mainnet-vcqilefdcq-ew.a.run.app", "REPL_EXTERNAL_APP_URL": "https://queryapi-frontend-vcqilefdcq-ew.a.run.app", "REPL_REGISTRY_CONTRACT_ID": "dev-queryapi.dataplatform.near" } diff --git a/frontend/replacement.local.json b/frontend/replacement.local.json index 5e113b5a6..9b8fa584e 100644 --- a/frontend/replacement.local.json +++ b/frontend/replacement.local.json @@ -1,7 +1,6 @@ { "REPL_ACCOUNT_ID": "dataplatform.near", "REPL_GRAPHQL_ENDPOINT": "https://near-queryapi.api.pagoda.co", - "REPL_GRAPHQL_ENDPOINT_V2": "https://queryapi-hasura-graphql-mainnet-vcqilefdcq-ew.a.run.app", "REPL_EXTERNAL_APP_URL": "http://localhost:3000", "REPL_REGISTRY_CONTRACT_ID": "queryapi.dataplatform.near" } diff --git a/frontend/replacement.mainnet.json b/frontend/replacement.mainnet.json index fc1bd5ae2..7a98e12d4 100644 --- a/frontend/replacement.mainnet.json +++ b/frontend/replacement.mainnet.json @@ -1,7 +1,6 @@ { "REPL_ACCOUNT_ID": "dataplatform.near", "REPL_GRAPHQL_ENDPOINT": "https://near-queryapi.api.pagoda.co", - "REPL_GRAPHQL_ENDPOINT_V2": "https://queryapi-hasura-graphql-mainnet-24ktefolwq-ew.a.run.app", "REPL_EXTERNAL_APP_URL": "https://queryapi-frontend-24ktefolwq-ew.a.run.app", "REPL_REGISTRY_CONTRACT_ID": "queryapi.dataplatform.near" } diff --git a/frontend/widgets/src/QueryApi.IndexerStatus.jsx b/frontend/widgets/src/QueryApi.IndexerStatus.jsx index ef6989c43..d222c19de 100644 --- a/frontend/widgets/src/QueryApi.IndexerStatus.jsx +++ b/frontend/widgets/src/QueryApi.IndexerStatus.jsx @@ -95,9 +95,6 @@ const TextLink = styled.a` if (!indexer_name) return "missing indexer_name"; -let v1_endpoint = `${REPL_GRAPHQL_ENDPOINT}`; -let v2_endpoint = `${REPL_GRAPHQL_ENDPOINT_V2}`; - State.init({ logs: [], state: [], @@ -108,10 +105,9 @@ State.init({ indexer_resPage: 0, logsPage: 0, statePage: 0, - v2Toggle: Storage.privateGet("QueryApiV2Toggle") || false, }); -let graphQLEndpoint = state.v2Toggle ? v2_endpoint : v1_endpoint; +let graphQLEndpoint = `${REPL_GRAPHQL_ENDPOINT}`; function fetchGraphQL(operationsDoc, operationName, variables) { return asyncFetch(`${graphQLEndpoint}/v1/graphql`, { @@ -164,12 +160,7 @@ const indexerStateDoc = ` } `; -const prevV2ToggleSelected = Storage.privateGet("QueryApiV2Toggle"); -if ( - !state.initialFetch || - (prevV2ToggleSelected !== state.v2Toggle) -) { - Storage.privateSet("QueryApiV2Toggle", state.v2Toggle); +if (!state.initialFetch) { State.update({ logs: [], state: [], @@ -256,44 +247,6 @@ const onIndexerResPageChange = (page) => { State.update({ indexer_resPage: page, currentPage: page }); }; -const DisclaimerContainer = styled.div` - padding: 10px; - margin: 0.5px; - text-color: black; - display: flex; - width: 50; - border: 2px solid rgb(240, 240, 240); - border-radius: 8px; - align-items: "center"; - margin-bottom: 5px; -`; - -const Notice = styled.div` - font-weight: 900; - font-size: 22px; - align-self: flex-start; - margin: 10px 0px 30px; - text-align: center; - padding-bottom: 5px; - border-bottom: 1px solid rgb(240, 240, 241); - color: rgb(36, 39, 42); -`; - -const DisclaimerText = styled.p` - font-size: 14px; - line-height: 20px; - font-weight: 400; - color: rgb(17, 24, 28); - word-break: break-word; - width: 700px; - text-align: start; - padding-left: 10px; - - @media (max-width: 1024px) { - width: 80%; - } -`; - return ( <> @@ -303,39 +256,6 @@ return ( GraphQL Playground -
- -
- V2 Testing Notice -
- - QueryAPI is still in beta. We are working on a OueryAPI V2 - with faster historical processing, easier access to DB and - more control over your indexer. V2 is running in parallel and - you can see the logs from this new version by toggling this - button. - - { - State.update({ v2Toggle: !state.v2Toggle }); - }, - }} - /> -
-
-
-
diff --git a/indexer-js-queue-handler/serverless.yml b/indexer-js-queue-handler/serverless.yml index e24a522c2..ac4710bb6 100644 --- a/indexer-js-queue-handler/serverless.yml +++ b/indexer-js-queue-handler/serverless.yml @@ -12,6 +12,7 @@ provider: REGION: ${self:provider.region} STAGE: ${opt:stage, 'dev'} HASURA_ENDPOINT: ${env:HASURA_ENDPOINT} + HASURA_ENDPOINT_V2: ${env:HASURA_ENDPOINT_V2} HASURA_ADMIN_SECRET: ${env:HASURA_ADMIN_SECRET} PG_ADMIN_USER: ${env:PG_ADMIN_USER} PG_ADMIN_PASSWORD: ${env:PG_ADMIN_PASSWORD} diff --git a/indexer-js-queue-handler/social-lag-metrics-writer.js b/indexer-js-queue-handler/social-lag-metrics-writer.js index ec9d28f92..b3ebe220d 100644 --- a/indexer-js-queue-handler/social-lag-metrics-writer.js +++ b/indexer-js-queue-handler/social-lag-metrics-writer.js @@ -35,7 +35,7 @@ export const handler = async () => { }, }), fetchJson( - `${process.env.HASURA_ENDPOINT}/v1/graphql`, + `${process.env.HASURA_ENDPOINT_V2}/v1/graphql`, { query: `{ dataplatform_near_social_feed_posts( @@ -54,8 +54,7 @@ export const handler = async () => { const nearSocialBlockHeight = nearSocialResponse[0].blockHeight; const feedIndexerBlockHeight = - feedIndexerResponse.data.dataplatform_near_social_feed_posts[0] - .block_height; + feedIndexerResponse.data.dataplatform_near_social_feed_posts[0].block_height; const lag = nearSocialBlockHeight - feedIndexerBlockHeight; diff --git a/runner/src/metrics.ts b/runner/src/metrics.ts index 20708445b..c3ab74b81 100644 --- a/runner/src/metrics.ts +++ b/runner/src/metrics.ts @@ -1,25 +1,25 @@ import express from 'express'; -import promClient from 'prom-client'; +import { Gauge, Histogram, Counter, AggregatorRegistry } from 'prom-client'; -const UNPROCESSED_STREAM_MESSAGES = new promClient.Gauge({ +const UNPROCESSED_STREAM_MESSAGES = new Gauge({ name: 'queryapi_runner_unprocessed_stream_messages', help: 'Number of Redis Stream messages not yet processed', labelNames: ['indexer', 'type'], }); -const EXECUTION_DURATION = new promClient.Histogram({ +const EXECUTION_DURATION = new Histogram({ name: 'queryapi_runner_execution_duration_milliseconds', help: 'Time taken to execute an indexer function', labelNames: ['indexer', 'type'], }); -const CACHE_HIT = new promClient.Counter({ +const CACHE_HIT = new Counter({ name: 'queryapi_runner_cache_hit', help: 'The number of times cache was hit successfully', labelNames: ['type', 'key'] }); -const CACHE_MISS = new promClient.Counter({ +const CACHE_MISS = new Counter({ name: 'queryapi_runner_cache_miss', help: 'The number of times cache was missed', labelNames: ['type', 'key'] @@ -32,15 +32,22 @@ export const METRICS = { CACHE_MISS }; +const aggregatorRegistry = new AggregatorRegistry(); +const workerMetrics: Record = {}; + +export const registerWorkerMetrics = (workerId: number, metrics: string): void => { + workerMetrics[workerId] = metrics; +}; + export const startServer = async (): Promise => { const app = express(); // https://github.com/DefinitelyTyped/DefinitelyTyped/issues/50871 // eslint-disable-next-line @typescript-eslint/no-misused-promises app.get('/metrics', async (_req, res) => { - res.set('Content-Type', promClient.register.contentType); + res.set('Content-Type', aggregatorRegistry.contentType); - const metrics = await promClient.register.metrics(); + const metrics = await AggregatorRegistry.aggregate(Object.values(workerMetrics)).metrics(); res.send(metrics); }); diff --git a/runner/src/stream-handler/stream-handler.ts b/runner/src/stream-handler/stream-handler.ts index d5401b5e2..13ffd600e 100644 --- a/runner/src/stream-handler/stream-handler.ts +++ b/runner/src/stream-handler/stream-handler.ts @@ -1,12 +1,10 @@ import path from 'path'; import { Worker, isMainThread } from 'worker_threads'; -import { type Message } from './types'; -import { METRICS } from '../metrics'; -import { Gauge, Histogram } from 'prom-client'; +import { registerWorkerMetrics } from '../metrics'; export default class StreamHandler { - private readonly worker?: Worker; + private readonly worker: Worker; constructor ( public readonly streamKey: string @@ -18,8 +16,8 @@ export default class StreamHandler { }, }); - this.worker.on('message', this.handleMessage); - this.worker.on('error', this.handleError); + this.worker.on('message', this.handleMessage.bind(this)); + this.worker.on('error', this.handleError.bind(this)); } else { throw new Error('StreamHandler should not be instantiated in a worker thread'); } @@ -27,18 +25,12 @@ export default class StreamHandler { private handleError (error: Error): void { console.log(`Encountered error processing stream: ${this.streamKey}, terminating thread`, error); - this.worker?.terminate().catch(() => { + this.worker.terminate().catch(() => { console.log(`Failed to terminate thread for stream: ${this.streamKey}`); }); } - private handleMessage (message: Message): void { - if (METRICS[message.type] instanceof Gauge) { - (METRICS[message.type] as Gauge).labels(message.labels).set(message.value); - } - - if (METRICS[message.type] instanceof Histogram) { - (METRICS[message.type] as Histogram).labels(message.labels).observe(message.value); - } + private handleMessage (message: string): void { + registerWorkerMetrics(this.worker.threadId, message); } } diff --git a/runner/src/stream-handler/types.ts b/runner/src/stream-handler/types.ts deleted file mode 100644 index 945248e1b..000000000 --- a/runner/src/stream-handler/types.ts +++ /dev/null @@ -1,9 +0,0 @@ -import { type METRICS } from '../metrics'; - -interface Metric { - type: keyof typeof METRICS - labels: Record - value: number -}; - -export type Message = Metric; diff --git a/runner/src/stream-handler/worker.ts b/runner/src/stream-handler/worker.ts index a80e854ee..1cc1c531d 100644 --- a/runner/src/stream-handler/worker.ts +++ b/runner/src/stream-handler/worker.ts @@ -1,8 +1,9 @@ import { isMainThread, parentPort, workerData } from 'worker_threads'; +import promClient from 'prom-client'; import Indexer from '../indexer'; import RedisClient from '../redis-client'; -import { type Message } from './types'; +import { METRICS } from '../metrics'; if (isMainThread) { throw new Error('Worker should not be run on main thread'); @@ -19,11 +20,12 @@ void (async function main () { console.log('Started processing stream: ', streamKey); let indexerName = ''; + const streamType = redisClient.getStreamType(streamKey); + const isHistorical = streamType === 'historical'; while (true) { try { const startTime = performance.now(); - const streamType = redisClient.getStreamType(streamKey); const messages = await redisClient.getNextStreamMessage(streamKey); const indexerConfig = await redisClient.getStreamStorage(streamKey); @@ -46,30 +48,23 @@ void (async function main () { provisioned: false, }, }; - await indexer.runFunctions(Number(message.block_height), functions, false, { + await indexer.runFunctions(Number(message.block_height), functions, isHistorical, { provision: true, }); await redisClient.deleteStreamMessage(streamKey, id); - const unprocessedMessages = await redisClient.getUnprocessedStreamMessages(streamKey); - - parentPort?.postMessage({ - type: 'UNPROCESSED_STREAM_MESSAGES', - labels: { indexer: indexerName, type: streamType }, - value: unprocessedMessages?.length ?? 0, - } satisfies Message); - - parentPort?.postMessage({ - type: 'EXECUTION_DURATION', - labels: { indexer: indexerName, type: streamType }, - value: performance.now() - startTime, - } satisfies Message); + METRICS.EXECUTION_DURATION.labels({ indexer: indexerName, type: streamType }).observe(performance.now() - startTime); console.log(`Success: ${indexerName}`); } catch (err) { await sleep(10000); console.log(`Failed: ${indexerName}`, err); + } finally { + const unprocessedMessages = await redisClient.getUnprocessedStreamMessages(streamKey); + METRICS.UNPROCESSED_STREAM_MESSAGES.labels({ indexer: indexerName, type: streamType }).set(unprocessedMessages?.length ?? 0); + + parentPort?.postMessage(await promClient.register.getMetricsAsJSON()); } } })();