From 6f2413441eeace454c9a80e38e49b55f9ec7637b Mon Sep 17 00:00:00 2001 From: Evgen Date: Thu, 29 Aug 2024 17:40:50 +0300 Subject: [PATCH] Fix indexer sync flow --- zp-relayer/lib/network/NetworkBackend.ts | 1 + zp-relayer/lib/network/evm/EvmBackend.ts | 1 + zp-relayer/lib/network/tron/TronBackend.ts | 1 + zp-relayer/pool/BasePool.ts | 93 ++++++++++++++++------ zp-relayer/pool/DefaultPool.ts | 2 + zp-relayer/pool/FinalizerPool.ts | 2 + zp-relayer/pool/IndexerPool.ts | 2 + zp-relayer/pool/RelayPool.ts | 2 + zp-relayer/services/indexer/init.ts | 4 +- zp-relayer/state/PoolState.ts | 10 +++ 10 files changed, 90 insertions(+), 28 deletions(-) diff --git a/zp-relayer/lib/network/NetworkBackend.ts b/zp-relayer/lib/network/NetworkBackend.ts index 3656e5ca..873f6da5 100644 --- a/zp-relayer/lib/network/NetworkBackend.ts +++ b/zp-relayer/lib/network/NetworkBackend.ts @@ -11,6 +11,7 @@ export function isEthereum(n: NetworkBackend): n is NetworkBackend + blockNumber: number } export interface GetEventsConfig { diff --git a/zp-relayer/lib/network/evm/EvmBackend.ts b/zp-relayer/lib/network/evm/EvmBackend.ts index 8556b15f..49079918 100644 --- a/zp-relayer/lib/network/evm/EvmBackend.ts +++ b/zp-relayer/lib/network/evm/EvmBackend.ts @@ -62,6 +62,7 @@ export class EvmBackend implements INetworkBackend { events: events.map(e => ({ txHash: e.transactionHash, values: e.returnValues, + blockNumber: e.blockNumber, })), fromBlock, toBlock, diff --git a/zp-relayer/lib/network/tron/TronBackend.ts b/zp-relayer/lib/network/tron/TronBackend.ts index 75dd8437..a20de4ab 100644 --- a/zp-relayer/lib/network/tron/TronBackend.ts +++ b/zp-relayer/lib/network/tron/TronBackend.ts @@ -49,6 +49,7 @@ export class TronBackend implements INetworkBackend { yield events.map((e: any) => ({ txHash: e.transaction, values: e.result, + blockNumber: e.block, })) fingerprint = events[events.length - 1].fingerprint || null diff --git a/zp-relayer/pool/BasePool.ts b/zp-relayer/pool/BasePool.ts index fe7eb79b..ece62d18 100644 --- a/zp-relayer/pool/BasePool.ts +++ b/zp-relayer/pool/BasePool.ts @@ -29,6 +29,8 @@ export abstract class BasePool { public poolId: BN = toBN(0) public isInitialized = false + protected poolName(): string { return 'base-pool'; } + constructor(public network: NetworkBackend, private config: BasePoolConfig) { this.txVK = require(config.txVkPath) @@ -146,11 +148,11 @@ export abstract class BasePool { async syncState(startBlock?: number, lastBlock?: number, indexerUrl?: string) { logger.debug('Syncing state; starting from block %d', startBlock) - const localIndex = this.state.getNextIndex() - const localRoot = this.state.getMerkleRoot() + let localIndex = this.state.getNextIndex() + let localRoot = this.state.getMerkleRoot() - const contractIndex = await this.getContractIndex() - const contractRoot = await this.getContractMerkleRoot(contractIndex) + let contractIndex = await this.getContractIndex() + let contractRoot = await this.getContractMerkleRoot(contractIndex) logger.debug('State info', { localRoot, @@ -164,25 +166,34 @@ export abstract class BasePool { return } - if (indexerUrl) { - await this.syncStateFromIndexer(indexerUrl) - } else if (startBlock && lastBlock) { - await this.syncStateFromContract(startBlock, lastBlock, contractIndex, localIndex) - } else { - throw new Error('Either (startBlock, lastBlock) or indexerUrl should be provided for sync') - } + while (localIndex < contractIndex) { + if (indexerUrl) { + await this.syncStateFromIndexer(indexerUrl) + } else if (startBlock && lastBlock) { + const savedBlockNumberOfLastConfirmedTx = await this.getLastConfirmedTxBlock(); + const actualStartBlock = Math.max(startBlock, savedBlockNumberOfLastConfirmedTx); - const newLocalIndex = this.state.getNextIndex() - const newLocalRoot = this.state.getMerkleRoot() - logger.debug('Local state after update', { - newLocalRoot, - newLocalIndex, - }) - if (newLocalIndex < contractIndex) { - throw new Error('Indexer is not synchronized with the contract yet') + logger.debug('Syncing from contract; starting from block %d', actualStartBlock) + + await this.syncStateFromContract(actualStartBlock, lastBlock, contractIndex, localIndex); + } else { + throw new Error('Either (startBlock, lastBlock) or indexerUrl should be provided for sync') + } + + localIndex = this.state.getNextIndex() + localRoot = this.state.getMerkleRoot() + logger.debug('Local state after update', { + localRoot, + localIndex, + }) } - if (newLocalRoot !== contractRoot) { - throw new Error('State is corrupted, roots mismatch') + + if (localRoot !== contractRoot) { + await this.state.wipe(); + await this.optimisticState.wipe(); + await this.setLastConfirmedTxBlockForced(0); + + throw new Error('State is corrupted, roots mismatch. State was wiped') } } @@ -239,12 +250,12 @@ export abstract class BasePool { for (const e of batch.events) { // Filter pending txs in case of decentralized relay pool const state = toBN(e.values.index).lte(toBN(contractIndex)) ? 'all' : 'optimistic' - await this.addTxToState(e.txHash, e.values.index, e.values.message, state) + await this.addTxToState(e.txHash, e.values.index, e.values.message, state, e.blockNumber) } } } - async addTxToState(txHash: string, newPoolIndex: number, message: string, state: 'optimistic' | 'confirmed' | 'all') { + async addTxToState(txHash: string, newPoolIndex: number, message: string, state: 'optimistic' | 'confirmed' | 'all', blockNumber: number) { const transactSelector = '0xaf989083' const transactV2Selector = '0x5fd28f8c' @@ -301,10 +312,12 @@ export abstract class BasePool { memo = truncateMemoTxPrefix(memoRaw, txType) - // Save nullifier in confirmed state + // Save nullifier and tx's block number in confirmed state if (state !== 'optimistic') { const nullifier = parser.getField('nullifier') await this.state.nullifiers.add([hexToNumberString(nullifier)]) + + this.setLastConfirmedTxBlock(blockNumber); } } else if (input.startsWith(transactV2Selector)) { const calldata = Buffer.from(truncateHexPrefix(input), 'hex') @@ -320,10 +333,12 @@ export abstract class BasePool { memo = truncateMemoTxPrefixProverV2(memoRaw, txType) - // Save nullifier in confirmed state + // Save nullifier and tx's block number in confirmed state if (state !== 'optimistic') { const nullifier = parser.getField('nullifier') await this.state.nullifiers.add([hexToNumberString(nullifier)]) + + this.setLastConfirmedTxBlock(blockNumber); } } else { throw new Error(`Unknown transaction type: ${input}`) @@ -336,7 +351,7 @@ export abstract class BasePool { } } - propagateOptimisticState(index: number) { + propagateOptimisticState(index: number, blockNumber: number) { index = Math.floor(index / OUTPLUSONE) const opIndex = Math.floor(this.optimisticState.getNextIndex() / OUTPLUSONE) const stateIndex = Math.floor(this.state.getNextIndex() / OUTPLUSONE) @@ -352,6 +367,8 @@ export abstract class BasePool { const outCommit = hexToNumberString('0x' + tx.slice(0, 64)) this.state.updateState(i, outCommit, tx) } + + this.setLastConfirmedTxBlock(blockNumber); } verifyProof(proof: SnarkProof, inputs: Array) { @@ -425,4 +442,28 @@ export abstract class BasePool { } return limitsFetch } + + + // The following key in Redis DB will use to restore sync from the last confirmed tx + private lastConfirmedTxBlockRedisKey = `${this.poolName}:LastConfirmedTxBlock`; + + protected async setLastConfirmedTxBlock(blockNumber: number) { + const curValue = await this.getLastConfirmedTxBlock(); + if (blockNumber > curValue) { + this.setLastConfirmedTxBlockForced(blockNumber); + } + } + + private async setLastConfirmedTxBlockForced(blockNumber: number) { + redis.set(this.lastConfirmedTxBlockRedisKey, blockNumber); + } + + protected async getLastConfirmedTxBlock(): Promise { + const result = await redis.get(this.lastConfirmedTxBlockRedisKey); + try{ + return Number(result); + } catch(_) {}; + + return 0; + } } diff --git a/zp-relayer/pool/DefaultPool.ts b/zp-relayer/pool/DefaultPool.ts index 96d4c411..43b3c059 100644 --- a/zp-relayer/pool/DefaultPool.ts +++ b/zp-relayer/pool/DefaultPool.ts @@ -54,6 +54,8 @@ export class DefaultPool extends BasePool { treeProver!: IProver public permitRecover: PermitRecover | null = null + protected poolName(): string { return 'default-pool'; } + async init(startBlock: number | null = null, treeProver: IProver) { if (this.isInitialized) return diff --git a/zp-relayer/pool/FinalizerPool.ts b/zp-relayer/pool/FinalizerPool.ts index 3b392008..cbb93a9d 100644 --- a/zp-relayer/pool/FinalizerPool.ts +++ b/zp-relayer/pool/FinalizerPool.ts @@ -21,6 +21,8 @@ export class FinalizerPool extends BasePool { directDepositProver!: IProver indexerUrl!: string + protected poolName(): string { return 'finalizer-pool'; } + async init( treeProver: IProver, directDepositProver: IProver, diff --git a/zp-relayer/pool/IndexerPool.ts b/zp-relayer/pool/IndexerPool.ts index a673331e..074ce64c 100644 --- a/zp-relayer/pool/IndexerPool.ts +++ b/zp-relayer/pool/IndexerPool.ts @@ -6,6 +6,8 @@ import { type PermitRecover } from '@/utils/permit/types' export class IndexerPool extends BasePool { public permitRecover: PermitRecover | null = null + protected poolName(): string { return 'indexer-pool'; } + async init(startBlock: number | null = null, lastBlock: number | null = null) { if (this.isInitialized) return diff --git a/zp-relayer/pool/RelayPool.ts b/zp-relayer/pool/RelayPool.ts index ea251e28..f3a3a525 100644 --- a/zp-relayer/pool/RelayPool.ts +++ b/zp-relayer/pool/RelayPool.ts @@ -54,6 +54,8 @@ export class RelayPool extends BasePool { private observePromise: Promise | undefined; txStore!: TxStore + protected poolName(): string { return 'relay-pool'; } + async init(permitConfig: PermitConfig, proxyAddress: string, indexerUrl: string) { if (this.isInitialized) return diff --git a/zp-relayer/services/indexer/init.ts b/zp-relayer/services/indexer/init.ts index d582624b..2b8e710a 100644 --- a/zp-relayer/services/indexer/init.ts +++ b/zp-relayer/services/indexer/init.ts @@ -26,10 +26,10 @@ export async function init() { for (let event of batch) { if (event.values.message) { // Message event - await pool.addTxToState(event.txHash, event.values.index, event.values.message, 'optimistic') + await pool.addTxToState(event.txHash, event.values.index, event.values.message, 'optimistic', event.blockNumber) } else if (event.values.commitment) { // RootUpdated event - pool.propagateOptimisticState(event.values.index) + pool.propagateOptimisticState(event.values.index, event.blockNumber) } } }, diff --git a/zp-relayer/state/PoolState.ts b/zp-relayer/state/PoolState.ts index cefafb02..44f183b7 100644 --- a/zp-relayer/state/PoolState.ts +++ b/zp-relayer/state/PoolState.ts @@ -144,6 +144,16 @@ export class PoolState { } } + wipe() { + const stateNextIndex = this.tree.getNextIndex(); + this.tree.wipe(); + for (let i = 0; i < stateNextIndex; i += OUTPLUSONE) { + this.txs.delete(i) + } + this.jobIdsMapping.clear(); + this.nullifiers.clear(); + } + async getTransactions(limit: number, offset: number) { // Round offset to OUTPLUSONE offset = Math.floor(offset / OUTPLUSONE) * OUTPLUSONE