Skip to content

Commit

Permalink
Trying to improve pending txs registering
Browse files Browse the repository at this point in the history
  • Loading branch information
EvgenKor committed Aug 27, 2024
1 parent 2c8842c commit 49b64ab
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 5 deletions.
85 changes: 83 additions & 2 deletions zp-relayer/pool/RelayPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import { Network } from '@/lib/network'
import { redis } from '@/lib/redisClient'
import { JobState, PoolTx, poolTxQueue, WorkerTxType } from '@/queue/poolTxQueue'
import { TxStore } from '@/state/TxStore'
import { ENERGY_SIZE, MOCK_CALLDATA, PERMIT2_CONTRACT, TOKEN_SIZE, TRANSFER_INDEX_SIZE } from '@/utils/constants'
import { ENERGY_SIZE, MOCK_CALLDATA, OUTPLUSONE, PERMIT2_CONTRACT, TOKEN_SIZE, TRANSFER_INDEX_SIZE } from '@/utils/constants'
import {
applyDenominator,
buildPrefixedMemo,
encodeProof,
fetchJson,
numToHex,
sleep,
truncateHexPrefix,
truncateMemoTxPrefixProverV2,
} from '@/utils/helpers'
Expand Down Expand Up @@ -50,6 +51,7 @@ export class RelayPool extends BasePool<Network> {
public permitRecover: PermitRecover | null = null
private proxyAddress!: string
private indexerUrl!: string
private observePromise: Promise<void> | undefined;
txStore!: TxStore

async init(permitConfig: PermitConfig, proxyAddress: string, indexerUrl: string) {
Expand Down Expand Up @@ -77,6 +79,8 @@ export class RelayPool extends BasePool<Network> {
await this.permitRecover?.initializeDomain()

this.isInitialized = true

this.observePromise = undefined;
}

async validateTx(
Expand Down Expand Up @@ -255,7 +259,10 @@ export class RelayPool extends BasePool<Network> {
await this.optimisticState.nullifiers.add([nullifier])
}

// cache transaction locally
await this.cacheTxLocally(commitIndex, outCommit, txHash, memo);
// start monitoring local cache against the indexer to cleanup already indexed txs
this.startLocalCacheObserver(commitIndex);
}

async onConfirmed(res: ProcessResult<RelayPool>, txHash: string, callback?: () => Promise<void>, jobId?: string): Promise<void> {
Expand Down Expand Up @@ -296,10 +303,84 @@ export class RelayPool extends BasePool<Network> {
memo
);
await this.txStore.add(index, prefixedMemo);
logger.info(`Tx @${index} with commit ${commit} has been CACHED locally`);
}

async getIndexerInfo() {
private async getIndexerInfo() {
const info = await fetchJson(this.indexerUrl, '/info', [])
return info
}

private async assumeNextPendingTxIndex() {
const [indexerInfo, localCache] = await Promise.all([this.getIndexerInfo(), this.txStore.getAll()]);

return Number(indexerInfo.optimisticDeltaIndex + Object.keys(localCache).length);
}

private async getIndexerTxs(offset: number, limit: number): Promise<string[]> {
const url = new URL('/transactions/v2', config.base.COMMON_INDEXER_URL)
url.searchParams.set('limit', limit.toString())
url.searchParams.set('offset', offset.toString())

const response = await fetch(url)
if (!response.ok) {
throw new Error(`Failed to fetch transactions from indexer. Status: ${response.status}`)
}
return response.json() as Promise<string[]>;
}

// observe the current local cache and indexer to remove local record
// after adding it to the indexer's optimistic/persistent state
// return when local cache is empty
protected async startLocalCacheObserver(fromIndex: number): Promise<void> {
if (this.observePromise == undefined) {
this.observePromise = this.localCacheObserverWorker(fromIndex).finally(() => {
this.observePromise = undefined;
});
}

return this.observePromise;
}

protected async localCacheObserverWorker(fromIndex: number): Promise<void> {
logger.debug('Local cache observer worker was started', { fromIndex })
const CACHE_OBSERVE_INTERVAL_MS = 1000; // waiting time between checks
const EXTEND_LIMIT_TO_FETCH = 10; // taking into account non-atomic nature of /info and /transactions/v2 requests
while (true) {
const localEntries = Object.entries(await this.txStore.getAll())
.map(([i, v]) => [parseInt(i), v] as [number, string])
.sort(([i1], [i2]) => i1 - i2)
let localEntriesCnt = localEntries.length;

if (localEntries.length == 0) {
// stop observer when localCache is empty
break;
}

// there are entries in the local cache
try {
const indexerOptimisticIndex = Number((await this.getIndexerInfo()).optimisticDeltaIndex);
const limit = (indexerOptimisticIndex - fromIndex) / OUTPLUSONE + localEntries.length + EXTEND_LIMIT_TO_FETCH;
const indexerCommitments = (await this.getIndexerTxs(fromIndex, limit)).map(tx => tx.slice(65, 129));

// find cached commitments in the indexer's response
for (const [index, memo] of localEntries) {
const commitLocal = memo.slice(0, 64)
if (indexerCommitments.includes(commitLocal)) {
logger.info('Deleting index from optimistic state', { index, commitLocal })
await this.txStore.remove(index.toString())
localEntriesCnt--;
}
}
} catch(e) {
logger.error(`Cannot check local cache against indexer : ${(e as Error).message}`);
}

if (localEntriesCnt > 0) {
sleep(CACHE_OBSERVE_INTERVAL_MS);
}
}

logger.debug('Local cache observer worker has finished', { fromIndex })
}
}
7 changes: 4 additions & 3 deletions zp-relayer/services/relayer/endpoints.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ async function getTransactionsV2(req: Request, res: Response, { pool }: PoolInje

const response = await fetch(url)
if (!response.ok) {
throw new Error(`Failed to fetch transactions from indexer. Status: ${res.status}`)
throw new Error(`Failed to fetch transactions from indexer. Status: ${response.status}`)
}
const indexerTxs: string[] = await response.json()

Expand All @@ -111,8 +111,9 @@ async function getTransactionsV2(req: Request, res: Response, { pool }: PoolInje
for (const [index, memo] of indices) {
const commitLocal = memo.slice(0, 64)
if (indexerCommitments.includes(commitLocal)) {
logger.info('Deleting index from optimistic state', { index })
await txStore.remove(index.toString())
// !!! we shouldn't modify local cache from here. Just filter entries to return correct response
//logger.info('Deleting index from optimistic state', { index })
//await txStore.remove(index.toString())
} else {
optimisticTxs.push(txToV2Format('0', memo))
}
Expand Down

0 comments on commit 49b64ab

Please sign in to comment.