Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve Pending() of txpool to reduce the latency when miner worker committing transactions #85

Merged
136 changes: 136 additions & 0 deletions core/txpool/legacypool/cache_for_miner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package legacypool

import (
"math/big"
"sort"
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/metrics"
)

var (
pendingCacheGauge = metrics.NewRegisteredGauge("txpool/legacypool/pending/cache", nil)
localCacheGauge = metrics.NewRegisteredGauge("txpool/legacypool/local/cache", nil)
)

// copy of pending transactions
type cacheForMiner struct {
txLock sync.Mutex
pending map[common.Address]map[*types.Transaction]struct{}
locals map[common.Address]bool
addrLock sync.Mutex
}

func newCacheForMiner() *cacheForMiner {
return &cacheForMiner{
pending: make(map[common.Address]map[*types.Transaction]struct{}),
locals: make(map[common.Address]bool),
}
}

func (pc *cacheForMiner) add(txs types.Transactions, signer types.Signer) {
andyzhang2023 marked this conversation as resolved.
Show resolved Hide resolved
if len(txs) == 0 {
return
}
pc.txLock.Lock()
defer pc.txLock.Unlock()
pendingCacheGauge.Inc(int64(len(txs)))
for _, tx := range txs {
addr, _ := types.Sender(signer, tx)
slots, ok := pc.pending[addr]
if !ok {
slots = make(map[*types.Transaction]struct{})
pc.pending[addr] = slots
}
slots[tx] = struct{}{}
}
}

func (pc *cacheForMiner) del(txs types.Transactions, signer types.Signer) {
if len(txs) == 0 {
return
}
pc.txLock.Lock()
defer pc.txLock.Unlock()
for _, tx := range txs {
addr, _ := types.Sender(signer, tx)
slots, ok := pc.pending[addr]
if !ok {
continue
}
pendingCacheGauge.Dec(1)
delete(slots, tx)
if len(slots) == 0 {
delete(pc.pending, addr)
}
}
}

func (pc *cacheForMiner) dump(pool txpool.LazyResolver, gasPrice, baseFee *big.Int, enforceTip bool) map[common.Address][]*txpool.LazyTransaction {
pending := make(map[common.Address]types.Transactions)
pc.txLock.Lock()
for addr, txlist := range pc.pending {
pending[addr] = make(types.Transactions, 0, len(txlist))
for tx := range txlist {
pending[addr] = append(pending[addr], tx)
}
}
pc.txLock.Unlock()
pendingLazy := make(map[common.Address][]*txpool.LazyTransaction)
for addr, txs := range pending {
// sorted by nonce
sort.Sort(types.TxByNonce(txs))
// If the miner requests tip enforcement, cap the lists now
if enforceTip && !pc.isLocal(addr) {
for i, tx := range txs {
if tx.EffectiveGasTipIntCmp(gasPrice, baseFee) < 0 {
txs = txs[:i]
break
}
}
}
if len(txs) > 0 {
lazies := make([]*txpool.LazyTransaction, len(txs))
for i, tx := range txs {
lazies[i] = &txpool.LazyTransaction{
Pool: pool,
Hash: tx.Hash(),
Tx: tx,
Time: tx.Time(),
GasFeeCap: tx.GasFeeCap(),
GasTipCap: tx.GasTipCap(),
Gas: tx.Gas(),
BlobGas: tx.BlobGas(),
}
}
pendingLazy[addr] = lazies
}
}
return pendingLazy
}

func (pc *cacheForMiner) markLocal(addr common.Address) {
pc.addrLock.Lock()
defer pc.addrLock.Unlock()
localCacheGauge.Inc(1)
pc.locals[addr] = true
}

func (pc *cacheForMiner) isLocal(addr common.Address) bool {
pc.addrLock.Lock()
defer pc.addrLock.Unlock()
return pc.locals[addr]
}

