Skip to content

Commit

Permalink
txpool pending cache dump when needed by miner
Browse files Browse the repository at this point in the history
  • Loading branch information
andyzhang2023 committed May 23, 2024
1 parent d26ba44 commit ed0ffc0
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 38 deletions.
33 changes: 8 additions & 25 deletions core/txpool/legacypool/cache_for_miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"math/big"
"sort"
"sync"
"sync/atomic"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/txpool"
Expand All @@ -19,23 +18,17 @@ var (

// copy of pending transactions
type cacheForMiner struct {
txLock sync.Mutex
pending map[common.Address]map[*types.Transaction]struct{}
locals map[common.Address]bool
pendingWithoutTips atomic.Value
pendingWithTips atomic.Value
addrLock sync.Mutex
txLock sync.Mutex
pending map[common.Address]map[*types.Transaction]struct{}
locals map[common.Address]bool
addrLock sync.Mutex
}

func newCacheForMiner() *cacheForMiner {
cm := &cacheForMiner{
return &cacheForMiner{
pending: make(map[common.Address]map[*types.Transaction]struct{}),
locals: make(map[common.Address]bool),
}
lazyPendingWithTips, lazyPendingWithoutTips := make(map[common.Address][]*txpool.LazyTransaction), make(map[common.Address][]*txpool.LazyTransaction)
cm.pendingWithTips.Store(lazyPendingWithTips)
cm.pendingWithoutTips.Store(lazyPendingWithoutTips)
return cm
}

func (pc *cacheForMiner) add(txs types.Transactions, signer types.Signer) {
Expand Down Expand Up @@ -76,7 +69,7 @@ func (pc *cacheForMiner) del(txs types.Transactions, signer types.Signer) {
}
}

func (pc *cacheForMiner) dump(pool txpool.LazyResolver, gasPrice, baseFee *big.Int) {
func (pc *cacheForMiner) dump(pool txpool.LazyResolver, gasPrice, baseFee *big.Int) (map[common.Address][]*txpool.LazyTransaction, map[common.Address][]*txpool.LazyTransaction) {
pending := make(map[common.Address]types.Transactions)
pc.txLock.Lock()
for addr, txlist := range pc.pending {
Expand Down Expand Up @@ -107,6 +100,7 @@ func (pc *cacheForMiner) dump(pool txpool.LazyResolver, gasPrice, baseFee *big.I
}

// convert into LazyTransaction
//lazyPendingWithTips, lazyPendingWithoutTips := make(map[common.Address][]*txpool.LazyTransaction), make(map[common.Address][]*txpool.LazyTransaction)
lazyPendingWithTips, lazyPendingWithoutTips := make(map[common.Address][]*txpool.LazyTransaction), make(map[common.Address][]*txpool.LazyTransaction)
for addr, txs := range pending {
for i, tx := range txs {
Expand All @@ -126,18 +120,7 @@ func (pc *cacheForMiner) dump(pool txpool.LazyResolver, gasPrice, baseFee *big.I
}
}
}

// store pending
pc.pendingWithTips.Store(lazyPendingWithTips)
pc.pendingWithoutTips.Store(lazyPendingWithoutTips)
}

func (pc *cacheForMiner) pendingTxs(enforceTips bool) map[common.Address][]*txpool.LazyTransaction {
if enforceTips {
return pc.pendingWithTips.Load().(map[common.Address][]*txpool.LazyTransaction)
} else {
return pc.pendingWithoutTips.Load().(map[common.Address][]*txpool.LazyTransaction)
}
return lazyPendingWithTips, lazyPendingWithoutTips
}

func (pc *cacheForMiner) markLocal(addr common.Address) {
Expand Down
30 changes: 17 additions & 13 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,6 @@ const (

// txReannoMaxNum is the maximum number of transactions a reannounce action can include.
txReannoMaxNum = 1024

// minPendingCacheDuration is the minimum duration between two pending cache updates.
minPendingCacheDuration = 250 * time.Millisecond
)

var (
Expand Down Expand Up @@ -253,7 +250,8 @@ type LegacyPool struct {
all *lookup // All transactions to allow lookups
priced *pricedList // All transactions sorted by price

pendingCache *cacheForMiner //pending list cache for miner
pendingCacheDumper func() (map[common.Address][]*txpool.LazyTransaction, map[common.Address][]*txpool.LazyTransaction)
pendingCache *cacheForMiner //pending list cache for miner

reqResetCh chan *txpoolResetRequest
reqPromoteCh chan *accountSet
Expand All @@ -266,8 +264,6 @@ type LegacyPool struct {
changesSinceReorg int // A counter for how many drops we've performed in-between reorg.

l1CostFn txpool.L1CostFunc // To apply L1 costs as rollup, optional field, may be nil.

lastPendingCacheTime time.Time // Last time pending cache was updated
}

type txpoolResetRequest struct {
Expand Down Expand Up @@ -334,6 +330,11 @@ func (pool *LegacyPool) Init(gasTip *big.Int, head *types.Header, reserve txpool
// Set the basic pool parameters
pool.gasTip.Store(gasTip)

// set dumper
pool.pendingCacheDumper = func() (map[common.Address][]*txpool.LazyTransaction, map[common.Address][]*txpool.LazyTransaction) {
return pool.pendingCache.dump(pool, gasTip, pool.gasTip.Load())
}

// Initialize the state with head block, or fallback to empty one in
// case the head state is not available(might occur when node is not
// fully synced).
Expand Down Expand Up @@ -605,7 +606,12 @@ func (pool *LegacyPool) Pending(enforceTips bool) map[common.Address][]*txpool.L
defer func(t0 time.Time) {
getPendingDurationTimer.Update(time.Since(t0))
}(time.Now())
return pool.pendingCache.pendingTxs(enforceTips)
txsWithTips, txsWithoutTips := pool.pendingCacheDumper()
if enforceTips {
return txsWithTips
} else {
return txsWithoutTips
}
}

// Locals retrieves the accounts currently considered local by the pool.
Expand Down Expand Up @@ -1372,7 +1378,10 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
pool.priced.Reheap()
}
}
go pool.pendingCache.dump(pool, pool.gasTip.Load(), pendingBaseFee)
gasTip, baseFee := pool.gasTip.Load(), pendingBaseFee
pool.pendingCacheDumper = func() (map[common.Address][]*txpool.LazyTransaction, map[common.Address][]*txpool.LazyTransaction) {
return pool.pendingCache.dump(pool, gasTip, baseFee)
}
// Update all accounts to the latest known pending nonce
nonces := make(map[common.Address]uint64, len(pool.pending))
for addr, list := range pool.pending {
Expand All @@ -1381,11 +1390,6 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
}
pool.pendingNonces.setAll(nonces)
}
// keep updating pending cache in a minimal frequency
if reset == nil && time.Since(pool.lastPendingCacheTime) > minPendingCacheDuration {
pool.lastPendingCacheTime = time.Now()
go pool.pendingCache.dump(pool, pool.gasTip.Load(), pool.priced.urgent.baseFee)
}
// Ensure pool.queue and pool.pending sizes stay within the configured limits.
pool.truncatePending()
pool.truncateQueue()
Expand Down

0 comments on commit ed0ffc0

Please sign in to comment.