Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimization: enqueue transactions in parallel from p2p #173

Merged
merged 4 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions eth/fetcher/tx_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,15 @@ func (f *TxFetcher) Stop() {
close(f.quit)
}

func (f *TxFetcher) IsWorking() (bool, error) {
select {
case <-f.quit:
return false, errTerminated
default:
return true, nil
}
}

func (f *TxFetcher) loop() {
var (
waitTimer = new(mclock.Timer)
Expand Down
37 changes: 35 additions & 2 deletions eth/handler_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,31 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/fetcher"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/p2p/enode"
)

// TxQueueSize is the size of the transaction queue used to enqueue transactions
const (
TxQueueSize = 16
)

// enqueueTx is a channel to enqueue transactions in parallel.
// It is used to improve the performance of transaction enqueued.
var enqueueTx = make(chan func(), TxQueueSize)

func init() {
// run the transaction enqueuing loop
for i := 0; i < TxQueueSize; i++ {
go func() {
for enqueue := range enqueueTx {
enqueue()
}
}()
}
}

// ethHandler implements the eth.Backend interface to handle the various network
// packets that are sent as replies or broadcasts.
type ethHandler handler
Expand Down Expand Up @@ -92,16 +113,28 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
return errors.New("disallowed broadcast blob transaction")
}
}
return h.txFetcher.Enqueue(peer.ID(), *packet, false)
return asyncEnqueueTx(peer, *packet, h.txFetcher, false)

case *eth.PooledTransactionsResponse:
return h.txFetcher.Enqueue(peer.ID(), *packet, true)
return asyncEnqueueTx(peer, *packet, h.txFetcher, true)

default:
return fmt.Errorf("unexpected eth packet type: %T", packet)
}
}

func asyncEnqueueTx(peer *eth.Peer, txs []*types.Transaction, fetcher *fetcher.TxFetcher, directed bool) error {
if working, err := fetcher.IsWorking(); !working {
return err
}
enqueueTx <- func() {
if err := fetcher.Enqueue(peer.ID(), txs, directed); err != nil {
peer.Log().Warn("Failed to enqueue transaction", "err", err)
}
}
return nil
}

// handleBlockAnnounces is invoked from a peer's message handler when it transmits a
// batch of block announcements for the local node to process.
func (h *ethHandler) handleBlockAnnounces(peer *eth.Peer, hashes []common.Hash, numbers []uint64) error {
Expand Down
6 changes: 6 additions & 0 deletions eth/protocols/eth/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ const (
var (
txAnnounceAbandonMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/abandon", nil)
txBroadcastAbandonMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/broadcasts/abandon", nil)
txP2PAnnQueueGauge = metrics.NewRegisteredGauge("eth/fetcher/transaction/p2p/ann/queue", nil)
txP2PBroadQueueGauge = metrics.NewRegisteredGauge("eth/fetcher/transaction/p2p/broad/queue", nil)
)

// safeGetPeerIP
Expand Down Expand Up @@ -133,6 +135,8 @@ func (p *Peer) broadcastTransactions() {
queue = queue[:copy(queue, queue[len(queue)-maxQueuedTxs:])]
}

txP2PBroadQueueGauge.Update(int64(len(queue)))

case <-done:
done = nil

Expand Down Expand Up @@ -208,6 +212,8 @@ func (p *Peer) announceTransactions() {
queue = queue[:copy(queue, queue[len(queue)-maxQueuedTxAnns:])]
}

txP2PAnnQueueGauge.Update(int64(len(queue)))

case <-done:
done = nil

Expand Down
Loading