Skip to content

Commit

Permalink
Fix/pending txs (#226)
Browse files Browse the repository at this point in the history
* Trying to improve pending txs registering

* Set proper index for local transactions

* Fix indexer sync flow

* Remove unneeded log line

* Storing local cache by commit (instead of indexes)

* Adding index to local cache

* Bugfixing

* Filtering pending txs

* Removing tx from the cache on fail

* Fix issues
  • Loading branch information
EvgenKor authored Sep 9, 2024
1 parent db95af3 commit 8cde68d
Show file tree
Hide file tree
Showing 15 changed files with 245 additions and 72 deletions.
1 change: 1 addition & 0 deletions zp-relayer/lib/network/NetworkBackend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export function isEthereum(n: NetworkBackend<Network>): n is NetworkBackend<Netw
export interface Event {
txHash: string
values: Record<string, any>
blockNumber: number
}

export interface GetEventsConfig<N extends Network> {
Expand Down
1 change: 1 addition & 0 deletions zp-relayer/lib/network/evm/EvmBackend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ export class EvmBackend implements INetworkBackend<Network.Ethereum> {
events: events.map(e => ({
txHash: e.transactionHash,
values: e.returnValues,
blockNumber: e.blockNumber,
})),
fromBlock,
toBlock,
Expand Down
1 change: 1 addition & 0 deletions zp-relayer/lib/network/tron/TronBackend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ export class TronBackend implements INetworkBackend<Network.Tron> {
yield events.map((e: any) => ({
txHash: e.transaction,
values: e.result,
blockNumber: e.block,
}))

fingerprint = events[events.length - 1].fingerprint || null
Expand Down
93 changes: 67 additions & 26 deletions zp-relayer/pool/BasePool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ export abstract class BasePool<N extends Network = Network> {
public poolId: BN = toBN(0)
public isInitialized = false

protected poolName(): string { return 'base-pool'; }

constructor(public network: NetworkBackend<N>, private config: BasePoolConfig) {
this.txVK = require(config.txVkPath)

Expand Down Expand Up @@ -146,11 +148,11 @@ export abstract class BasePool<N extends Network = Network> {
async syncState(startBlock?: number, lastBlock?: number, indexerUrl?: string) {
logger.debug('Syncing state; starting from block %d', startBlock)

const localIndex = this.state.getNextIndex()
const localRoot = this.state.getMerkleRoot()
let localIndex = this.state.getNextIndex()
let localRoot = this.state.getMerkleRoot()

const contractIndex = await this.getContractIndex()
const contractRoot = await this.getContractMerkleRoot(contractIndex)
let contractIndex = await this.getContractIndex()
let contractRoot = await this.getContractMerkleRoot(contractIndex)

logger.debug('State info', {
localRoot,
Expand All @@ -164,25 +166,34 @@ export abstract class BasePool<N extends Network = Network> {
return
}

if (indexerUrl) {
await this.syncStateFromIndexer(indexerUrl)
} else if (startBlock && lastBlock) {
await this.syncStateFromContract(startBlock, lastBlock, contractIndex, localIndex)
} else {
throw new Error('Either (startBlock, lastBlock) or indexerUrl should be provided for sync')
}
while (localIndex < contractIndex) {
if (indexerUrl) {
await this.syncStateFromIndexer(indexerUrl)
} else if (startBlock && lastBlock) {
const savedBlockNumberOfLastConfirmedTx = await this.getLastConfirmedTxBlock();
const actualStartBlock = Math.max(startBlock, savedBlockNumberOfLastConfirmedTx);

const newLocalIndex = this.state.getNextIndex()
const newLocalRoot = this.state.getMerkleRoot()
logger.debug('Local state after update', {
newLocalRoot,
newLocalIndex,
})
if (newLocalIndex < contractIndex) {
throw new Error('Indexer is not synchronized with the contract yet')
logger.debug('Syncing from contract; starting from block %d', actualStartBlock)

await this.syncStateFromContract(actualStartBlock, lastBlock, contractIndex, localIndex);
} else {
throw new Error('Either (startBlock, lastBlock) or indexerUrl should be provided for sync')
}

localIndex = this.state.getNextIndex()
localRoot = this.state.getMerkleRoot()
logger.debug('Local state after update', {
localRoot,
localIndex,
})
}
if (newLocalRoot !== contractRoot) {
throw new Error('State is corrupted, roots mismatch')

if (localRoot !== contractRoot) {
await this.state.wipe();
await this.optimisticState.wipe();
await this.setLastConfirmedTxBlockForced(0);

throw new Error('State is corrupted, roots mismatch. State was wiped')
}
}

Expand Down Expand Up @@ -239,12 +250,12 @@ export abstract class BasePool<N extends Network = Network> {
for (const e of batch.events) {
// Filter pending txs in case of decentralized relay pool
const state = toBN(e.values.index).lte(toBN(contractIndex)) ? 'all' : 'optimistic'
await this.addTxToState(e.txHash, e.values.index, e.values.message, state)
await this.addTxToState(e.txHash, e.values.index, e.values.message, state, e.blockNumber)
}
}
}

async addTxToState(txHash: string, newPoolIndex: number, message: string, state: 'optimistic' | 'confirmed' | 'all') {
async addTxToState(txHash: string, newPoolIndex: number, message: string, state: 'optimistic' | 'confirmed' | 'all', blockNumber: number) {
const transactSelector = '0xaf989083'
const transactV2Selector = '0x5fd28f8c'

Expand Down Expand Up @@ -301,10 +312,12 @@ export abstract class BasePool<N extends Network = Network> {

memo = truncateMemoTxPrefix(memoRaw, txType)

// Save nullifier in confirmed state
// Save nullifier and tx's block number in confirmed state
if (state !== 'optimistic') {
const nullifier = parser.getField('nullifier')
await this.state.nullifiers.add([hexToNumberString(nullifier)])

this.setLastConfirmedTxBlock(blockNumber);
}
} else if (input.startsWith(transactV2Selector)) {
const calldata = Buffer.from(truncateHexPrefix(input), 'hex')
Expand All @@ -320,10 +333,12 @@ export abstract class BasePool<N extends Network = Network> {

memo = truncateMemoTxPrefixProverV2(memoRaw, txType)

// Save nullifier in confirmed state
// Save nullifier and tx's block number in confirmed state
if (state !== 'optimistic') {
const nullifier = parser.getField('nullifier')
await this.state.nullifiers.add([hexToNumberString(nullifier)])

this.setLastConfirmedTxBlock(blockNumber);
}
} else {
throw new Error(`Unknown transaction type: ${input}`)
Expand All @@ -336,7 +351,7 @@ export abstract class BasePool<N extends Network = Network> {
}
}

propagateOptimisticState(index: number) {
propagateOptimisticState(index: number, blockNumber: number) {
index = Math.floor(index / OUTPLUSONE)
const opIndex = Math.floor(this.optimisticState.getNextIndex() / OUTPLUSONE)
const stateIndex = Math.floor(this.state.getNextIndex() / OUTPLUSONE)
Expand All @@ -352,6 +367,8 @@ export abstract class BasePool<N extends Network = Network> {
const outCommit = hexToNumberString('0x' + tx.slice(0, 64))
this.state.updateState(i, outCommit, tx)
}

this.setLastConfirmedTxBlock(blockNumber);
}

verifyProof(proof: SnarkProof, inputs: Array<string>) {
Expand Down Expand Up @@ -425,4 +442,28 @@ export abstract class BasePool<N extends Network = Network> {
}
return limitsFetch
}


// The following key in Redis DB will use to restore sync from the last confirmed tx
private lastConfirmedTxBlockRedisKey = `${this.poolName}:LastConfirmedTxBlock`;

protected async setLastConfirmedTxBlock(blockNumber: number) {
const curValue = await this.getLastConfirmedTxBlock();
if (blockNumber > curValue) {
this.setLastConfirmedTxBlockForced(blockNumber);
}
}

private async setLastConfirmedTxBlockForced(blockNumber: number) {
redis.set(this.lastConfirmedTxBlockRedisKey, blockNumber);
}

protected async getLastConfirmedTxBlock(): Promise<number> {
const result = await redis.get(this.lastConfirmedTxBlockRedisKey);
try{
return Number(result);
} catch(_) {};

return 0;
}
}
2 changes: 2 additions & 0 deletions zp-relayer/pool/DefaultPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ export class DefaultPool extends BasePool {
treeProver!: IProver<Circuit.Tree>
public permitRecover: PermitRecover | null = null

protected poolName(): string { return 'default-pool'; }

async init(startBlock: number | null = null, treeProver: IProver<Circuit.Tree>) {
if (this.isInitialized) return

Expand Down
2 changes: 2 additions & 0 deletions zp-relayer/pool/FinalizerPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ export class FinalizerPool extends BasePool {
directDepositProver!: IProver<Circuit.DirectDeposit>
indexerUrl!: string

protected poolName(): string { return 'finalizer-pool'; }

async init(
treeProver: IProver<Circuit.Tree>,
directDepositProver: IProver<Circuit.DirectDeposit>,
Expand Down
2 changes: 2 additions & 0 deletions zp-relayer/pool/IndexerPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import { type PermitRecover } from '@/utils/permit/types'
export class IndexerPool extends BasePool {
public permitRecover: PermitRecover | null = null

protected poolName(): string { return 'indexer-pool'; }

async init(startBlock: number | null = null, lastBlock: number | null = null) {
if (this.isInitialized) return

Expand Down
Loading

0 comments on commit 8cde68d

Please sign in to comment.