Skip to content

Commit

Permalink
[AV-1619] Add metrics for incoming/outgoing gossip (#835)
Browse files Browse the repository at this point in the history
* Add metrics for incoming/outgoing gossip

* dont count empty regossips

* bump version

* pr comments

* fix

* rename stats for greppable prefix

* reoder functions

* add combined interface

* move file

* add dropped/error stat for atomic tx

* pr comments

* remove tx counting
  • Loading branch information
darioush authored Jul 1, 2022
1 parent 58c4830 commit f8e5639
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 2 deletions.
106 changes: 106 additions & 0 deletions plugin/evm/gossip_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// (c) 2022, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package evm

import "github.com/ava-labs/coreth/metrics"

var _ GossipStats = &gossipStats{}

// GossipStats contains methods for updating incoming and outgoing gossip stats.
type GossipStats interface {
GossipReceivedStats
GossipSentStats
}

// GossipReceivedStats groups functions for incoming gossip stats.
type GossipReceivedStats interface {
IncAtomicGossipReceived()
IncEthTxsGossipReceived()

// new vs. known txs received
IncAtomicGossipReceivedDropped()
IncAtomicGossipReceivedError()
IncAtomicGossipReceivedKnown()
IncAtomicGossipReceivedNew()
IncEthTxsGossipReceivedKnown()
IncEthTxsGossipReceivedNew()
}

// GossipSentStats groups functions for outgoing gossip stats.
type GossipSentStats interface {
IncAtomicGossipSent()
IncEthTxsGossipSent()

// regossip
IncEthTxsRegossipQueued()
IncEthTxsRegossipQueuedLocal(count int)
IncEthTxsRegossipQueuedRemote(count int)
}

// gossipStats implements stats for incoming and outgoing gossip stats.
type gossipStats struct {
// messages
atomicGossipSent metrics.Counter
atomicGossipReceived metrics.Counter
ethTxsGossipSent metrics.Counter
ethTxsGossipReceived metrics.Counter

// regossip
ethTxsRegossipQueued metrics.Counter
ethTxsRegossipQueuedLocal metrics.Counter
ethTxsRegossipQueuedRemote metrics.Counter

// new vs. known txs received
atomicGossipReceivedDropped metrics.Counter
atomicGossipReceivedError metrics.Counter
atomicGossipReceivedKnown metrics.Counter
atomicGossipReceivedNew metrics.Counter
ethTxsGossipReceivedKnown metrics.Counter
ethTxsGossipReceivedNew metrics.Counter
}

func NewGossipStats() GossipStats {
return &gossipStats{
atomicGossipSent: metrics.GetOrRegisterCounter("gossip_atomic_sent", nil),
atomicGossipReceived: metrics.GetOrRegisterCounter("gossip_atomic_received", nil),
ethTxsGossipSent: metrics.GetOrRegisterCounter("gossip_eth_txs_sent", nil),
ethTxsGossipReceived: metrics.GetOrRegisterCounter("gossip_eth_txs_received", nil),

ethTxsRegossipQueued: metrics.GetOrRegisterCounter("regossip_eth_txs_queued_attempts", nil),
ethTxsRegossipQueuedLocal: metrics.GetOrRegisterCounter("regossip_eth_txs_queued_local_tx_count", nil),
ethTxsRegossipQueuedRemote: metrics.GetOrRegisterCounter("regossip_eth_txs_queued_remote_tx_count", nil),

atomicGossipReceivedDropped: metrics.GetOrRegisterCounter("gossip_atomic_received_dropped", nil),
atomicGossipReceivedError: metrics.GetOrRegisterCounter("gossip_atomic_received_error", nil),
atomicGossipReceivedKnown: metrics.GetOrRegisterCounter("gossip_atomic_received_known", nil),
atomicGossipReceivedNew: metrics.GetOrRegisterCounter("gossip_atomic_received_new", nil),
ethTxsGossipReceivedKnown: metrics.GetOrRegisterCounter("gossip_eth_txs_received_known", nil),
ethTxsGossipReceivedNew: metrics.GetOrRegisterCounter("gossip_eth_txs_received_new", nil),
}
}

// incoming messages
func (g *gossipStats) IncAtomicGossipReceived() { g.atomicGossipReceived.Inc(1) }
func (g *gossipStats) IncEthTxsGossipReceived() { g.ethTxsGossipReceived.Inc(1) }

// new vs. known txs received
func (g *gossipStats) IncAtomicGossipReceivedDropped() { g.atomicGossipReceivedDropped.Inc(1) }
func (g *gossipStats) IncAtomicGossipReceivedError() { g.atomicGossipReceivedError.Inc(1) }
func (g *gossipStats) IncAtomicGossipReceivedKnown() { g.atomicGossipReceivedKnown.Inc(1) }
func (g *gossipStats) IncAtomicGossipReceivedNew() { g.atomicGossipReceivedNew.Inc(1) }
func (g *gossipStats) IncEthTxsGossipReceivedKnown() { g.ethTxsGossipReceivedKnown.Inc(1) }
func (g *gossipStats) IncEthTxsGossipReceivedNew() { g.ethTxsGossipReceivedNew.Inc(1) }

// outgoing messages
func (g *gossipStats) IncAtomicGossipSent() { g.atomicGossipSent.Inc(1) }
func (g *gossipStats) IncEthTxsGossipSent() { g.ethTxsGossipSent.Inc(1) }

// regossip
func (g *gossipStats) IncEthTxsRegossipQueued() { g.ethTxsRegossipQueued.Inc(1) }
func (g *gossipStats) IncEthTxsRegossipQueuedLocal(count int) {
g.ethTxsRegossipQueuedLocal.Inc(int64(count))
}
func (g *gossipStats) IncEthTxsRegossipQueuedRemote(count int) {
g.ethTxsRegossipQueuedRemote.Inc(int64(count))
}
29 changes: 28 additions & 1 deletion plugin/evm/gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type pushGossiper struct {
recentEthTxs *cache.LRU

codec codec.Manager
stats GossipSentStats
}

// createGossiper constructs and returns a pushGossiper or noopGossiper
Expand All @@ -95,6 +96,7 @@ func (vm *VM) createGossiper() Gossiper {
recentAtomicTxs: &cache.LRU{Size: recentCacheSize},
recentEthTxs: &cache.LRU{Size: recentCacheSize},
codec: vm.networkCodec,
stats: vm.gossipStats,
}
net.awaitEthTxGossip()
return net
Expand Down Expand Up @@ -198,10 +200,17 @@ func (n *pushGossiper) queueRegossipTxs() types.Transactions {
}
localQueued := n.queueExecutableTxs(state, tip.BaseFee(), localTxs, n.config.TxRegossipMaxSize)
localCount := len(localQueued)
n.stats.IncEthTxsRegossipQueuedLocal(localCount)
if localCount >= n.config.TxRegossipMaxSize {
n.stats.IncEthTxsRegossipQueued()
return localQueued
}
remoteQueued := n.queueExecutableTxs(state, tip.BaseFee(), remoteTxs, n.config.TxRegossipMaxSize-localCount)
n.stats.IncEthTxsRegossipQueuedRemote(len(remoteQueued))
if localCount+len(remoteQueued) > 0 {
// only increment the regossip stat when there are any txs queued
n.stats.IncEthTxsRegossipQueued()
}
return append(localQueued, remoteQueued...)
}

