Skip to content

Commit

Permalink
feat(txpool): improve demotion unexecutable transactions (#84)
Browse files Browse the repository at this point in the history
Co-authored-by: andyzhang2023 <[email protected]>
Co-authored-by: Owen <[email protected]>
  • Loading branch information
3 people authored May 30, 2024
1 parent 3949d08 commit d0fbab5
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 8 deletions.
69 changes: 62 additions & 7 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ var (
reheapTimer = metrics.NewRegisteredTimer("txpool/reheap", nil)

staledMeter = metrics.NewRegisteredMeter("txpool/staled/count", nil) // staled transactions

// demote metrics
// demoteDuration measures how long time a demotion takes.
demoteDurationTimer = metrics.NewRegisteredTimer("txpool/demote/duration", nil)
demoteTxMeter = metrics.NewRegisteredMeter("txpool/demote/tx", nil)
resetDepthMeter = metrics.NewRegisteredMeter("txpool/reset/depth", nil) //reorg depth of blocks which causes demote
)

// BlockChain defines the minimal set of methods needed to back a tx pool with
Expand Down Expand Up @@ -1334,7 +1340,7 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
}(time.Now())
defer close(done)

var promoteAddrs []common.Address
var promoteAddrs, demoteAddrs []common.Address
if dirtyAccounts != nil && reset == nil {
// Only dirty accounts need to be promoted, unless we're resetting.
// For resets, all addresses in the tx queue will be promoted and
Expand All @@ -1344,7 +1350,7 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
pool.mu.Lock()
if reset != nil {
// Reset from the old head to the new, rescheduling any reorged transactions
pool.reset(reset.oldHead, reset.newHead)
demoteAddrs = pool.reset(reset.oldHead, reset.newHead)

// Nonces were reset, discard any events that became stale
for addr := range events {
Expand All @@ -1365,8 +1371,10 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
// If a new block appeared, validate the pool of pending transactions. This will
// remove any transaction that has been included in the block or was invalidated
// because of another transaction (e.g. higher gas price).
var t0 = time.Now()
if reset != nil {
pool.demoteUnexecutables()
pool.demoteUnexecutables(demoteAddrs)
demoteDurationTimer.Update(time.Since(t0))
if reset.newHead != nil {
if pool.chainconfig.IsLondon(new(big.Int).Add(reset.newHead.Number, big.NewInt(1))) {
pendingBaseFee := eip1559.CalcBaseFee(pool.chainconfig, reset.newHead, reset.newHead.Time+1)
Expand Down Expand Up @@ -1410,16 +1418,41 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,

// reset retrieves the current state of the blockchain and ensures the content
// of the transaction pool is valid with regard to the chain state.
func (pool *LegacyPool) reset(oldHead, newHead *types.Header) {
func (pool *LegacyPool) reset(oldHead, newHead *types.Header) (demoteAddrs []common.Address) {
// If we're reorging an old state, reinject all dropped transactions
var reinject types.Transactions
// collect demote addresses
var collectAddr = func(txs types.Transactions) {
addrs := make(map[common.Address]struct{})
for _, tx := range txs {
if !pool.Filter(tx) {
continue
}
// it is heavy to get sender from tx, so we try to get it from the pool
if oldtx := pool.all.Get(tx.Hash()); oldtx != nil {
tx = oldtx
}
addr, err := types.Sender(pool.signer, tx)
//it might come from other pool, by other signer
if err != nil {
continue
}
addrs[addr] = struct{}{}
}
demoteAddrs = make([]common.Address, 0, len(addrs))
for addr := range addrs {
demoteAddrs = append(demoteAddrs, addr)
}
}

var depth uint64 = 1

if oldHead != nil && oldHead.Hash() != newHead.ParentHash {
// If the reorg is too deep, avoid doing it (will happen during fast sync)
oldNum := oldHead.Number.Uint64()
newNum := newHead.Number.Uint64()

if depth := uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 {
if depth = uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 {
log.Debug("Skipping deep transaction reorg", "depth", depth)
} else {
// Reorg seems shallow enough to pull in all transactions into memory
Expand Down Expand Up @@ -1485,9 +1518,18 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) {
}
}
reinject = lost

collectAddr(append(discarded, included...))
}
}
} else if newHead != nil && oldHead.Hash() == newHead.ParentHash {
block := pool.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64())
if block != nil {
collectAddr(block.Transactions())
}
}
resetDepthMeter.Mark(int64(depth))
log.Info("reset block depth", "depth", depth)
// Initialize the internal state to the current head
if newHead == nil {
newHead = pool.chain.CurrentBlock() // Special case during testing
Expand All @@ -1511,6 +1553,7 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) {
log.Debug("Reinjecting stale transactions", "count", len(reinject))
core.SenderCacher.Recover(pool.signer, reinject)
pool.addTxsLocked(reinject, false)
return
}

// promoteExecutables moves transactions that have become processable from the
Expand Down Expand Up @@ -1731,10 +1774,22 @@ func (pool *LegacyPool) truncateQueue() {
// Note: transactions are not marked as removed in the priced list because re-heaping
// is always explicitly triggered by SetBaseFee and it would be unnecessary and wasteful
// to trigger a re-heap is this function
func (pool *LegacyPool) demoteUnexecutables() {
func (pool *LegacyPool) demoteUnexecutables(demoteAddrs []common.Address) {
if demoteAddrs == nil {
demoteAddrs = make([]common.Address, 0, len(pool.pending))
for addr := range pool.pending {
demoteAddrs = append(demoteAddrs, addr)
}
}
demoteTxMeter.Mark(int64(len(demoteAddrs)))

// Iterate over all accounts and demote any non-executable transactions
gasLimit := txpool.EffectiveGasLimit(pool.chainconfig, pool.currentHead.Load().GasLimit)
for addr, list := range pool.pending {
for _, addr := range demoteAddrs {
list := pool.pending[addr]
if list == nil {
continue
}
nonce := pool.currentState.GetNonce(addr)

// Drop all transactions that are deemed too old (low nonce)
Expand Down
2 changes: 1 addition & 1 deletion core/txpool/legacypool/legacypool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2567,7 +2567,7 @@ func benchmarkPendingDemotion(b *testing.B, size int) {
// Benchmark the speed of pool validation
b.ResetTimer()
for i := 0; i < b.N; i++ {
pool.demoteUnexecutables()
pool.demoteUnexecutables(nil)
}
}

Expand Down

0 comments on commit d0fbab5

Please sign in to comment.