func (pc *cacheForMiner) flattenLocals() []common.Address {
pc.addrLock.Lock()
defer pc.addrLock.Unlock()
locals := make([]common.Address, 0, len(pc.locals))
for addr := range pc.locals {
locals = append(locals, addr)
}
return locals
}
82 changes: 44 additions & 38 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ var (

staledMeter = metrics.NewRegisteredMeter("txpool/staled/count", nil) // staled transactions

// duration of miner worker fetching all pending transactions
getPendingDurationTimer = metrics.NewRegisteredTimer("txpool/getpending/time", nil)
// duration of miner worker fetching all local addresses
getLocalsDurationTimer = metrics.NewRegisteredTimer("txpool/getlocals/time", nil)
// demote metrics
// demoteDuration measures how long time a demotion takes.
demoteDurationTimer = metrics.NewRegisteredTimer("txpool/demote/duration", nil)
Expand Down Expand Up @@ -247,6 +251,9 @@ type LegacyPool struct {
all *lookup // All transactions to allow lookups
priced *pricedList // All transactions sorted by price

pendingCacheDumper func(enforceTip bool) map[common.Address][]*txpool.LazyTransaction
pendingCache *cacheForMiner //pending list cache for miner

reqResetCh chan *txpoolResetRequest
reqPromoteCh chan *accountSet
queueTxEventCh chan *types.Transaction
Expand Down Expand Up @@ -286,11 +293,13 @@ func New(config Config, chain BlockChain) *LegacyPool {
reorgDoneCh: make(chan chan struct{}),
reorgShutdownCh: make(chan struct{}),
initDoneCh: make(chan struct{}),
pendingCache: newCacheForMiner(),
}
pool.locals = newAccountSet(pool.signer)
for _, addr := range config.Locals {
log.Info("Setting new local account", "address", addr)
pool.locals.add(addr)
pool.pendingCache.markLocal(addr)
}
pool.priced = newPricedList(pool.all)

Expand Down Expand Up @@ -322,6 +331,11 @@ func (pool *LegacyPool) Init(gasTip *big.Int, head *types.Header, reserve txpool
// Set the basic pool parameters
pool.gasTip.Store(gasTip)

// set dumper
pool.pendingCacheDumper = func(enforceTip bool) map[common.Address][]*txpool.LazyTransaction {
return pool.pendingCache.dump(pool, gasTip, pool.gasTip.Load(), enforceTip)
}

// Initialize the state with head block, or fallback to empty one in
// case the head state is not available(might occur when node is not
// fully synced).
Expand Down Expand Up @@ -590,48 +604,19 @@ func (pool *LegacyPool) ContentFrom(addr common.Address) ([]*types.Transaction,
// transactions and only return those whose **effective** tip is large enough in
// the next pending execution environment.
func (pool *LegacyPool) Pending(enforceTips bool) map[common.Address][]*txpool.LazyTransaction {
pool.mu.Lock()
defer pool.mu.Unlock()

pending := make(map[common.Address][]*txpool.LazyTransaction, len(pool.pending))
for addr, list := range pool.pending {
txs := list.Flatten()

// If the miner requests tip enforcement, cap the lists now
if enforceTips && !pool.locals.contains(addr) {
for i, tx := range txs {
if tx.EffectiveGasTipIntCmp(pool.gasTip.Load(), pool.priced.urgent.baseFee) < 0 {
txs = txs[:i]
break
}
}
}
if len(txs) > 0 {
lazies := make([]*txpool.LazyTransaction, len(txs))
for i := 0; i < len(txs); i++ {
lazies[i] = &txpool.LazyTransaction{
Pool: pool,
Hash: txs[i].Hash(),
Tx: txs[i],
Time: txs[i].Time(),
GasFeeCap: txs[i].GasFeeCap(),
GasTipCap: txs[i].GasTipCap(),
Gas: txs[i].Gas(),
BlobGas: txs[i].BlobGas(),
}
}
pending[addr] = lazies
}
}
return pending
defer func(t0 time.Time) {
getPendingDurationTimer.Update(time.Since(t0))
}(time.Now())
return pool.pendingCacheDumper(enforceTips)
}

// Locals retrieves the accounts currently considered local by the pool.
func (pool *LegacyPool) Locals() []common.Address {
pool.mu.Lock()
defer pool.mu.Unlock()
defer func(t0 time.Time) {
getLocalsDurationTimer.Update(time.Since(t0))
}(time.Now())

return pool.locals.flatten()
return pool.pendingCache.flattenLocals()
}

// toJournal retrieves all transactions that should be included in the journal,
Expand Down Expand Up @@ -857,7 +842,9 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
return false, txpool.ErrReplaceUnderpriced
}
// New transaction is better, replace old one
pool.pendingCache.add([]*types.Transaction{tx}, pool.signer)
if old != nil {
pool.pendingCache.del([]*types.Transaction{old}, pool.signer)
pool.all.Remove(old.Hash())
pool.priced.Removed(1)
pendingReplaceMeter.Mark(1)
Expand All @@ -881,6 +868,7 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
if local && !pool.locals.contains(from) {
log.Info("Setting new local account", "address", from)
pool.locals.add(from)
pool.pendingCache.markLocal(from)
pool.priced.Removed(pool.all.RemoteToLocals(pool.locals)) // Migrate the remotes if it's marked as local first time.
}
if isLocal {
Expand Down Expand Up @@ -989,10 +977,12 @@ func (pool *LegacyPool) promoteTx(addr common.Address, hash common.Hash, tx *typ
return false
}
// Otherwise discard any previous transaction and mark this
pool.pendingCache.add([]*types.Transaction{tx}, pool.signer)
if old != nil {
pool.all.Remove(old.Hash())
pool.priced.Removed(1)
pendingReplaceMeter.Mark(1)
pool.pendingCache.del([]*types.Transaction{old}, pool.signer)
} else {
// Nothing was replaced, bump the pending counter
pendingGauge.Inc(1)
Expand Down Expand Up @@ -1213,6 +1203,7 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo
// Update the account nonce if needed
pool.pendingNonces.setIfLower(addr, tx.Nonce())
// Reduce the pending counter
pool.pendingCache.del(append(invalids, tx), pool.signer)
pendingGauge.Dec(int64(1 + len(invalids)))
return 1 + len(invalids)
}
Expand Down Expand Up @@ -1374,15 +1365,20 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
var t0 = time.Now()
if reset != nil {
pool.demoteUnexecutables(demoteAddrs)
var pendingBaseFee = pool.priced.urgent.baseFee
demoteDurationTimer.Update(time.Since(t0))
if reset.newHead != nil {
if pool.chainconfig.IsLondon(new(big.Int).Add(reset.newHead.Number, big.NewInt(1))) {
pendingBaseFee := eip1559.CalcBaseFee(pool.chainconfig, reset.newHead, reset.newHead.Time+1)
pendingBaseFee = eip1559.CalcBaseFee(pool.chainconfig, reset.newHead, reset.newHead.Time+1)
pool.priced.SetBaseFee(pendingBaseFee)
} else {
pool.priced.Reheap()
}
}
gasTip, baseFee := pool.gasTip.Load(), pendingBaseFee
pool.pendingCacheDumper = func(enforceTip bool) map[common.Address][]*txpool.LazyTransaction {
return pool.pendingCache.dump(pool, gasTip, baseFee, enforceTip)
}
// Update all accounts to the latest known pending nonce
nonces := make(map[common.Address]uint64, len(pool.pending))
for addr, list := range pool.pending {
Expand Down Expand Up @@ -1658,6 +1654,7 @@ func (pool *LegacyPool) truncatePending() {
}
// Gradually drop transactions from offenders
offenders := []common.Address{}
var dropPendingCache []*types.Transaction
for pending > pool.config.GlobalSlots && !spammers.Empty() {
// Retrieve the next offender if not local address
offender, _ := spammers.Pop()
Expand All @@ -1684,6 +1681,7 @@ func (pool *LegacyPool) truncatePending() {
log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
}
pool.priced.Removed(len(caps))
dropPendingCache = append(dropPendingCache, caps...)
pendingGauge.Dec(int64(len(caps)))
if pool.locals.contains(offenders[i]) {
localGauge.Dec(int64(len(caps)))
Expand All @@ -1710,6 +1708,7 @@ func (pool *LegacyPool) truncatePending() {
pool.pendingNonces.setIfLower(addr, tx.Nonce())
log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
}
dropPendingCache = append(dropPendingCache, caps...)
pool.priced.Removed(len(caps))
pendingGauge.Dec(int64(len(caps)))
if pool.locals.contains(addr) {
Expand All @@ -1719,6 +1718,7 @@ func (pool *LegacyPool) truncatePending() {
}
}
}
pool.pendingCache.del(dropPendingCache, pool.signer)
pendingRateLimitMeter.Mark(int64(pendingBeforeCap - pending))
}

Expand Down Expand Up @@ -1790,6 +1790,7 @@ func (pool *LegacyPool) demoteUnexecutables(demoteAddrs []common.Address) {
if list == nil {
continue
}
var dropPendingCache []*types.Transaction
nonce := pool.currentState.GetNonce(addr)

// Drop all transactions that are deemed too old (low nonce)
Expand Down Expand Up @@ -1823,6 +1824,9 @@ func (pool *LegacyPool) demoteUnexecutables(demoteAddrs []common.Address) {
// Internal shuffle shouldn't touch the lookup set.
pool.enqueueTx(hash, tx, false, false)
}
dropPendingCache = append(dropPendingCache, olds...)
dropPendingCache = append(dropPendingCache, invalids...)
dropPendingCache = append(dropPendingCache, drops...)
pendingGauge.Dec(int64(len(olds) + len(drops) + len(invalids)))
if pool.locals.contains(addr) {
localGauge.Dec(int64(len(olds) + len(drops) + len(invalids)))
Expand All @@ -1837,6 +1841,7 @@ func (pool *LegacyPool) demoteUnexecutables(demoteAddrs []common.Address) {
// Internal shuffle shouldn't touch the lookup set.
pool.enqueueTx(hash, tx, false, false)
}
dropPendingCache = append(dropPendingCache, gapped...)
pendingGauge.Dec(int64(len(gapped)))
}
// Delete the entire pending entry if it became empty.
Expand All @@ -1846,6 +1851,7 @@ func (pool *LegacyPool) demoteUnexecutables(demoteAddrs []common.Address) {
pool.reserve(addr, false)
}
}
pool.pendingCache.del(dropPendingCache, pool.signer)
}
}

Expand Down
2 changes: 1 addition & 1 deletion core/txpool/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ type ValidationOptionsWithState struct {
// rules without duplicating code and running the risk of missed updates.
func ValidateTransactionWithState(tx *types.Transaction, signer types.Signer, opts *ValidationOptionsWithState) error {
// Ensure the transaction adheres to nonce ordering
from, err := signer.Sender(tx) // already validated (and cached), but cleaner to check
from, err := types.Sender(signer, tx) // reuse the sender cache
if err != nil {
log.Error("Transaction sender recovery failed", "err", err)
return err
Expand Down
Loading
Loading