Skip to content

Commit

Permalink
Merge pull request #1 from unionlabs/fixup-mempool
Browse files Browse the repository at this point in the history
fix: mempool reactor
  • Loading branch information
hussein-aitlahcen authored Nov 18, 2024
2 parents df0586c + ea0b23e commit 960ce8b
Showing 1 changed file with 19 additions and 9 deletions.
28 changes: 19 additions & 9 deletions mempool/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ type PeerState interface {

// Send new mempool txs to peer.
func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
memR.Logger.Debug("BroadcastTxRoutine start")

var next *clist.CElement

// If the node is catching up, don't start this routine immediately.
Expand All @@ -216,6 +218,7 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
select {
case <-memR.mempool.TxsWaitChan(): // Wait until a tx is available
if next = memR.mempool.TxsFront(); next == nil {
memR.Logger.Debug("Waiting for transaction")
continue
}
case <-peer.Quit():
Expand All @@ -234,6 +237,7 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
// will be initialized before the consensus reactor. We should wait a few
// milliseconds and retry.
time.Sleep(PeerCatchupSleepIntervalMS * time.Millisecond)
memR.Logger.Debug("Invalid peer state, rechecking.")
continue
}

Expand All @@ -252,18 +256,24 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
// NOTE: Transaction batching was disabled due to
// https://github.com/tendermint/tendermint/issues/5796

// Do not send this transaction if we receive it from peer.
if memTx.isSender(peer.ID()) {
// The entry may have been removed from the mempool since it was
// chosen at the beginning of the loop. Skip it if that's the case.
if !memR.mempool.InMempool(memTx.tx.Key()) {
memR.Logger.Debug("Transaction no longer in mempool, skipping.")
time.Sleep(PeerCatchupSleepIntervalMS * time.Millisecond)
continue
}

success := peer.Send(p2p.Envelope{
ChannelID: MempoolChannel,
Message: &protomem.Txs{Txs: [][]byte{memTx.tx}},
})
if !success {
time.Sleep(PeerCatchupSleepIntervalMS * time.Millisecond)
continue
// Do not send this transaction if we receive it from peer.
if !memTx.isSender(peer.ID()) {
success := peer.Send(p2p.Envelope{
ChannelID: MempoolChannel,
Message: &protomem.Txs{Txs: [][]byte{memTx.tx}},
})
if !success {
time.Sleep(PeerCatchupSleepIntervalMS * time.Millisecond)
continue
}
}

select {
Expand Down

0 comments on commit 960ce8b

Please sign in to comment.