Skip to content

Commit

Permalink
improve Pending() of txpool to reduce the latency when miner worker c…
Browse files Browse the repository at this point in the history
…ommitting transactions (#85)

Co-authored-by: andyzhang2023 <[email protected]>
Co-authored-by: Owen <[email protected]>
  • Loading branch information
3 people authored Jun 7, 2024
1 parent feb5bfe commit bc67f16
Show file tree
Hide file tree
Showing 7 changed files with 222 additions and 41 deletions.
136 changes: 136 additions & 0 deletions core/txpool/legacypool/cache_for_miner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package legacypool

import (
"math/big"
"sort"
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/metrics"
)

var (
pendingCacheGauge = metrics.NewRegisteredGauge("txpool/legacypool/pending/cache", nil)
localCacheGauge = metrics.NewRegisteredGauge("txpool/legacypool/local/cache", nil)
)

// copy of pending transactions
type cacheForMiner struct {
txLock sync.Mutex
pending map[common.Address]map[*types.Transaction]struct{}
locals map[common.Address]bool
addrLock sync.Mutex
}

func newCacheForMiner() *cacheForMiner {
return &cacheForMiner{
pending: make(map[common.Address]map[*types.Transaction]struct{}),
locals: make(map[common.Address]bool),
}
}

func (pc *cacheForMiner) add(txs types.Transactions, signer types.Signer) {
if len(txs) == 0 {
return
}
pc.txLock.Lock()
defer pc.txLock.Unlock()
pendingCacheGauge.Inc(int64(len(txs)))
for _, tx := range txs {
addr, _ := types.Sender(signer, tx)
slots, ok := pc.pending[addr]
if !ok {
slots = make(map[*types.Transaction]struct{})
pc.pending[addr] = slots
}
slots[tx] = struct{}{}
}
}

func (pc *cacheForMiner) del(txs types.Transactions, signer types.Signer) {
if len(txs) == 0 {
return
}
pc.txLock.Lock()
defer pc.txLock.Unlock()
for _, tx := range txs {
addr, _ := types.Sender(signer, tx)
slots, ok := pc.pending[addr]
if !ok {
continue
}
pendingCacheGauge.Dec(1)
delete(slots, tx)
if len(slots) == 0 {
delete(pc.pending, addr)
}
}
}

func (pc *cacheForMiner) dump(pool txpool.LazyResolver, gasPrice, baseFee *big.Int, enforceTip bool) map[common.Address][]*txpool.LazyTransaction {
pending := make(map[common.Address]types.Transactions)
pc.txLock.Lock()
for addr, txlist := range pc.pending {
pending[addr] = make(types.Transactions, 0, len(txlist))
for tx := range txlist {
pending[addr] = append(pending[addr], tx)
}
}
pc.txLock.Unlock()
pendingLazy := make(map[common.Address][]*txpool.LazyTransaction)
for addr, txs := range pending {
// sorted by nonce
sort.Sort(types.TxByNonce(txs))
// If the miner requests tip enforcement, cap the lists now
if enforceTip && !pc.isLocal(addr) {
for i, tx := range txs {
if tx.EffectiveGasTipIntCmp(gasPrice, baseFee) < 0 {
txs = txs[:i]
break
}
}
}
if len(txs) > 0 {
lazies := make([]*txpool.LazyTransaction, len(txs))
for i, tx := range txs {
lazies[i] = &txpool.LazyTransaction{
Pool: pool,
Hash: tx.Hash(),
Tx: tx,
Time: tx.Time(),
GasFeeCap: tx.GasFeeCap(),
GasTipCap: tx.GasTipCap(),
Gas: tx.Gas(),
BlobGas: tx.BlobGas(),
}
}
pendingLazy[addr] = lazies
}
}
return pendingLazy
}

func (pc *cacheForMiner) markLocal(addr common.Address) {
pc.addrLock.Lock()
defer pc.addrLock.Unlock()
localCacheGauge.Inc(1)
pc.locals[addr] = true
}

func (pc *cacheForMiner) isLocal(addr common.Address) bool {
pc.addrLock.Lock()
defer pc.addrLock.Unlock()
return pc.locals[addr]
}

func (pc *cacheForMiner) flattenLocals() []common.Address {
pc.addrLock.Lock()
defer pc.addrLock.Unlock()
locals := make([]common.Address, 0, len(pc.locals))
for addr := range pc.locals {
locals = append(locals, addr)
}
return locals
}
82 changes: 44 additions & 38 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ var (

staledMeter = metrics.NewRegisteredMeter("txpool/staled/count", nil) // staled transactions

// duration of miner worker fetching all pending transactions
getPendingDurationTimer = metrics.NewRegisteredTimer("txpool/getpending/time", nil)
// duration of miner worker fetching all local addresses
getLocalsDurationTimer = metrics.NewRegisteredTimer("txpool/getlocals/time", nil)
// demote metrics
// demoteDuration measures how long time a demotion takes.
demoteDurationTimer = metrics.NewRegisteredTimer("txpool/demote/duration", nil)
Expand Down Expand Up @@ -247,6 +251,9 @@ type LegacyPool struct {
all *lookup // All transactions to allow lookups
priced *pricedList // All transactions sorted by price

pendingCacheDumper func(enforceTip bool) map[common.Address][]*txpool.LazyTransaction
pendingCache *cacheForMiner //pending list cache for miner

reqResetCh chan *txpoolResetRequest
reqPromoteCh chan *accountSet
queueTxEventCh chan *types.Transaction
Expand Down Expand Up @@ -286,11 +293,13 @@ func New(config Config, chain BlockChain) *LegacyPool {
reorgDoneCh: make(chan chan struct{}),
reorgShutdownCh: make(chan struct{}),
initDoneCh: make(chan struct{}),
pendingCache: newCacheForMiner(),
}
pool.locals = newAccountSet(pool.signer)
for _, addr := range config.Locals {
log.Info("Setting new local account", "address", addr)
pool.locals.add(addr)
pool.pendingCache.markLocal(addr)
}
pool.priced = newPricedList(pool.all)

Expand Down Expand Up @@ -322,6 +331,11 @@ func (pool *LegacyPool) Init(gasTip *big.Int, head *types.Header, reserve txpool
// Set the basic pool parameters
pool.gasTip.Store(gasTip)

// set dumper
pool.pendingCacheDumper = func(enforceTip bool) map[common.Address][]*txpool.LazyTransaction {
return pool.pendingCache.dump(pool, gasTip, pool.gasTip.Load(), enforceTip)
}

// Initialize the state with head block, or fallback to empty one in
// case the head state is not available(might occur when node is not
// fully synced).
Expand Down Expand Up @@ -590,48 +604,19 @@ func (pool *LegacyPool) ContentFrom(addr common.Address) ([]*types.Transaction,
// transactions and only return those whose **effective** tip is large enough in
// the next pending execution environment.
func (pool *LegacyPool) Pending(enforceTips bool) map[common.Address][]*txpool.LazyTransaction {
pool.mu.Lock()
defer pool.mu.Unlock()

pending := make(map[common.Address][]*txpool.LazyTransaction, len(pool.pending))
for addr, list := range pool.pending {
txs := list.Flatten()

// If the miner requests tip enforcement, cap the lists now
if enforceTips && !pool.locals.contains(addr) {
for i, tx := range txs {
if tx.EffectiveGasTipIntCmp(pool.gasTip.Load(), pool.priced.urgent.baseFee) < 0 {
txs = txs[:i]
break
}
}
}
if len(txs) > 0 {
lazies := make([]*txpool.LazyTransaction, len(txs))
for i := 0; i < len(txs); i++ {
lazies[i] = &txpool.LazyTransaction{
Pool: pool,
Hash: txs[i].Hash(),
Tx: txs[i],
Time: txs[i].Time(),
GasFeeCap: txs[i].GasFeeCap(),
GasTipCap: txs[i].GasTipCap(),
Gas: txs[i].Gas(),
BlobGas: txs[i].BlobGas(),
}
}
pending[addr] = lazies
}
}
return pending
defer func(t0 time.Time) {
getPendingDurationTimer.Update(time.Since(t0))
}(time.Now())
return pool.pendingCacheDumper(enforceTips)
}

// Locals retrieves the accounts currently considered local by the pool.
func (pool *LegacyPool) Locals() []common.Address {
pool.mu.Lock()
defer pool.mu.Unlock()
defer func(t0 time.Time) {
getLocalsDurationTimer.Update(time.Since(t0))
}(time.Now())

return pool.locals.flatten()
return pool.pendingCache.flattenLocals()
}

// toJournal retrieves all transactions that should be included in the journal,
Expand Down Expand Up @@ -857,7 +842,9 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
return false, txpool.ErrReplaceUnderpriced
}
// New transaction is better, replace old one
pool.pendingCache.add([]*types.Transaction{tx}, pool.signer)
if old != nil {
pool.pendingCache.del([]*types.Transaction{old}, pool.signer)
pool.all.Remove(old.Hash())
pool.priced.Removed(1)
pendingReplaceMeter.Mark(1)
Expand All @@ -881,6 +868,7 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
if local && !pool.locals.contains(from) {
log.Info("Setting new local account", "address", from)
pool.locals.add(from)
pool.pendingCache.markLocal(from)
pool.priced.Removed(pool.all.RemoteToLocals(pool.locals)) // Migrate the remotes if it's marked as local first time.
}
if isLocal {
Expand Down Expand Up @@ -989,10 +977,12 @@ func (pool *LegacyPool) promoteTx(addr common.Address, hash common.Hash, tx *typ
return false
}
// Otherwise discard any previous transaction and mark this
pool.pendingCache.add([]*types.Transaction{tx}, pool.signer)
if old != nil {
pool.all.Remove(old.Hash())
pool.priced.Removed(1)
pendingReplaceMeter.Mark(1)
pool.pendingCache.del([]*types.Transaction{old}, pool.signer)
} else {
// Nothing was replaced, bump the pending counter
pendingGauge.Inc(1)
Expand Down Expand Up @@ -1213,6 +1203,7 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo
// Update the account nonce if needed
pool.pendingNonces.setIfLower(addr, tx.Nonce())
// Reduce the pending counter
pool.pendingCache.del(append(invalids, tx), pool.signer)
pendingGauge.Dec(int64(1 + len(invalids)))
return 1 + len(invalids)
}
Expand Down Expand Up @@ -1374,15 +1365,20 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
var t0 = time.Now()
if reset != nil {
pool.demoteUnexecutables(demoteAddrs)
var pendingBaseFee = pool.priced.urgent.baseFee
demoteDurationTimer.Update(time.Since(t0))
if reset.newHead != nil {
if pool.chainconfig.IsLondon(new(big.Int).Add(reset.newHead.Number, big.NewInt(1))) {
pendingBaseFee := eip1559.CalcBaseFee(pool.chainconfig, reset.newHead, reset.newHead.Time+1)
pendingBaseFee = eip1559.CalcBaseFee(pool.chainconfig, reset.newHead, reset.newHead.Time+1)
pool.priced.SetBaseFee(pendingBaseFee)
} else {
pool.priced.Reheap()
}
}
gasTip, baseFee := pool.gasTip.Load(), pendingBaseFee
pool.pendingCacheDumper = func(enforceTip bool) map[common.Address][]*txpool.LazyTransaction {
return pool.pendingCache.dump(pool, gasTip, baseFee, enforceTip)
}
// Update all accounts to the latest known pending nonce
nonces := make(map[common.Address]uint64, len(pool.pending))
for addr, list := range pool.pending {
Expand Down Expand Up @@ -1658,6 +1654,7 @@ func (pool *LegacyPool) truncatePending() {
}
// Gradually drop transactions from offenders
offenders := []common.Address{}
var dropPendingCache []*types.Transaction
for pending > pool.config.GlobalSlots && !spammers.Empty() {
// Retrieve the next offender if not local address
offender, _ := spammers.Pop()
Expand All @@ -1684,6 +1681,7 @@ func (pool *LegacyPool) truncatePending() {
log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
}
pool.priced.Removed(len(caps))
dropPendingCache = append(dropPendingCache, caps...)
pendingGauge.Dec(int64(len(caps)))
if pool.locals.contains(offenders[i]) {
localGauge.Dec(int64(len(caps)))
Expand All @@ -1710,6 +1708,7 @@ func (pool *LegacyPool) truncatePending() {
pool.pendingNonces.setIfLower(addr, tx.Nonce())
log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
}
dropPendingCache = append(dropPendingCache, caps...)
pool.priced.Removed(len(caps))
pendingGauge.Dec(int64(len(caps)))
if pool.locals.contains(addr) {
Expand All @@ -1719,6 +1718,7 @@ func (pool *LegacyPool) truncatePending() {
}
}
}
pool.pendingCache.del(dropPendingCache, pool.signer)
pendingRateLimitMeter.Mark(int64(pendingBeforeCap - pending))
}

Expand Down Expand Up @@ -1790,6 +1790,7 @@ func (pool *LegacyPool) demoteUnexecutables(demoteAddrs []common.Address) {
if list == nil {
continue
}
var dropPendingCache []*types.Transaction
nonce := pool.currentState.GetNonce(addr)

// Drop all transactions that are deemed too old (low nonce)
Expand Down Expand Up @@ -1823,6 +1824,9 @@ func (pool *LegacyPool) demoteUnexecutables(demoteAddrs []common.Address) {
// Internal shuffle shouldn't touch the lookup set.
pool.enqueueTx(hash, tx, false, false)
}
dropPendingCache = append(dropPendingCache, olds...)
dropPendingCache = append(dropPendingCache, invalids...)
dropPendingCache = append(dropPendingCache, drops...)
pendingGauge.Dec(int64(len(olds) + len(drops) + len(invalids)))
if pool.locals.contains(addr) {
localGauge.Dec(int64(len(olds) + len(drops) + len(invalids)))
Expand All @@ -1837,6 +1841,7 @@ func (pool *LegacyPool) demoteUnexecutables(demoteAddrs []common.Address) {
// Internal shuffle shouldn't touch the lookup set.
pool.enqueueTx(hash, tx, false, false)
}
dropPendingCache = append(dropPendingCache, gapped...)
pendingGauge.Dec(int64(len(gapped)))
}
// Delete the entire pending entry if it became empty.
Expand All @@ -1846,6 +1851,7 @@ func (pool *LegacyPool) demoteUnexecutables(demoteAddrs []common.Address) {
pool.reserve(addr, false)
}
}
pool.pendingCache.del(dropPendingCache, pool.signer)
}
}

Expand Down
2 changes: 1 addition & 1 deletion core/txpool/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ type ValidationOptionsWithState struct {
// rules without duplicating code and running the risk of missed updates.
func ValidateTransactionWithState(tx *types.Transaction, signer types.Signer, opts *ValidationOptionsWithState) error {
// Ensure the transaction adheres to nonce ordering
from, err := signer.Sender(tx) // already validated (and cached), but cleaner to check
from, err := types.Sender(signer, tx) // reuse the sender cache
if err != nil {
log.Error("Transaction sender recovery failed", "err", err)
return err
Expand Down
Loading

0 comments on commit bc67f16

Please sign in to comment.