diff --git a/core/txpool/legacypool/cache_for_miner.go b/core/txpool/legacypool/cache_for_miner.go new file mode 100644 index 0000000000..b02c06d832 --- /dev/null +++ b/core/txpool/legacypool/cache_for_miner.go @@ -0,0 +1,136 @@ +package legacypool + +import ( + "math/big" + "sort" + "sync" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/txpool" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/metrics" +) + +var ( + pendingCacheGauge = metrics.NewRegisteredGauge("txpool/legacypool/pending/cache", nil) + localCacheGauge = metrics.NewRegisteredGauge("txpool/legacypool/local/cache", nil) +) + +// copy of pending transactions +type cacheForMiner struct { + txLock sync.Mutex + pending map[common.Address]map[*types.Transaction]struct{} + locals map[common.Address]bool + addrLock sync.Mutex +} + +func newCacheForMiner() *cacheForMiner { + return &cacheForMiner{ + pending: make(map[common.Address]map[*types.Transaction]struct{}), + locals: make(map[common.Address]bool), + } +} + +func (pc *cacheForMiner) add(txs types.Transactions, signer types.Signer) { + if len(txs) == 0 { + return + } + pc.txLock.Lock() + defer pc.txLock.Unlock() + pendingCacheGauge.Inc(int64(len(txs))) + for _, tx := range txs { + addr, _ := types.Sender(signer, tx) + slots, ok := pc.pending[addr] + if !ok { + slots = make(map[*types.Transaction]struct{}) + pc.pending[addr] = slots + } + slots[tx] = struct{}{} + } +} + +func (pc *cacheForMiner) del(txs types.Transactions, signer types.Signer) { + if len(txs) == 0 { + return + } + pc.txLock.Lock() + defer pc.txLock.Unlock() + for _, tx := range txs { + addr, _ := types.Sender(signer, tx) + slots, ok := pc.pending[addr] + if !ok { + continue + } + pendingCacheGauge.Dec(1) + delete(slots, tx) + if len(slots) == 0 { + delete(pc.pending, addr) + } + } +} + +func (pc *cacheForMiner) dump(pool txpool.LazyResolver, gasPrice, baseFee *big.Int, enforceTip bool) map[common.Address][]*txpool.LazyTransaction { + pending := make(map[common.Address]types.Transactions) + pc.txLock.Lock() + for addr, txlist := range pc.pending { + pending[addr] = make(types.Transactions, 0, len(txlist)) + for tx := range txlist { + pending[addr] = append(pending[addr], tx) + } + } + pc.txLock.Unlock() + pendingLazy := make(map[common.Address][]*txpool.LazyTransaction) + for addr, txs := range pending { + // sorted by nonce + sort.Sort(types.TxByNonce(txs)) + // If the miner requests tip enforcement, cap the lists now + if enforceTip && !pc.isLocal(addr) { + for i, tx := range txs { + if tx.EffectiveGasTipIntCmp(gasPrice, baseFee) < 0 { + txs = txs[:i] + break + } + } + } + if len(txs) > 0 { + lazies := make([]*txpool.LazyTransaction, len(txs)) + for i, tx := range txs { + lazies[i] = &txpool.LazyTransaction{ + Pool: pool, + Hash: tx.Hash(), + Tx: tx, + Time: tx.Time(), + GasFeeCap: tx.GasFeeCap(), + GasTipCap: tx.GasTipCap(), + Gas: tx.Gas(), + BlobGas: tx.BlobGas(), + } + } + pendingLazy[addr] = lazies + } + } + return pendingLazy +} + +func (pc *cacheForMiner) markLocal(addr common.Address) { + pc.addrLock.Lock() + defer pc.addrLock.Unlock() + localCacheGauge.Inc(1) + pc.locals[addr] = true +} + +func (pc *cacheForMiner) isLocal(addr common.Address) bool { + pc.addrLock.Lock() + defer pc.addrLock.Unlock() + return pc.locals[addr] +} + +func (pc *cacheForMiner) flattenLocals() []common.Address { + pc.addrLock.Lock() + defer pc.addrLock.Unlock() + locals := make([]common.Address, 0, len(pc.locals)) + for addr := range pc.locals { + locals = append(locals, addr) + } + return locals +} diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 2a7e561671..b50dde30aa 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -107,6 +107,10 @@ var ( staledMeter = metrics.NewRegisteredMeter("txpool/staled/count", nil) // staled transactions + // duration of miner worker fetching all pending transactions + getPendingDurationTimer = metrics.NewRegisteredTimer("txpool/getpending/time", nil) + // duration of miner worker fetching all local addresses + getLocalsDurationTimer = metrics.NewRegisteredTimer("txpool/getlocals/time", nil) // demote metrics // demoteDuration measures how long time a demotion takes. demoteDurationTimer = metrics.NewRegisteredTimer("txpool/demote/duration", nil) @@ -247,6 +251,9 @@ type LegacyPool struct { all *lookup // All transactions to allow lookups priced *pricedList // All transactions sorted by price + pendingCacheDumper func(enforceTip bool) map[common.Address][]*txpool.LazyTransaction + pendingCache *cacheForMiner //pending list cache for miner + reqResetCh chan *txpoolResetRequest reqPromoteCh chan *accountSet queueTxEventCh chan *types.Transaction @@ -286,11 +293,13 @@ func New(config Config, chain BlockChain) *LegacyPool { reorgDoneCh: make(chan chan struct{}), reorgShutdownCh: make(chan struct{}), initDoneCh: make(chan struct{}), + pendingCache: newCacheForMiner(), } pool.locals = newAccountSet(pool.signer) for _, addr := range config.Locals { log.Info("Setting new local account", "address", addr) pool.locals.add(addr) + pool.pendingCache.markLocal(addr) } pool.priced = newPricedList(pool.all) @@ -322,6 +331,11 @@ func (pool *LegacyPool) Init(gasTip *big.Int, head *types.Header, reserve txpool // Set the basic pool parameters pool.gasTip.Store(gasTip) + // set dumper + pool.pendingCacheDumper = func(enforceTip bool) map[common.Address][]*txpool.LazyTransaction { + return pool.pendingCache.dump(pool, gasTip, pool.gasTip.Load(), enforceTip) + } + // Initialize the state with head block, or fallback to empty one in // case the head state is not available(might occur when node is not // fully synced). @@ -590,48 +604,19 @@ func (pool *LegacyPool) ContentFrom(addr common.Address) ([]*types.Transaction, // transactions and only return those whose **effective** tip is large enough in // the next pending execution environment. func (pool *LegacyPool) Pending(enforceTips bool) map[common.Address][]*txpool.LazyTransaction { - pool.mu.Lock() - defer pool.mu.Unlock() - - pending := make(map[common.Address][]*txpool.LazyTransaction, len(pool.pending)) - for addr, list := range pool.pending { - txs := list.Flatten() - - // If the miner requests tip enforcement, cap the lists now - if enforceTips && !pool.locals.contains(addr) { - for i, tx := range txs { - if tx.EffectiveGasTipIntCmp(pool.gasTip.Load(), pool.priced.urgent.baseFee) < 0 { - txs = txs[:i] - break - } - } - } - if len(txs) > 0 { - lazies := make([]*txpool.LazyTransaction, len(txs)) - for i := 0; i < len(txs); i++ { - lazies[i] = &txpool.LazyTransaction{ - Pool: pool, - Hash: txs[i].Hash(), - Tx: txs[i], - Time: txs[i].Time(), - GasFeeCap: txs[i].GasFeeCap(), - GasTipCap: txs[i].GasTipCap(), - Gas: txs[i].Gas(), - BlobGas: txs[i].BlobGas(), - } - } - pending[addr] = lazies - } - } - return pending + defer func(t0 time.Time) { + getPendingDurationTimer.Update(time.Since(t0)) + }(time.Now()) + return pool.pendingCacheDumper(enforceTips) } // Locals retrieves the accounts currently considered local by the pool. func (pool *LegacyPool) Locals() []common.Address { - pool.mu.Lock() - defer pool.mu.Unlock() + defer func(t0 time.Time) { + getLocalsDurationTimer.Update(time.Since(t0)) + }(time.Now()) - return pool.locals.flatten() + return pool.pendingCache.flattenLocals() } // toJournal retrieves all transactions that should be included in the journal, @@ -857,7 +842,9 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e return false, txpool.ErrReplaceUnderpriced } // New transaction is better, replace old one + pool.pendingCache.add([]*types.Transaction{tx}, pool.signer) if old != nil { + pool.pendingCache.del([]*types.Transaction{old}, pool.signer) pool.all.Remove(old.Hash()) pool.priced.Removed(1) pendingReplaceMeter.Mark(1) @@ -881,6 +868,7 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e if local && !pool.locals.contains(from) { log.Info("Setting new local account", "address", from) pool.locals.add(from) + pool.pendingCache.markLocal(from) pool.priced.Removed(pool.all.RemoteToLocals(pool.locals)) // Migrate the remotes if it's marked as local first time. } if isLocal { @@ -989,10 +977,12 @@ func (pool *LegacyPool) promoteTx(addr common.Address, hash common.Hash, tx *typ return false } // Otherwise discard any previous transaction and mark this + pool.pendingCache.add([]*types.Transaction{tx}, pool.signer) if old != nil { pool.all.Remove(old.Hash()) pool.priced.Removed(1) pendingReplaceMeter.Mark(1) + pool.pendingCache.del([]*types.Transaction{old}, pool.signer) } else { // Nothing was replaced, bump the pending counter pendingGauge.Inc(1) @@ -1213,6 +1203,7 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo // Update the account nonce if needed pool.pendingNonces.setIfLower(addr, tx.Nonce()) // Reduce the pending counter + pool.pendingCache.del(append(invalids, tx), pool.signer) pendingGauge.Dec(int64(1 + len(invalids))) return 1 + len(invalids) } @@ -1374,15 +1365,20 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, var t0 = time.Now() if reset != nil { pool.demoteUnexecutables(demoteAddrs) + var pendingBaseFee = pool.priced.urgent.baseFee demoteDurationTimer.Update(time.Since(t0)) if reset.newHead != nil { if pool.chainconfig.IsLondon(new(big.Int).Add(reset.newHead.Number, big.NewInt(1))) { - pendingBaseFee := eip1559.CalcBaseFee(pool.chainconfig, reset.newHead, reset.newHead.Time+1) + pendingBaseFee = eip1559.CalcBaseFee(pool.chainconfig, reset.newHead, reset.newHead.Time+1) pool.priced.SetBaseFee(pendingBaseFee) } else { pool.priced.Reheap() } } + gasTip, baseFee := pool.gasTip.Load(), pendingBaseFee + pool.pendingCacheDumper = func(enforceTip bool) map[common.Address][]*txpool.LazyTransaction { + return pool.pendingCache.dump(pool, gasTip, baseFee, enforceTip) + } // Update all accounts to the latest known pending nonce nonces := make(map[common.Address]uint64, len(pool.pending)) for addr, list := range pool.pending { @@ -1658,6 +1654,7 @@ func (pool *LegacyPool) truncatePending() { } // Gradually drop transactions from offenders offenders := []common.Address{} + var dropPendingCache []*types.Transaction for pending > pool.config.GlobalSlots && !spammers.Empty() { // Retrieve the next offender if not local address offender, _ := spammers.Pop() @@ -1684,6 +1681,7 @@ func (pool *LegacyPool) truncatePending() { log.Trace("Removed fairness-exceeding pending transaction", "hash", hash) } pool.priced.Removed(len(caps)) + dropPendingCache = append(dropPendingCache, caps...) pendingGauge.Dec(int64(len(caps))) if pool.locals.contains(offenders[i]) { localGauge.Dec(int64(len(caps))) @@ -1710,6 +1708,7 @@ func (pool *LegacyPool) truncatePending() { pool.pendingNonces.setIfLower(addr, tx.Nonce()) log.Trace("Removed fairness-exceeding pending transaction", "hash", hash) } + dropPendingCache = append(dropPendingCache, caps...) pool.priced.Removed(len(caps)) pendingGauge.Dec(int64(len(caps))) if pool.locals.contains(addr) { @@ -1719,6 +1718,7 @@ func (pool *LegacyPool) truncatePending() { } } } + pool.pendingCache.del(dropPendingCache, pool.signer) pendingRateLimitMeter.Mark(int64(pendingBeforeCap - pending)) } @@ -1790,6 +1790,7 @@ func (pool *LegacyPool) demoteUnexecutables(demoteAddrs []common.Address) { if list == nil { continue } + var dropPendingCache []*types.Transaction nonce := pool.currentState.GetNonce(addr) // Drop all transactions that are deemed too old (low nonce) @@ -1823,6 +1824,9 @@ func (pool *LegacyPool) demoteUnexecutables(demoteAddrs []common.Address) { // Internal shuffle shouldn't touch the lookup set. pool.enqueueTx(hash, tx, false, false) } + dropPendingCache = append(dropPendingCache, olds...) + dropPendingCache = append(dropPendingCache, invalids...) + dropPendingCache = append(dropPendingCache, drops...) pendingGauge.Dec(int64(len(olds) + len(drops) + len(invalids))) if pool.locals.contains(addr) { localGauge.Dec(int64(len(olds) + len(drops) + len(invalids))) @@ -1837,6 +1841,7 @@ func (pool *LegacyPool) demoteUnexecutables(demoteAddrs []common.Address) { // Internal shuffle shouldn't touch the lookup set. pool.enqueueTx(hash, tx, false, false) } + dropPendingCache = append(dropPendingCache, gapped...) pendingGauge.Dec(int64(len(gapped))) } // Delete the entire pending entry if it became empty. @@ -1846,6 +1851,7 @@ func (pool *LegacyPool) demoteUnexecutables(demoteAddrs []common.Address) { pool.reserve(addr, false) } } + pool.pendingCache.del(dropPendingCache, pool.signer) } } diff --git a/core/txpool/validation.go b/core/txpool/validation.go index 90561eaa4c..30ece38b78 100644 --- a/core/txpool/validation.go +++ b/core/txpool/validation.go @@ -238,7 +238,7 @@ type ValidationOptionsWithState struct { // rules without duplicating code and running the risk of missed updates. func ValidateTransactionWithState(tx *types.Transaction, signer types.Signer, opts *ValidationOptionsWithState) error { // Ensure the transaction adheres to nonce ordering - from, err := signer.Sender(tx) // already validated (and cached), but cleaner to check + from, err := types.Sender(signer, tx) // reuse the sender cache if err != nil { log.Error("Transaction sender recovery failed", "err", err) return err diff --git a/eth/catalyst/api_test.go b/eth/catalyst/api_test.go index 28fcce838b..cbf744cd6d 100644 --- a/eth/catalyst/api_test.go +++ b/eth/catalyst/api_test.go @@ -115,6 +115,8 @@ func TestEth2AssembleBlock(t *testing.T) { t.Fatalf("error signing transaction, err=%v", err) } ethservice.TxPool().Add([]*types.Transaction{tx}, true, false) + // we wait for the tx to be promoted into pending list in the txpool + time.Sleep(500 * time.Millisecond) blockParams := engine.PayloadAttributes{ Timestamp: blocks[9].Time() + 5, } @@ -152,6 +154,8 @@ func TestEth2AssembleBlockWithAnotherBlocksTxs(t *testing.T) { // Put the 10th block's tx in the pool and produce a new block txs := blocks[9].Transactions() api.eth.TxPool().Add(txs, false, true) + // we wait for the tx to be promoted into pending list in the txpool + time.Sleep(500 * time.Millisecond) blockParams := engine.PayloadAttributes{ Timestamp: blocks[8].Time() + 5, } @@ -192,6 +196,8 @@ func TestEth2PrepareAndGetPayload(t *testing.T) { // Put the 10th block's tx in the pool and produce a new block txs := blocks[9].Transactions() ethservice.TxPool().Add(txs, true, false) + // we wait for the txs to be promoted into pending list in the txpool + time.Sleep(500 * time.Millisecond) blockParams := engine.PayloadAttributes{ Timestamp: blocks[8].Time() + 5, } @@ -316,6 +322,9 @@ func TestEth2NewBlock(t *testing.T) { tx, _ := types.SignTx(types.NewContractCreation(nonce, new(big.Int), 1000000, big.NewInt(2*params.InitialBaseFee), logCode), types.LatestSigner(ethservice.BlockChain().Config()), testKey) ethservice.TxPool().Add([]*types.Transaction{tx}, true, false) + //we wait for the tx to be promoted into pending list in the txpool + time.Sleep(500 * time.Millisecond) + execData, err := assembleWithTransactions(api, parent.Hash(), &engine.PayloadAttributes{ Timestamp: parent.Time() + 5, }, 1) @@ -489,6 +498,8 @@ func TestFullAPI(t *testing.T) { nonce := statedb.GetNonce(testAddr) tx, _ := types.SignTx(types.NewContractCreation(nonce, new(big.Int), 1000000, big.NewInt(2*params.InitialBaseFee), logCode), types.LatestSigner(ethservice.BlockChain().Config()), testKey) ethservice.TxPool().Add([]*types.Transaction{tx}, true, false) + // we wait for the tx to be promoted into pending list in the txpool + time.Sleep(500 * time.Millisecond) } setupBlocks(t, ethservice, 10, parent, callback, nil) @@ -615,6 +626,8 @@ func TestNewPayloadOnInvalidChain(t *testing.T) { Data: logCode, }) ethservice.TxPool().Add([]*types.Transaction{tx}, false, true) + // we wait for the tx to be promoted into pending list in the txpool + time.Sleep(500 * time.Millisecond) var ( params = engine.PayloadAttributes{ Timestamp: parent.Time + 1, @@ -1286,8 +1299,8 @@ func TestNilWithdrawals(t *testing.T) { func setupBodies(t *testing.T) (*node.Node, *eth.Ethereum, []*types.Block) { genesis, blocks := generateMergeChain(10, true) // enable shanghai on the last block - time := blocks[len(blocks)-1].Header().Time + 1 - genesis.Config.ShanghaiTime = &time + blocktime := blocks[len(blocks)-1].Header().Time + 1 + genesis.Config.ShanghaiTime = &blocktime n, ethservice := startEthService(t, genesis, blocks) var ( @@ -1301,6 +1314,8 @@ func setupBodies(t *testing.T) (*node.Node, *eth.Ethereum, []*types.Block) { nonce := statedb.GetNonce(testAddr) tx, _ := types.SignTx(types.NewContractCreation(nonce, new(big.Int), 1000000, big.NewInt(2*params.InitialBaseFee), logCode), types.LatestSigner(ethservice.BlockChain().Config()), testKey) ethservice.TxPool().Add([]*types.Transaction{tx}, false, false) + // we wait for the tx to be promoted into pending list in the txpool + time.Sleep(500 * time.Millisecond) } withdrawals := make([][]*types.Withdrawal, 10) diff --git a/miner/payload_building_test.go b/miner/payload_building_test.go index ce09de635e..7eea54a25d 100644 --- a/miner/payload_building_test.go +++ b/miner/payload_building_test.go @@ -53,6 +53,8 @@ func testBuildPayload(t *testing.T, noTxPool, interrupt bool) { // definitely be visible. txs := genTxs(1, numInterruptTxs) b.txPool.Add(txs, true, false) + // we wait for the txs to be promoted + time.Sleep(500 * time.Millisecond) } timestamp := uint64(time.Now().Unix()) diff --git a/miner/worker.go b/miner/worker.go index fd9fa7e043..fade5b084f 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -38,6 +38,7 @@ import ( "github.com/ethereum/go-ethereum/eth/tracers" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/trie" ) @@ -83,6 +84,17 @@ var ( errBlockInterruptedByResolve = errors.New("payload resolution while building block") ) +var ( + txTotalMeter = metrics.NewRegisteredMeter("miner/tx/total", nil) + txSuccMeter = metrics.NewRegisteredMeter("miner/tx/succ", nil) + txErrUnknownMeter = metrics.NewRegisteredMeter("miner/tx/unknown", nil) + txErrNoncetoolowMeter = metrics.NewRegisteredMeter("miner/tx/err/noncetoolow", nil) + txErrNotenoughgasMeter = metrics.NewRegisteredMeter("miner/tx/err/notenoughgas", nil) + txErrNotenoughblobgasMeter = metrics.NewRegisteredMeter("miner/tx/err/notenoughblobgas", nil) + txErrEvitedMeter = metrics.NewRegisteredMeter("miner/tx/evited", nil) + txErrReplayMeter = metrics.NewRegisteredMeter("miner/tx/replay", nil) +) + // environment is the worker's current environment and holds all // information of the sealing block generation. type environment struct { @@ -872,15 +884,18 @@ func (w *worker) commitTransactions(env *environment, txs *transactionsByPriceAn if ltx == nil { break } + txTotalMeter.Mark(1) // If we don't have enough space for the next transaction, skip the account. if env.gasPool.Gas() < ltx.Gas { log.Trace("Not enough gas left for transaction", "hash", ltx.Hash, "left", env.gasPool.Gas(), "needed", ltx.Gas) txs.Pop() + txErrNotenoughgasMeter.Mark(1) continue } if left := uint64(params.MaxBlobGasPerBlock - env.blobs*params.BlobTxBlobGasPerBlob); left < ltx.BlobGas { log.Trace("Not enough blob gas left for transaction", "hash", ltx.Hash, "left", left, "needed", ltx.BlobGas) txs.Pop() + txErrNotenoughblobgasMeter.Mark(1) continue } // Transaction seems to fit, pull it up from the pool @@ -888,6 +903,7 @@ func (w *worker) commitTransactions(env *environment, txs *transactionsByPriceAn if tx == nil { log.Trace("Ignoring evicted transaction", "hash", ltx.Hash) txs.Pop() + txErrEvitedMeter.Mark(1) continue } // Error may be ignored here. The error has already been checked @@ -899,6 +915,7 @@ func (w *worker) commitTransactions(env *environment, txs *transactionsByPriceAn if tx.Protected() && !w.chainConfig.IsEIP155(env.header.Number) { log.Trace("Ignoring replay protected transaction", "hash", ltx.Hash, "eip155", w.chainConfig.EIP155Block) txs.Pop() + txErrReplayMeter.Mark(1) continue } // Start executing the transaction @@ -910,18 +927,21 @@ func (w *worker) commitTransactions(env *environment, txs *transactionsByPriceAn // New head notification data race between the transaction pool and miner, shift log.Trace("Skipping transaction with low nonce", "hash", ltx.Hash, "sender", from, "nonce", tx.Nonce()) txs.Shift() + txErrNoncetoolowMeter.Mark(1) case errors.Is(err, nil): // Everything ok, collect the logs and shift in the next transaction from the same account coalescedLogs = append(coalescedLogs, logs...) env.tcount++ txs.Shift() + txSuccMeter.Mark(1) default: // Transaction is regarded as invalid, drop all consecutive transactions from // the same sender because of `nonce-too-high` clause. log.Debug("Transaction failed, account skipped", "hash", ltx.Hash, "err", err) txs.Pop() + txErrUnknownMeter.Mark(1) } } if !w.isRunning() && len(coalescedLogs) > 0 { diff --git a/miner/worker_test.go b/miner/worker_test.go index 948ce43a34..aa05565301 100644 --- a/miner/worker_test.go +++ b/miner/worker_test.go @@ -161,6 +161,7 @@ func (b *testWorkerBackend) newRandomTx(creation bool) *types.Transaction { func newTestWorker(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine, db ethdb.Database, blocks int) (*worker, *testWorkerBackend) { backend := newTestWorkerBackend(t, chainConfig, engine, db, blocks) backend.txPool.Add(pendingTxs, true, false) + time.Sleep(500 * time.Millisecond) // Wait for txs to be promoted w := newWorker(testConfig, chainConfig, engine, backend, new(event.TypeMux), nil, false) w.setEtherbase(testBankAddress) return w, backend @@ -197,6 +198,7 @@ func TestGenerateAndImportBlock(t *testing.T) { for i := 0; i < 5; i++ { b.txPool.Add([]*types.Transaction{b.newRandomTx(true)}, true, false) b.txPool.Add([]*types.Transaction{b.newRandomTx(false)}, true, false) + time.Sleep(1 * time.Second) // Wait for txs to be promoted select { case ev := <-sub.Chan():