Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prepare for v0.5.1 release #197

Merged
merged 8 commits into from
Oct 15, 2024
2 changes: 1 addition & 1 deletion .github/workflows/docker-release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,4 @@ jobs:
provenance: false
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}

platforms: linux/amd64,linux/arm64
21 changes: 21 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,26 @@
# Changelog

## v0.5.1

This release includes various optimizations and improvements to transaction processing, CI support, and network infrastructure.

This is a minor release for opBNB Mainnet and Testnet.
Upgrading is optional.

### What's Changed

* fix(ci): support building arm64 architecture (#165)
* optimization: enqueue transactions in parallel from p2p (#173)
* optimization: enlarge p2p buffer size and add some metrics for performance monitor (#171)
* optimization: txpool pricedlist only reheap when pool is full (#175)
* optimization: txpool pending cache improvement (#177)
* chore: add bootnode in us region(testnet) (#194)

### Docker Images
ghcr.io/bnb-chain/op-geth:v0.5.1

**Full Changelog**: https://github.com/bnb-chain/op-geth/compare/v0.5.0...v0.5.1

## v0.5.0
This release includes code merging from the upstream version v1.101315.2 along with several fixs and improvements. Fjord fork from upstream is included.
Fjord fork is scheduled to launch on the opBNB:
Expand Down
3 changes: 3 additions & 0 deletions accounts/abi/bind/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func TestWaitDeployed(t *testing.T) {

// Send and mine the transaction.
backend.Client().SendTransaction(ctx, tx)
time.Sleep(500 * time.Millisecond) //wait for the tx to be mined
backend.Commit()

select {
Expand Down Expand Up @@ -117,6 +118,7 @@ func TestWaitDeployedCornerCases(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
backend.Client().SendTransaction(ctx, tx)
time.Sleep(500 * time.Millisecond) //wait for the tx to be mined
backend.Commit()
notContractCreation := errors.New("tx is not contract creation")
if _, err := bind.WaitDeployed(ctx, backend.Client(), tx); err.Error() != notContractCreation.Error() {
Expand All @@ -135,5 +137,6 @@ func TestWaitDeployedCornerCases(t *testing.T) {
}()

backend.Client().SendTransaction(ctx, tx)
time.Sleep(500 * time.Millisecond) //wait for the tx to be mined
cancel()
}
55 changes: 50 additions & 5 deletions core/txpool/legacypool/cache_for_miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/metrics"
"github.com/holiman/uint256"
)

var (
Expand All @@ -20,12 +22,18 @@ type cacheForMiner struct {
pending map[common.Address]map[*types.Transaction]struct{}
locals map[common.Address]bool
addrLock sync.Mutex

allCache map[common.Address][]*txpool.LazyTransaction
filteredCache map[common.Address][]*txpool.LazyTransaction
cacheLock sync.Mutex
}

func newCacheForMiner() *cacheForMiner {
return &cacheForMiner{
pending: make(map[common.Address]map[*types.Transaction]struct{}),
locals: make(map[common.Address]bool),
pending: make(map[common.Address]map[*types.Transaction]struct{}),
locals: make(map[common.Address]bool),
allCache: make(map[common.Address][]*txpool.LazyTransaction),
filteredCache: make(map[common.Address][]*txpool.LazyTransaction),
}
}

Expand Down Expand Up @@ -67,8 +75,9 @@ func (pc *cacheForMiner) del(txs types.Transactions, signer types.Signer) {
}
}

func (pc *cacheForMiner) dump() map[common.Address]types.Transactions {
func (pc *cacheForMiner) sync2cache(pool txpool.LazyResolver, filter func(txs types.Transactions, addr common.Address) types.Transactions) {
pending := make(map[common.Address]types.Transactions)

pc.txLock.Lock()
for addr, txlist := range pc.pending {
pending[addr] = make(types.Transactions, 0, len(txlist))
Expand All @@ -77,10 +86,46 @@ func (pc *cacheForMiner) dump() map[common.Address]types.Transactions {
}
}
pc.txLock.Unlock()
for _, txs := range pending {

// convert pending to lazyTransactions
filteredLazy := make(map[common.Address][]*txpool.LazyTransaction)
allLazy := make(map[common.Address][]*txpool.LazyTransaction)
for addr, txs := range pending {
// sorted by nonce
sort.Sort(types.TxByNonce(txs))
filterd := filter(txs, addr)
if len(txs) > 0 {
lazies := make([]*txpool.LazyTransaction, len(txs))
for i, tx := range txs {
lazies[i] = &txpool.LazyTransaction{
Pool: pool,
Hash: tx.Hash(),
Tx: tx,
Time: tx.Time(),
GasFeeCap: uint256.MustFromBig(tx.GasFeeCap()),
GasTipCap: uint256.MustFromBig(tx.GasTipCap()),
Gas: tx.Gas(),
BlobGas: tx.BlobGas(),
}
}
allLazy[addr] = lazies
filteredLazy[addr] = lazies[:len(filterd)]
}
}

pc.cacheLock.Lock()
pc.filteredCache = filteredLazy
pc.allCache = allLazy
pc.cacheLock.Unlock()
}

func (pc *cacheForMiner) dump(filtered bool) map[common.Address][]*txpool.LazyTransaction {
pc.cacheLock.Lock()
pending := pc.allCache
if filtered {
pending = pc.filteredCache
}
pc.cacheLock.Unlock()
return pending
}

Expand All @@ -91,7 +136,7 @@ func (pc *cacheForMiner) markLocal(addr common.Address) {
pc.locals[addr] = true
}

func (pc *cacheForMiner) isLocal(addr common.Address) bool {
func (pc *cacheForMiner) IsLocal(addr common.Address) bool {
pc.addrLock.Lock()
defer pc.addrLock.Unlock()
return pc.locals[addr]
Expand Down
102 changes: 60 additions & 42 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,9 @@ func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.A
// Set the basic pool parameters
pool.gasTip.Store(uint256.NewInt(gasTip))

// set dumper
pool.pendingCache.sync2cache(pool, pool.createFilter(pool.gasTip.Load().ToBig(), head.BaseFee))

// Initialize the state with head block, or fallback to empty one in
// case the head state is not available (might occur when node is not
// fully synced).
Expand Down Expand Up @@ -383,9 +386,27 @@ func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.A
}
pool.wg.Add(1)
go pool.loop()
go pool.loopOfSync()
return nil
}

func (pool *LegacyPool) loopOfSync() {
ticker := time.NewTicker(200 * time.Millisecond)
for {
select {
case <-pool.reorgShutdownCh:
return
case <-ticker.C:
gasTip := pool.gasTip.Load()
currHead := pool.currentHead.Load()
if gasTip == nil || currHead == nil {
continue
}
pool.pendingCache.sync2cache(pool, pool.createFilter(gasTip.ToBig(), currHead.BaseFee))
}
}
}

// loop is the transaction pool's main event loop, waiting for and reacting to
// outside blockchain events as well as for various reporting and transaction
// eviction events.
Expand Down Expand Up @@ -624,57 +645,35 @@ func (pool *LegacyPool) ContentFrom(addr common.Address) ([]*types.Transaction,
// The transactions can also be pre-filtered by the dynamic fee components to
// reduce allocations and load on downstream subsystems.
func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction {
// TODO need to confirm
defer func(t0 time.Time) {
getPendingDurationTimer.Update(time.Since(t0))
}(time.Now())
empty := txpool.PendingFilter{}
if filter == empty {
// return all pending transactions, no filtering
return pool.pendingCache.dump(false)
}
// If only blob transactions are requested, this pool is unsuitable as it
// contains none, don't even bother.
if filter.OnlyBlobTxs {
return nil
}
defer func(t0 time.Time) {
getPendingDurationTimer.Update(time.Since(t0))
}(time.Now())
// It is a bit tricky here, we don't do the filtering here.
return pool.pendingCache.dump(true)
}

// Convert the new uint256.Int types to the old big.Int ones used by the legacy pool
var (
minTipBig *big.Int
baseFeeBig *big.Int
)
if filter.MinTip != nil {
minTipBig = filter.MinTip.ToBig()
}
if filter.BaseFee != nil {
baseFeeBig = filter.BaseFee.ToBig()
}
pending := make(map[common.Address][]*txpool.LazyTransaction, len(pool.pending))
for addr, txs := range pool.pendingCache.dump() {

// If the miner requests tip enforcement, cap the lists now
if minTipBig != nil && !pool.locals.contains(addr) {
func (pool *LegacyPool) createFilter(gasPrice, baseFee *big.Int) func(txs types.Transactions, addr common.Address) types.Transactions {
return func(txs types.Transactions, addr common.Address) types.Transactions {
if !pool.pendingCache.IsLocal(addr) {
for i, tx := range txs {
if tx.EffectiveGasTipIntCmp(minTipBig, baseFeeBig) < 0 {
if tx.EffectiveGasTipIntCmp(gasPrice, baseFee) < 0 {
txs = txs[:i]
break
}
}
}
if len(txs) > 0 {
lazies := make([]*txpool.LazyTransaction, len(txs))
for i := 0; i < len(txs); i++ {
lazies[i] = &txpool.LazyTransaction{
Pool: pool,
Hash: txs[i].Hash(),
Tx: txs[i],
Time: txs[i].Time(),
GasFeeCap: uint256.MustFromBig(txs[i].GasFeeCap()),
GasTipCap: uint256.MustFromBig(txs[i].GasTipCap()),
Gas: txs[i].Gas(),
BlobGas: txs[i].BlobGas(),
}
}
pending[addr] = lazies
}
return txs
}
return pending
}

// Locals retrieves the accounts currently considered local by the pool.
Expand Down Expand Up @@ -840,6 +839,16 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
}
// If the transaction pool is full, discard underpriced transactions
if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue {
currHead := pool.currentHead.Load()
if currHead != nil && currHead.BaseFee != nil && pool.priced.NeedReheap(currHead) {
if pool.chainconfig.IsLondon(new(big.Int).Add(currHead.Number, big.NewInt(1))) {
baseFee := eip1559.CalcBaseFee(pool.chainconfig, currHead, currHead.Time+1)
pool.priced.SetBaseFee(baseFee)
}
pool.priced.Reheap()
pool.priced.currHead = currHead
}

// If the new transaction is underpriced, don't accept it
if !isLocal && pool.priced.Underpriced(tx) {
log.Trace("Discarding underpriced transaction", "hash", hash, "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
Expand Down Expand Up @@ -1110,7 +1119,9 @@ func (pool *LegacyPool) addRemoteSync(tx *types.Transaction) error {
// to the add is finished. Only use this during tests for determinism!
func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error {
defer func(t0 time.Time) {
addTimer.UpdateSince(t0)
if len(txs) > 0 {
addTimer.Update(time.Since(t0) / time.Duration(len(txs)))
}
}(time.Now())
// Do not treat as local if local transactions have been disabled
local = local && !pool.config.NoLocals
Expand Down Expand Up @@ -1147,7 +1158,9 @@ func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error
pool.mu.Lock()
t0 := time.Now()
newErrs, dirtyAddrs := pool.addTxsLocked(news, local)
addWithLockTimer.UpdateSince(t0)
if len(news) > 0 {
addWithLockTimer.Update(time.Since(t0) / time.Duration(len(news)))
}
pool.mu.Unlock()

var nilSlot = 0
Expand Down Expand Up @@ -1403,6 +1416,9 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
reorgDurationTimer.Update(time.Since(t0))
if reset != nil {
reorgresetTimer.UpdateSince(t0)
if reset.newHead != nil {
log.Info("Transaction pool reorged", "from", reset.oldHead.Number.Uint64(), "to", reset.newHead.Number.Uint64())
}
}
}(time.Now())
defer close(done)
Expand Down Expand Up @@ -1451,10 +1467,12 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
if pool.chainconfig.IsLondon(new(big.Int).Add(reset.newHead.Number, big.NewInt(1))) {
pendingBaseFee = eip1559.CalcBaseFee(pool.chainconfig, reset.newHead, reset.newHead.Time+1)
pool.priced.SetBaseFee(pendingBaseFee)
} else {
pool.priced.Reheap()
}
}
gasTip, baseFee := pool.gasTip.Load(), pendingBaseFee
go func() {
pool.pendingCache.sync2cache(pool, pool.createFilter(gasTip.ToBig(), baseFee))
}()
// Update all accounts to the latest known pending nonce
nonces := make(map[common.Address]uint64, len(pool.pending))
for addr, list := range pool.pending {
Expand Down
1 change: 1 addition & 0 deletions core/txpool/legacypool/legacypool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2073,6 +2073,7 @@ func TestDualHeapEviction(t *testing.T) {
add(false)
for baseFee = 0; baseFee <= 1000; baseFee += 100 {
pool.priced.SetBaseFee(big.NewInt(int64(baseFee)))
pool.priced.Reheap()
add(true)
check(highCap, "fee cap")
add(false)
Expand Down
6 changes: 5 additions & 1 deletion core/txpool/legacypool/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,7 @@ func (h *priceHeap) Pop() interface{} {
// better candidates for inclusion while in other cases (at the top of the baseFee peak)
// the floating heap is better. When baseFee is decreasing they behave similarly.
type pricedList struct {
currHead *types.Header // Current block header for effective tip calculation
// Number of stale price points to (re-heap trigger).
stales atomic.Int64

Expand Down Expand Up @@ -667,6 +668,10 @@ func (l *pricedList) Discard(slots int, force bool) (types.Transactions, bool) {
return drop, true
}

func (l *pricedList) NeedReheap(currHead *types.Header) bool {
return l.currHead == nil || currHead == nil || currHead.Hash().Cmp(l.currHead.Hash()) != 0
}

// Reheap forcibly rebuilds the heap based on the current remote transaction set.
func (l *pricedList) Reheap() {
l.reheapMu.Lock()
Expand Down Expand Up @@ -698,5 +703,4 @@ func (l *pricedList) Reheap() {
// necessary to call right before SetBaseFee when processing a new block.
func (l *pricedList) SetBaseFee(baseFee *big.Int) {
l.urgent.baseFee = baseFee
l.Reheap()
}
9 changes: 9 additions & 0 deletions eth/fetcher/tx_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,15 @@ func (f *TxFetcher) Stop() {
close(f.quit)
}

func (f *TxFetcher) IsWorking() (bool, error) {
select {
case <-f.quit:
return false, errTerminated
default:
return true, nil
}
}

func (f *TxFetcher) loop() {
var (
waitTimer = new(mclock.Timer)
Expand Down
Loading
Loading