From 8cde68dd4985e2b11b174cd3dc78f48537114fb2 Mon Sep 17 00:00:00 2001 From: EvgenKor Date: Mon, 9 Sep 2024 12:39:46 +0300 Subject: [PATCH] Fix/pending txs (#226) * Trying to improve pending txs registering * Set proper index for local transactions * Fix indexer sync flow * Remove unneeded log line * Storing local cache by commit (instead of indexes) * Adding index to local cache * Bugfixing * Filtering pending txs * Removing tx from the cache on fail * Fix issues --- zp-relayer/lib/network/NetworkBackend.ts | 1 + zp-relayer/lib/network/evm/EvmBackend.ts | 1 + zp-relayer/lib/network/tron/TronBackend.ts | 1 + zp-relayer/pool/BasePool.ts | 93 +++++++++++----- zp-relayer/pool/DefaultPool.ts | 2 + zp-relayer/pool/FinalizerPool.ts | 2 + zp-relayer/pool/IndexerPool.ts | 2 + zp-relayer/pool/RelayPool.ts | 117 ++++++++++++++++++--- zp-relayer/services/indexer/init.ts | 4 +- zp-relayer/services/indexer/router.ts | 2 +- zp-relayer/services/relayer/endpoints.ts | 36 +++---- zp-relayer/state/PoolState.ts | 10 ++ zp-relayer/state/TxStore.ts | 34 ++++-- zp-relayer/utils/helpers.ts | 10 +- zp-relayer/utils/web3.ts | 2 +- 15 files changed, 245 insertions(+), 72 deletions(-) diff --git a/zp-relayer/lib/network/NetworkBackend.ts b/zp-relayer/lib/network/NetworkBackend.ts index 3656e5ca..873f6da5 100644 --- a/zp-relayer/lib/network/NetworkBackend.ts +++ b/zp-relayer/lib/network/NetworkBackend.ts @@ -11,6 +11,7 @@ export function isEthereum(n: NetworkBackend): n is NetworkBackend + blockNumber: number } export interface GetEventsConfig { diff --git a/zp-relayer/lib/network/evm/EvmBackend.ts b/zp-relayer/lib/network/evm/EvmBackend.ts index 8556b15f..49079918 100644 --- a/zp-relayer/lib/network/evm/EvmBackend.ts +++ b/zp-relayer/lib/network/evm/EvmBackend.ts @@ -62,6 +62,7 @@ export class EvmBackend implements INetworkBackend { events: events.map(e => ({ txHash: e.transactionHash, values: e.returnValues, + blockNumber: e.blockNumber, })), fromBlock, toBlock, diff --git a/zp-relayer/lib/network/tron/TronBackend.ts b/zp-relayer/lib/network/tron/TronBackend.ts index 75dd8437..a20de4ab 100644 --- a/zp-relayer/lib/network/tron/TronBackend.ts +++ b/zp-relayer/lib/network/tron/TronBackend.ts @@ -49,6 +49,7 @@ export class TronBackend implements INetworkBackend { yield events.map((e: any) => ({ txHash: e.transaction, values: e.result, + blockNumber: e.block, })) fingerprint = events[events.length - 1].fingerprint || null diff --git a/zp-relayer/pool/BasePool.ts b/zp-relayer/pool/BasePool.ts index fe7eb79b..ece62d18 100644 --- a/zp-relayer/pool/BasePool.ts +++ b/zp-relayer/pool/BasePool.ts @@ -29,6 +29,8 @@ export abstract class BasePool { public poolId: BN = toBN(0) public isInitialized = false + protected poolName(): string { return 'base-pool'; } + constructor(public network: NetworkBackend, private config: BasePoolConfig) { this.txVK = require(config.txVkPath) @@ -146,11 +148,11 @@ export abstract class BasePool { async syncState(startBlock?: number, lastBlock?: number, indexerUrl?: string) { logger.debug('Syncing state; starting from block %d', startBlock) - const localIndex = this.state.getNextIndex() - const localRoot = this.state.getMerkleRoot() + let localIndex = this.state.getNextIndex() + let localRoot = this.state.getMerkleRoot() - const contractIndex = await this.getContractIndex() - const contractRoot = await this.getContractMerkleRoot(contractIndex) + let contractIndex = await this.getContractIndex() + let contractRoot = await this.getContractMerkleRoot(contractIndex) logger.debug('State info', { localRoot, @@ -164,25 +166,34 @@ export abstract class BasePool { return } - if (indexerUrl) { - await this.syncStateFromIndexer(indexerUrl) - } else if (startBlock && lastBlock) { - await this.syncStateFromContract(startBlock, lastBlock, contractIndex, localIndex) - } else { - throw new Error('Either (startBlock, lastBlock) or indexerUrl should be provided for sync') - } + while (localIndex < contractIndex) { + if (indexerUrl) { + await this.syncStateFromIndexer(indexerUrl) + } else if (startBlock && lastBlock) { + const savedBlockNumberOfLastConfirmedTx = await this.getLastConfirmedTxBlock(); + const actualStartBlock = Math.max(startBlock, savedBlockNumberOfLastConfirmedTx); - const newLocalIndex = this.state.getNextIndex() - const newLocalRoot = this.state.getMerkleRoot() - logger.debug('Local state after update', { - newLocalRoot, - newLocalIndex, - }) - if (newLocalIndex < contractIndex) { - throw new Error('Indexer is not synchronized with the contract yet') + logger.debug('Syncing from contract; starting from block %d', actualStartBlock) + + await this.syncStateFromContract(actualStartBlock, lastBlock, contractIndex, localIndex); + } else { + throw new Error('Either (startBlock, lastBlock) or indexerUrl should be provided for sync') + } + + localIndex = this.state.getNextIndex() + localRoot = this.state.getMerkleRoot() + logger.debug('Local state after update', { + localRoot, + localIndex, + }) } - if (newLocalRoot !== contractRoot) { - throw new Error('State is corrupted, roots mismatch') + + if (localRoot !== contractRoot) { + await this.state.wipe(); + await this.optimisticState.wipe(); + await this.setLastConfirmedTxBlockForced(0); + + throw new Error('State is corrupted, roots mismatch. State was wiped') } } @@ -239,12 +250,12 @@ export abstract class BasePool { for (const e of batch.events) { // Filter pending txs in case of decentralized relay pool const state = toBN(e.values.index).lte(toBN(contractIndex)) ? 'all' : 'optimistic' - await this.addTxToState(e.txHash, e.values.index, e.values.message, state) + await this.addTxToState(e.txHash, e.values.index, e.values.message, state, e.blockNumber) } } } - async addTxToState(txHash: string, newPoolIndex: number, message: string, state: 'optimistic' | 'confirmed' | 'all') { + async addTxToState(txHash: string, newPoolIndex: number, message: string, state: 'optimistic' | 'confirmed' | 'all', blockNumber: number) { const transactSelector = '0xaf989083' const transactV2Selector = '0x5fd28f8c' @@ -301,10 +312,12 @@ export abstract class BasePool { memo = truncateMemoTxPrefix(memoRaw, txType) - // Save nullifier in confirmed state + // Save nullifier and tx's block number in confirmed state if (state !== 'optimistic') { const nullifier = parser.getField('nullifier') await this.state.nullifiers.add([hexToNumberString(nullifier)]) + + this.setLastConfirmedTxBlock(blockNumber); } } else if (input.startsWith(transactV2Selector)) { const calldata = Buffer.from(truncateHexPrefix(input), 'hex') @@ -320,10 +333,12 @@ export abstract class BasePool { memo = truncateMemoTxPrefixProverV2(memoRaw, txType) - // Save nullifier in confirmed state + // Save nullifier and tx's block number in confirmed state if (state !== 'optimistic') { const nullifier = parser.getField('nullifier') await this.state.nullifiers.add([hexToNumberString(nullifier)]) + + this.setLastConfirmedTxBlock(blockNumber); } } else { throw new Error(`Unknown transaction type: ${input}`) @@ -336,7 +351,7 @@ export abstract class BasePool { } } - propagateOptimisticState(index: number) { + propagateOptimisticState(index: number, blockNumber: number) { index = Math.floor(index / OUTPLUSONE) const opIndex = Math.floor(this.optimisticState.getNextIndex() / OUTPLUSONE) const stateIndex = Math.floor(this.state.getNextIndex() / OUTPLUSONE) @@ -352,6 +367,8 @@ export abstract class BasePool { const outCommit = hexToNumberString('0x' + tx.slice(0, 64)) this.state.updateState(i, outCommit, tx) } + + this.setLastConfirmedTxBlock(blockNumber); } verifyProof(proof: SnarkProof, inputs: Array) { @@ -425,4 +442,28 @@ export abstract class BasePool { } return limitsFetch } + + + // The following key in Redis DB will use to restore sync from the last confirmed tx + private lastConfirmedTxBlockRedisKey = `${this.poolName}:LastConfirmedTxBlock`; + + protected async setLastConfirmedTxBlock(blockNumber: number) { + const curValue = await this.getLastConfirmedTxBlock(); + if (blockNumber > curValue) { + this.setLastConfirmedTxBlockForced(blockNumber); + } + } + + private async setLastConfirmedTxBlockForced(blockNumber: number) { + redis.set(this.lastConfirmedTxBlockRedisKey, blockNumber); + } + + protected async getLastConfirmedTxBlock(): Promise { + const result = await redis.get(this.lastConfirmedTxBlockRedisKey); + try{ + return Number(result); + } catch(_) {}; + + return 0; + } } diff --git a/zp-relayer/pool/DefaultPool.ts b/zp-relayer/pool/DefaultPool.ts index 96d4c411..43b3c059 100644 --- a/zp-relayer/pool/DefaultPool.ts +++ b/zp-relayer/pool/DefaultPool.ts @@ -54,6 +54,8 @@ export class DefaultPool extends BasePool { treeProver!: IProver public permitRecover: PermitRecover | null = null + protected poolName(): string { return 'default-pool'; } + async init(startBlock: number | null = null, treeProver: IProver) { if (this.isInitialized) return diff --git a/zp-relayer/pool/FinalizerPool.ts b/zp-relayer/pool/FinalizerPool.ts index 3b392008..cbb93a9d 100644 --- a/zp-relayer/pool/FinalizerPool.ts +++ b/zp-relayer/pool/FinalizerPool.ts @@ -21,6 +21,8 @@ export class FinalizerPool extends BasePool { directDepositProver!: IProver indexerUrl!: string + protected poolName(): string { return 'finalizer-pool'; } + async init( treeProver: IProver, directDepositProver: IProver, diff --git a/zp-relayer/pool/IndexerPool.ts b/zp-relayer/pool/IndexerPool.ts index a673331e..074ce64c 100644 --- a/zp-relayer/pool/IndexerPool.ts +++ b/zp-relayer/pool/IndexerPool.ts @@ -6,6 +6,8 @@ import { type PermitRecover } from '@/utils/permit/types' export class IndexerPool extends BasePool { public permitRecover: PermitRecover | null = null + protected poolName(): string { return 'indexer-pool'; } + async init(startBlock: number | null = null, lastBlock: number | null = null) { if (this.isInitialized) return diff --git a/zp-relayer/pool/RelayPool.ts b/zp-relayer/pool/RelayPool.ts index 4ddbc013..f956825a 100644 --- a/zp-relayer/pool/RelayPool.ts +++ b/zp-relayer/pool/RelayPool.ts @@ -2,15 +2,16 @@ import config from '@/configs/relayerConfig' import { logger } from '@/lib/appLogger' import { Network } from '@/lib/network' import { redis } from '@/lib/redisClient' -import { JobState, PoolTx, poolTxQueue, WorkerTxType } from '@/queue/poolTxQueue' +import { JobState, PoolTx, poolTxQueue, TxPayload, WorkerTxType } from '@/queue/poolTxQueue' import { TxStore } from '@/state/TxStore' -import { ENERGY_SIZE, MOCK_CALLDATA, PERMIT2_CONTRACT, TOKEN_SIZE, TRANSFER_INDEX_SIZE } from '@/utils/constants' +import { ENERGY_SIZE, MOCK_CALLDATA, OUTPLUSONE, PERMIT2_CONTRACT, TOKEN_SIZE, TRANSFER_INDEX_SIZE } from '@/utils/constants' import { applyDenominator, buildPrefixedMemo, encodeProof, fetchJson, numToHex, + sleep, truncateHexPrefix, truncateMemoTxPrefixProverV2, } from '@/utils/helpers' @@ -43,6 +44,7 @@ import { bytesToHex, toBN } from 'web3-utils' import { getTxDataProverV2, TxDataProverV2, TxType } from 'zp-memo-parser' import { BasePool } from './BasePool' import { OptionalChecks, PermitConfig, ProcessResult } from './types' +import BigNumber from 'bignumber.js' const ZERO = toBN(0) @@ -50,8 +52,11 @@ export class RelayPool extends BasePool { public permitRecover: PermitRecover | null = null private proxyAddress!: string private indexerUrl!: string + private observePromise: Promise | undefined; txStore!: TxStore + protected poolName(): string { return 'relay-pool'; } + async init(permitConfig: PermitConfig, proxyAddress: string, indexerUrl: string) { if (this.isInitialized) return @@ -77,6 +82,8 @@ export class RelayPool extends BasePool { await this.permitRecover?.initializeDomain() this.isInitialized = true + + this.observePromise = undefined; } async validateTx( @@ -234,8 +241,10 @@ export class RelayPool extends BasePool { const calldata = data.join('') const memoTruncated = truncateMemoTxPrefixProverV2(memo, txType) - // TODO: we call indexer twice (during validation and tx build) - const indexerInfo = await this.getIndexerInfo() + + // Commit index should be treated as an optimistic checkpoint + // It can increase after the transaction is included into the Merkle tree + const commitIndex = await this.assumeNextPendingTxIndex(); return { data: calldata, @@ -243,9 +252,7 @@ export class RelayPool extends BasePool { outCommit, nullifier, memo: memoTruncated, - // Commit index should be treated as an optimistic checkpoint - // It can increase after the transaction is included - commitIndex: indexerInfo.optimisticDeltaIndex, + commitIndex, } } @@ -255,7 +262,11 @@ export class RelayPool extends BasePool { await this.optimisticState.nullifiers.add([nullifier]) } - await this.cacheTxLocally(commitIndex, outCommit, txHash, memo); + // cache transaction locally + const indexerOptimisticIndex = Number((await this.getIndexerInfo()).deltaIndex); + await this.cacheTxLocally(outCommit, txHash, memo, commitIndex); + // start monitoring local cache against the indexer to cleanup already indexed txs + this.startLocalCacheObserver(indexerOptimisticIndex); } async onConfirmed(res: ProcessResult, txHash: string, callback?: () => Promise, jobId?: string): Promise { @@ -268,26 +279,30 @@ export class RelayPool extends BasePool { poolJob.data.transaction.state = JobState.COMPLETED; poolJob.data.transaction.txHash = txHash; await poolJob.update(poolJob.data); - - await this.cacheTxLocally(res.commitIndex, res.outCommit, txHash, res.memo); } } } async onFailed(txHash: string, jobId: string): Promise { super.onFailed(txHash, jobId); - this.txStore.remove(jobId); const poolJob = await poolTxQueue.getJob(jobId); if (!poolJob) { logger.error('Pool job not found', { jobId }); } else { poolJob.data.transaction.state = JobState.REVERTED; poolJob.data.transaction.txHash = txHash; + + const txPayload = poolJob.data.transaction as TxPayload; + if (txPayload.proof.inputs.length > 2) { + const commit = txPayload.proof.inputs[2]; + this.txStore.remove(commit); + logger.info('Removing local cached transaction', {commit}); + } await poolJob.update(poolJob.data); } } - protected async cacheTxLocally(index: number, commit: string, txHash: string, memo: string) { + protected async cacheTxLocally(commit: string, txHash: string, memo: string, index: number) { // store or updating local tx store // (we should keep sent transaction until the indexer grab them) const prefixedMemo = buildPrefixedMemo( @@ -295,11 +310,85 @@ export class RelayPool extends BasePool { txHash, memo ); - await this.txStore.add(index, prefixedMemo); + await this.txStore.add(commit, prefixedMemo, index); + logger.info('Tx has been CACHED locally', { commit, index }); } - async getIndexerInfo() { + private async getIndexerInfo() { const info = await fetchJson(this.indexerUrl, '/info', []) return info } + + // It's just an assumption needed for internal purposes. The final index may be changed + private async assumeNextPendingTxIndex() { + const [indexerInfo, localCache] = await Promise.all([this.getIndexerInfo(), this.txStore.getAll()]); + + return Number(indexerInfo.optimisticDeltaIndex + Object.keys(localCache).length * OUTPLUSONE); + } + + private async getIndexerTxs(offset: number, limit: number): Promise { + const url = new URL('/transactions/v2', config.base.COMMON_INDEXER_URL) + url.searchParams.set('limit', limit.toString()) + url.searchParams.set('offset', offset.toString()) + + const response = await fetch(url) + if (!response.ok) { + throw new Error(`Failed to fetch transactions from indexer. Status: ${response.status}`) + } + return response.json() as Promise; + } + + // observe the current local cache and indexer to remove local record + // after adding it to the indexer's optimistic/persistent state + // return when local cache is empty + protected async startLocalCacheObserver(fromIndex: number): Promise { + if (this.observePromise == undefined) { + this.observePromise = this.localCacheObserverWorker(fromIndex).finally(() => { + this.observePromise = undefined; + }); + } + + return this.observePromise; + } + + protected async localCacheObserverWorker(fromIndex: number): Promise { + logger.debug('Local cache observer worker was started', { fromIndex }) + const CACHE_OBSERVE_INTERVAL_MS = 1000; // waiting time between checks + const EXTEND_LIMIT_TO_FETCH = 10; // taking into account non-atomic nature of /info and /transactions/v2 requests + while (true) { + const localEntries = Object.entries(await this.txStore.getAll()); + let localEntriesCnt = localEntries.length; + + if (localEntries.length == 0) { + // stop observer when localCache is empty + break; + } + + // there are entries in the local cache + try { + const indexerOptimisticIndex = Number((await this.getIndexerInfo()).optimisticDeltaIndex); + const limit = (indexerOptimisticIndex - fromIndex) / OUTPLUSONE + localEntries.length + EXTEND_LIMIT_TO_FETCH; + const indexerCommitments = (await this.getIndexerTxs(fromIndex, limit)).map(tx => BigNumber(tx.slice(65, 129), 16).toString(10)); + + // find cached commitments in the indexer's response + for (const [commit, {memo, index}] of localEntries) { + if (indexerCommitments.includes(commit) || index < indexerOptimisticIndex) { + logger.info('Deleting cached entry', { commit, index }) + await this.txStore.remove(commit) + localEntriesCnt--; + } else { + //logger.info('Cached entry is still in the local cache', { commit, index }); + } + } + } catch(e) { + logger.error(`Cannot check local cache against indexer : ${(e as Error).message}`); + } + + if (localEntriesCnt > 0) { + await sleep(CACHE_OBSERVE_INTERVAL_MS); + } + } + + logger.debug('Local cache observer worker has finished', { fromIndex }) + } } diff --git a/zp-relayer/services/indexer/init.ts b/zp-relayer/services/indexer/init.ts index d582624b..2b8e710a 100644 --- a/zp-relayer/services/indexer/init.ts +++ b/zp-relayer/services/indexer/init.ts @@ -26,10 +26,10 @@ export async function init() { for (let event of batch) { if (event.values.message) { // Message event - await pool.addTxToState(event.txHash, event.values.index, event.values.message, 'optimistic') + await pool.addTxToState(event.txHash, event.values.index, event.values.message, 'optimistic', event.blockNumber) } else if (event.values.commitment) { // RootUpdated event - pool.propagateOptimisticState(event.values.index) + pool.propagateOptimisticState(event.values.index, event.blockNumber) } } }, diff --git a/zp-relayer/services/indexer/router.ts b/zp-relayer/services/indexer/router.ts index 70588f56..5ded1ef7 100644 --- a/zp-relayer/services/indexer/router.ts +++ b/zp-relayer/services/indexer/router.ts @@ -83,7 +83,7 @@ function getRoot(req: Request, res: Response, { pool }: PoolInjection) { validateBatch([[checkGetRoot, req.query]]) const index = req.query.index as unknown as number - const root = pool.state.getMerkleRootAt(index) + const root = pool.state.getMerkleRootAt(index) ?? pool.optimisticState.getMerkleRootAt(index) res.json({ root }) } diff --git a/zp-relayer/services/relayer/endpoints.ts b/zp-relayer/services/relayer/endpoints.ts index 6f6a650f..ede301d9 100644 --- a/zp-relayer/services/relayer/endpoints.ts +++ b/zp-relayer/services/relayer/endpoints.ts @@ -20,6 +20,7 @@ import { validateCountryIP, ValidationFunction, } from '../../validation/api/validation' +import BigNumber from 'bignumber.js' interface PoolInjection { pool: BasePool @@ -93,26 +94,25 @@ async function getTransactionsV2(req: Request, res: Response, { pool }: PoolInje const response = await fetch(url) if (!response.ok) { - throw new Error(`Failed to fetch transactions from indexer. Status: ${res.status}`) + throw new Error(`Failed to fetch transactions from indexer. Status: ${response.status}`) } const indexerTxs: string[] = await response.json() - const lastIndex = offset + indexerTxs.length * OUTPLUSONE - const txStore = (pool as RelayPool).txStore - const indices = await txStore.getAll().then(keys => { - return Object.entries(keys) - .map(([i, v]) => [parseInt(i), v] as [number, string]) - .filter(([i]) => offset <= i && i <= lastIndex) - .sort(([i1], [i2]) => i1 - i2) - }) - - const indexerCommitments = indexerTxs.map(tx => tx.slice(65, 129)); + const lastRequestedIndex = offset + limit * OUTPLUSONE; + const lastReceivedIndex = offset + indexerTxs.length * OUTPLUSONE; + const txStore = (pool as RelayPool).txStore; + const localEntries = await txStore.getAll().then(entries => + Object.entries(entries) + .filter(([commit, {memo, index}]) => offset <= index && index < lastRequestedIndex) + ); + + const indexerCommitments = indexerTxs.map(tx => BigNumber(tx.slice(65, 129), 16).toString(10)); const optimisticTxs: string[] = [] - for (const [index, memo] of indices) { - const commitLocal = memo.slice(0, 64) - if (indexerCommitments.includes(commitLocal)) { - logger.info('Deleting index from optimistic state', { index }) - await txStore.remove(index.toString()) + for (const [commit, {memo, index}] of localEntries) { + if (indexerCommitments.includes(commit) || index < lastReceivedIndex) { + // !!! we shouldn't modify local cache from here. Just filter entries to return correct response + //logger.info('Deleting index from optimistic state', { index }) + //await txStore.remove(commit) } else { optimisticTxs.push(txToV2Format('0', memo)) } @@ -193,8 +193,8 @@ async function relayerInfo(req: Request, res: Response, { pool }: PoolInjection) const pendingCnt = await txStore.getAll() .then(keys => { return Object.entries(keys) - .map(([i]) => parseInt(i) as number) - .filter(i => indexerMaxIdx <= i) + .map(([commit, {memo, index}]) => index) + .filter(i => i >= indexerMaxIdx) }) .then(a => a.length); diff --git a/zp-relayer/state/PoolState.ts b/zp-relayer/state/PoolState.ts index cefafb02..44f183b7 100644 --- a/zp-relayer/state/PoolState.ts +++ b/zp-relayer/state/PoolState.ts @@ -144,6 +144,16 @@ export class PoolState { } } + wipe() { + const stateNextIndex = this.tree.getNextIndex(); + this.tree.wipe(); + for (let i = 0; i < stateNextIndex; i += OUTPLUSONE) { + this.txs.delete(i) + } + this.jobIdsMapping.clear(); + this.nullifiers.clear(); + } + async getTransactions(limit: number, offset: number) { // Round offset to OUTPLUSONE offset = Math.floor(offset / OUTPLUSONE) * OUTPLUSONE diff --git a/zp-relayer/state/TxStore.ts b/zp-relayer/state/TxStore.ts index 2b5f735d..faccb10b 100644 --- a/zp-relayer/state/TxStore.ts +++ b/zp-relayer/state/TxStore.ts @@ -1,23 +1,39 @@ +import { hexToNumber, numberToHexPadded } from '@/utils/helpers'; import type { Redis } from 'ioredis' +const INDEX_BYTES = 6; + export class TxStore { constructor(public name: string, private redis: Redis) {} - async add(index: number, memo: string) { - await this.redis.hset(this.name, { [index]: memo }) + async add(commitment: string, memo: string, index: number) { + await this.redis.hset(this.name, { [commitment]: `${numberToHexPadded(index, INDEX_BYTES)}${memo}` }) } - async remove(index: string) { - await this.redis.hdel(this.name, index) + async remove(commitment: string) { + await this.redis.hdel(this.name, commitment) } - async get(index: string) { - const memo = await this.redis.hget(this.name, index) - return memo + async get(commitment: string): Promise<{memo: string, index: number} | null> { + const data = await this.redis.hget(this.name, commitment); + + return data ? { + memo: data.slice(INDEX_BYTES * 2), + index: hexToNumber(data.slice(0, INDEX_BYTES * 2)), + } : null; } - async getAll() { - return await this.redis.hgetall(this.name) + async getAll(): Promise> { + return this.redis.hgetall(this.name).then(keys => Object.fromEntries( + Object.entries(keys) + .map(([commit, data]) => + [commit, + { + memo: data.slice(INDEX_BYTES * 2), + index: hexToNumber(data.slice(0, INDEX_BYTES * 2)), + }] as [string, {memo: string, index: number}] + ) + )); } async removeAll() { diff --git a/zp-relayer/utils/helpers.ts b/zp-relayer/utils/helpers.ts index 4d6f3368..d969c7c5 100644 --- a/zp-relayer/utils/helpers.ts +++ b/zp-relayer/utils/helpers.ts @@ -8,7 +8,7 @@ import type { SnarkProof } from 'libzkbob-rs-node' import promiseRetry from 'promise-retry' import type Web3 from 'web3' import type { Contract } from 'web3-eth-contract' -import { padLeft, toBN } from 'web3-utils' +import { padLeft, toBN, numberToHex } from 'web3-utils' import { TxType } from 'zp-memo-parser' import { isContractCallError } from './web3Errors' @@ -316,3 +316,11 @@ export async function fetchJson(serverUrl: string, path: string, query: [string, return await res.json() } + +export function numberToHexPadded(num: number, numBytes: number): string { + return padLeft(numberToHex(num).slice(2), numBytes * 2); +} + +export function hexToNumber(hex: string): number { + return parseInt(hex, 16); +} \ No newline at end of file diff --git a/zp-relayer/utils/web3.ts b/zp-relayer/utils/web3.ts index 7442e124..768a98bc 100644 --- a/zp-relayer/utils/web3.ts +++ b/zp-relayer/utils/web3.ts @@ -64,7 +64,7 @@ export async function getChainId(web3: Web3) { export async function getBlockNumber(network: NetworkBackend) { try { - logger.debug('Getting block number') + //logger.debug('Getting block number') const blockNumber = await network.getBlockNumber() logger.debug('Block number obtained', { blockNumber }) return blockNumber