Expand Down Expand Up @@ -297,6 +306,7 @@ func (n *pushGossiper) gossipAtomicTx(tx *Tx) error {
"gossiping atomic tx",
"txID", txID,
)
n.stats.IncAtomicGossipSent()
return n.client.Gossip(msgBytes)
}

Expand All @@ -322,6 +332,7 @@ func (n *pushGossiper) sendEthTxs(txs []*types.Transaction) error {
"len(txs)", len(txs),
"size(txs)", len(msg.Txs),
)
n.stats.IncEthTxsGossipSent()
return n.client.Gossip(msgBytes)
}

Expand Down Expand Up @@ -411,13 +422,15 @@ type GossipHandler struct {
vm *VM
atomicMempool *Mempool
txPool *core.TxPool
stats GossipReceivedStats
}

func NewGossipHandler(vm *VM) *GossipHandler {
return &GossipHandler{
vm: vm,
atomicMempool: vm.mempool,
txPool: vm.chain.GetTxPool(),
stats: vm.gossipStats,
}
}

Expand Down Expand Up @@ -456,10 +469,16 @@ func (h *GossipHandler) HandleAtomicTx(nodeID ids.NodeID, msg message.AtomicTxGo
tx.Initialize(unsignedBytes, msg.Tx)

txID := tx.ID()
if _, dropped, found := h.atomicMempool.GetTx(txID); found || dropped {
h.stats.IncAtomicGossipReceived()
if _, dropped, found := h.atomicMempool.GetTx(txID); found {
h.stats.IncAtomicGossipReceivedKnown()
return nil
} else if dropped {
h.stats.IncAtomicGossipReceivedDropped()
return nil
}

h.stats.IncAtomicGossipReceivedNew()
if err := h.vm.issueTx(&tx, false /*=local*/); err != nil {
log.Trace(
"AppGossip provided invalid transaction",
Expand Down Expand Up @@ -496,6 +515,7 @@ func (h *GossipHandler) HandleEthTxs(nodeID ids.NodeID, msg message.EthTxsGossip
)
return nil
}
h.stats.IncEthTxsGossipReceived()
errs := h.txPool.AddRemotes(txs)
for i, err := range errs {
if err != nil {
Expand All @@ -504,7 +524,14 @@ func (h *GossipHandler) HandleEthTxs(nodeID ids.NodeID, msg message.EthTxsGossip
"err", err,
"tx", txs[i].Hash(),
)
if err == core.ErrAlreadyKnown {
h.stats.IncEthTxsGossipReceivedKnown()
} else {
h.stats.IncAtomicGossipReceivedError()
}
continue
}
h.stats.IncEthTxsGossipReceivedNew()
}
return nil
}
Expand Down
4 changes: 3 additions & 1 deletion plugin/evm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,8 @@ type VM struct {

builder *blockBuilder

gossiper Gossiper
gossiper Gossiper
gossipStats GossipStats

baseCodec codec.Registry
codec codec.Manager
Expand Down Expand Up @@ -551,6 +552,7 @@ func (vm *VM) initializeChain(lastAcceptedHash common.Hash) error {
//
// NOTE: gossip network must be initialized first otherwise ETH tx gossip will
// not work.
vm.gossipStats = NewGossipStats()
vm.gossiper = vm.createGossiper()
vm.builder = vm.NewBlockBuilder(vm.toEngine)
vm.builder.awaitSubmittedTxs()
Expand Down

0 comments on commit f8e5639

Please sign in to comment.