From 49b64ab8dbc884b403727ee7acf73ae242b81368 Mon Sep 17 00:00:00 2001 From: Evgen Date: Tue, 27 Aug 2024 20:24:27 +0300 Subject: [PATCH] Trying to improve pending txs registering --- zp-relayer/pool/RelayPool.ts | 85 +++++++++++++++++++++++- zp-relayer/services/relayer/endpoints.ts | 7 +- 2 files changed, 87 insertions(+), 5 deletions(-) diff --git a/zp-relayer/pool/RelayPool.ts b/zp-relayer/pool/RelayPool.ts index 4ddbc01..2609214 100644 --- a/zp-relayer/pool/RelayPool.ts +++ b/zp-relayer/pool/RelayPool.ts @@ -4,13 +4,14 @@ import { Network } from '@/lib/network' import { redis } from '@/lib/redisClient' import { JobState, PoolTx, poolTxQueue, WorkerTxType } from '@/queue/poolTxQueue' import { TxStore } from '@/state/TxStore' -import { ENERGY_SIZE, MOCK_CALLDATA, PERMIT2_CONTRACT, TOKEN_SIZE, TRANSFER_INDEX_SIZE } from '@/utils/constants' +import { ENERGY_SIZE, MOCK_CALLDATA, OUTPLUSONE, PERMIT2_CONTRACT, TOKEN_SIZE, TRANSFER_INDEX_SIZE } from '@/utils/constants' import { applyDenominator, buildPrefixedMemo, encodeProof, fetchJson, numToHex, + sleep, truncateHexPrefix, truncateMemoTxPrefixProverV2, } from '@/utils/helpers' @@ -50,6 +51,7 @@ export class RelayPool extends BasePool { public permitRecover: PermitRecover | null = null private proxyAddress!: string private indexerUrl!: string + private observePromise: Promise | undefined; txStore!: TxStore async init(permitConfig: PermitConfig, proxyAddress: string, indexerUrl: string) { @@ -77,6 +79,8 @@ export class RelayPool extends BasePool { await this.permitRecover?.initializeDomain() this.isInitialized = true + + this.observePromise = undefined; } async validateTx( @@ -255,7 +259,10 @@ export class RelayPool extends BasePool { await this.optimisticState.nullifiers.add([nullifier]) } + // cache transaction locally await this.cacheTxLocally(commitIndex, outCommit, txHash, memo); + // start monitoring local cache against the indexer to cleanup already indexed txs + this.startLocalCacheObserver(commitIndex); } async onConfirmed(res: ProcessResult, txHash: string, callback?: () => Promise, jobId?: string): Promise { @@ -296,10 +303,84 @@ export class RelayPool extends BasePool { memo ); await this.txStore.add(index, prefixedMemo); + logger.info(`Tx @${index} with commit ${commit} has been CACHED locally`); } - async getIndexerInfo() { + private async getIndexerInfo() { const info = await fetchJson(this.indexerUrl, '/info', []) return info } + + private async assumeNextPendingTxIndex() { + const [indexerInfo, localCache] = await Promise.all([this.getIndexerInfo(), this.txStore.getAll()]); + + return Number(indexerInfo.optimisticDeltaIndex + Object.keys(localCache).length); + } + + private async getIndexerTxs(offset: number, limit: number): Promise { + const url = new URL('/transactions/v2', config.base.COMMON_INDEXER_URL) + url.searchParams.set('limit', limit.toString()) + url.searchParams.set('offset', offset.toString()) + + const response = await fetch(url) + if (!response.ok) { + throw new Error(`Failed to fetch transactions from indexer. Status: ${response.status}`) + } + return response.json() as Promise; + } + + // observe the current local cache and indexer to remove local record + // after adding it to the indexer's optimistic/persistent state + // return when local cache is empty + protected async startLocalCacheObserver(fromIndex: number): Promise { + if (this.observePromise == undefined) { + this.observePromise = this.localCacheObserverWorker(fromIndex).finally(() => { + this.observePromise = undefined; + }); + } + + return this.observePromise; + } + + protected async localCacheObserverWorker(fromIndex: number): Promise { + logger.debug('Local cache observer worker was started', { fromIndex }) + const CACHE_OBSERVE_INTERVAL_MS = 1000; // waiting time between checks + const EXTEND_LIMIT_TO_FETCH = 10; // taking into account non-atomic nature of /info and /transactions/v2 requests + while (true) { + const localEntries = Object.entries(await this.txStore.getAll()) + .map(([i, v]) => [parseInt(i), v] as [number, string]) + .sort(([i1], [i2]) => i1 - i2) + let localEntriesCnt = localEntries.length; + + if (localEntries.length == 0) { + // stop observer when localCache is empty + break; + } + + // there are entries in the local cache + try { + const indexerOptimisticIndex = Number((await this.getIndexerInfo()).optimisticDeltaIndex); + const limit = (indexerOptimisticIndex - fromIndex) / OUTPLUSONE + localEntries.length + EXTEND_LIMIT_TO_FETCH; + const indexerCommitments = (await this.getIndexerTxs(fromIndex, limit)).map(tx => tx.slice(65, 129)); + + // find cached commitments in the indexer's response + for (const [index, memo] of localEntries) { + const commitLocal = memo.slice(0, 64) + if (indexerCommitments.includes(commitLocal)) { + logger.info('Deleting index from optimistic state', { index, commitLocal }) + await this.txStore.remove(index.toString()) + localEntriesCnt--; + } + } + } catch(e) { + logger.error(`Cannot check local cache against indexer : ${(e as Error).message}`); + } + + if (localEntriesCnt > 0) { + sleep(CACHE_OBSERVE_INTERVAL_MS); + } + } + + logger.debug('Local cache observer worker has finished', { fromIndex }) + } } diff --git a/zp-relayer/services/relayer/endpoints.ts b/zp-relayer/services/relayer/endpoints.ts index 6f6a650..9463791 100644 --- a/zp-relayer/services/relayer/endpoints.ts +++ b/zp-relayer/services/relayer/endpoints.ts @@ -93,7 +93,7 @@ async function getTransactionsV2(req: Request, res: Response, { pool }: PoolInje const response = await fetch(url) if (!response.ok) { - throw new Error(`Failed to fetch transactions from indexer. Status: ${res.status}`) + throw new Error(`Failed to fetch transactions from indexer. Status: ${response.status}`) } const indexerTxs: string[] = await response.json() @@ -111,8 +111,9 @@ async function getTransactionsV2(req: Request, res: Response, { pool }: PoolInje for (const [index, memo] of indices) { const commitLocal = memo.slice(0, 64) if (indexerCommitments.includes(commitLocal)) { - logger.info('Deleting index from optimistic state', { index }) - await txStore.remove(index.toString()) + // !!! we shouldn't modify local cache from here. Just filter entries to return correct response + //logger.info('Deleting index from optimistic state', { index }) + //await txStore.remove(index.toString()) } else { optimisticTxs.push(txToV2Format('0', memo)) }