From f8e563976e8a36c5f0af492bf37e4d977c9e3abc Mon Sep 17 00:00:00 2001 From: Darioush Jalali Date: Fri, 1 Jul 2022 12:08:32 -0700 Subject: [PATCH] [AV-1619] Add metrics for incoming/outgoing gossip (#835) * Add metrics for incoming/outgoing gossip * dont count empty regossips * bump version * pr comments * fix * rename stats for greppable prefix * reoder functions * add combined interface * move file * add dropped/error stat for atomic tx * pr comments * remove tx counting --- plugin/evm/gossip_stats.go | 106 +++++++++++++++++++++++++++++++++++++ plugin/evm/gossiper.go | 29 +++++++++- plugin/evm/vm.go | 4 +- 3 files changed, 137 insertions(+), 2 deletions(-) create mode 100644 plugin/evm/gossip_stats.go diff --git a/plugin/evm/gossip_stats.go b/plugin/evm/gossip_stats.go new file mode 100644 index 0000000000..ab12c607c7 --- /dev/null +++ b/plugin/evm/gossip_stats.go @@ -0,0 +1,106 @@ +// (c) 2022, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package evm + +import "github.com/ava-labs/coreth/metrics" + +var _ GossipStats = &gossipStats{} + +// GossipStats contains methods for updating incoming and outgoing gossip stats. +type GossipStats interface { + GossipReceivedStats + GossipSentStats +} + +// GossipReceivedStats groups functions for incoming gossip stats. +type GossipReceivedStats interface { + IncAtomicGossipReceived() + IncEthTxsGossipReceived() + + // new vs. known txs received + IncAtomicGossipReceivedDropped() + IncAtomicGossipReceivedError() + IncAtomicGossipReceivedKnown() + IncAtomicGossipReceivedNew() + IncEthTxsGossipReceivedKnown() + IncEthTxsGossipReceivedNew() +} + +// GossipSentStats groups functions for outgoing gossip stats. +type GossipSentStats interface { + IncAtomicGossipSent() + IncEthTxsGossipSent() + + // regossip + IncEthTxsRegossipQueued() + IncEthTxsRegossipQueuedLocal(count int) + IncEthTxsRegossipQueuedRemote(count int) +} + +// gossipStats implements stats for incoming and outgoing gossip stats. +type gossipStats struct { + // messages + atomicGossipSent metrics.Counter + atomicGossipReceived metrics.Counter + ethTxsGossipSent metrics.Counter + ethTxsGossipReceived metrics.Counter + + // regossip + ethTxsRegossipQueued metrics.Counter + ethTxsRegossipQueuedLocal metrics.Counter + ethTxsRegossipQueuedRemote metrics.Counter + + // new vs. known txs received + atomicGossipReceivedDropped metrics.Counter + atomicGossipReceivedError metrics.Counter + atomicGossipReceivedKnown metrics.Counter + atomicGossipReceivedNew metrics.Counter + ethTxsGossipReceivedKnown metrics.Counter + ethTxsGossipReceivedNew metrics.Counter +} + +func NewGossipStats() GossipStats { + return &gossipStats{ + atomicGossipSent: metrics.GetOrRegisterCounter("gossip_atomic_sent", nil), + atomicGossipReceived: metrics.GetOrRegisterCounter("gossip_atomic_received", nil), + ethTxsGossipSent: metrics.GetOrRegisterCounter("gossip_eth_txs_sent", nil), + ethTxsGossipReceived: metrics.GetOrRegisterCounter("gossip_eth_txs_received", nil), + + ethTxsRegossipQueued: metrics.GetOrRegisterCounter("regossip_eth_txs_queued_attempts", nil), + ethTxsRegossipQueuedLocal: metrics.GetOrRegisterCounter("regossip_eth_txs_queued_local_tx_count", nil), + ethTxsRegossipQueuedRemote: metrics.GetOrRegisterCounter("regossip_eth_txs_queued_remote_tx_count", nil), + + atomicGossipReceivedDropped: metrics.GetOrRegisterCounter("gossip_atomic_received_dropped", nil), + atomicGossipReceivedError: metrics.GetOrRegisterCounter("gossip_atomic_received_error", nil), + atomicGossipReceivedKnown: metrics.GetOrRegisterCounter("gossip_atomic_received_known", nil), + atomicGossipReceivedNew: metrics.GetOrRegisterCounter("gossip_atomic_received_new", nil), + ethTxsGossipReceivedKnown: metrics.GetOrRegisterCounter("gossip_eth_txs_received_known", nil), + ethTxsGossipReceivedNew: metrics.GetOrRegisterCounter("gossip_eth_txs_received_new", nil), + } +} + +// incoming messages +func (g *gossipStats) IncAtomicGossipReceived() { g.atomicGossipReceived.Inc(1) } +func (g *gossipStats) IncEthTxsGossipReceived() { g.ethTxsGossipReceived.Inc(1) } + +// new vs. known txs received +func (g *gossipStats) IncAtomicGossipReceivedDropped() { g.atomicGossipReceivedDropped.Inc(1) } +func (g *gossipStats) IncAtomicGossipReceivedError() { g.atomicGossipReceivedError.Inc(1) } +func (g *gossipStats) IncAtomicGossipReceivedKnown() { g.atomicGossipReceivedKnown.Inc(1) } +func (g *gossipStats) IncAtomicGossipReceivedNew() { g.atomicGossipReceivedNew.Inc(1) } +func (g *gossipStats) IncEthTxsGossipReceivedKnown() { g.ethTxsGossipReceivedKnown.Inc(1) } +func (g *gossipStats) IncEthTxsGossipReceivedNew() { g.ethTxsGossipReceivedNew.Inc(1) } + +// outgoing messages +func (g *gossipStats) IncAtomicGossipSent() { g.atomicGossipSent.Inc(1) } +func (g *gossipStats) IncEthTxsGossipSent() { g.ethTxsGossipSent.Inc(1) } + +// regossip +func (g *gossipStats) IncEthTxsRegossipQueued() { g.ethTxsRegossipQueued.Inc(1) } +func (g *gossipStats) IncEthTxsRegossipQueuedLocal(count int) { + g.ethTxsRegossipQueuedLocal.Inc(int64(count)) +} +func (g *gossipStats) IncEthTxsRegossipQueuedRemote(count int) { + g.ethTxsRegossipQueuedRemote.Inc(int64(count)) +} diff --git a/plugin/evm/gossiper.go b/plugin/evm/gossiper.go index 915afa6c94..b2988a6559 100644 --- a/plugin/evm/gossiper.go +++ b/plugin/evm/gossiper.go @@ -71,6 +71,7 @@ type pushGossiper struct { recentEthTxs *cache.LRU codec codec.Manager + stats GossipSentStats } // createGossiper constructs and returns a pushGossiper or noopGossiper @@ -95,6 +96,7 @@ func (vm *VM) createGossiper() Gossiper { recentAtomicTxs: &cache.LRU{Size: recentCacheSize}, recentEthTxs: &cache.LRU{Size: recentCacheSize}, codec: vm.networkCodec, + stats: vm.gossipStats, } net.awaitEthTxGossip() return net @@ -198,10 +200,17 @@ func (n *pushGossiper) queueRegossipTxs() types.Transactions { } localQueued := n.queueExecutableTxs(state, tip.BaseFee(), localTxs, n.config.TxRegossipMaxSize) localCount := len(localQueued) + n.stats.IncEthTxsRegossipQueuedLocal(localCount) if localCount >= n.config.TxRegossipMaxSize { + n.stats.IncEthTxsRegossipQueued() return localQueued } remoteQueued := n.queueExecutableTxs(state, tip.BaseFee(), remoteTxs, n.config.TxRegossipMaxSize-localCount) + n.stats.IncEthTxsRegossipQueuedRemote(len(remoteQueued)) + if localCount+len(remoteQueued) > 0 { + // only increment the regossip stat when there are any txs queued + n.stats.IncEthTxsRegossipQueued() + } return append(localQueued, remoteQueued...) } @@ -297,6 +306,7 @@ func (n *pushGossiper) gossipAtomicTx(tx *Tx) error { "gossiping atomic tx", "txID", txID, ) + n.stats.IncAtomicGossipSent() return n.client.Gossip(msgBytes) } @@ -322,6 +332,7 @@ func (n *pushGossiper) sendEthTxs(txs []*types.Transaction) error { "len(txs)", len(txs), "size(txs)", len(msg.Txs), ) + n.stats.IncEthTxsGossipSent() return n.client.Gossip(msgBytes) } @@ -411,6 +422,7 @@ type GossipHandler struct { vm *VM atomicMempool *Mempool txPool *core.TxPool + stats GossipReceivedStats } func NewGossipHandler(vm *VM) *GossipHandler { @@ -418,6 +430,7 @@ func NewGossipHandler(vm *VM) *GossipHandler { vm: vm, atomicMempool: vm.mempool, txPool: vm.chain.GetTxPool(), + stats: vm.gossipStats, } } @@ -456,10 +469,16 @@ func (h *GossipHandler) HandleAtomicTx(nodeID ids.NodeID, msg message.AtomicTxGo tx.Initialize(unsignedBytes, msg.Tx) txID := tx.ID() - if _, dropped, found := h.atomicMempool.GetTx(txID); found || dropped { + h.stats.IncAtomicGossipReceived() + if _, dropped, found := h.atomicMempool.GetTx(txID); found { + h.stats.IncAtomicGossipReceivedKnown() + return nil + } else if dropped { + h.stats.IncAtomicGossipReceivedDropped() return nil } + h.stats.IncAtomicGossipReceivedNew() if err := h.vm.issueTx(&tx, false /*=local*/); err != nil { log.Trace( "AppGossip provided invalid transaction", @@ -496,6 +515,7 @@ func (h *GossipHandler) HandleEthTxs(nodeID ids.NodeID, msg message.EthTxsGossip ) return nil } + h.stats.IncEthTxsGossipReceived() errs := h.txPool.AddRemotes(txs) for i, err := range errs { if err != nil { @@ -504,7 +524,14 @@ func (h *GossipHandler) HandleEthTxs(nodeID ids.NodeID, msg message.EthTxsGossip "err", err, "tx", txs[i].Hash(), ) + if err == core.ErrAlreadyKnown { + h.stats.IncEthTxsGossipReceivedKnown() + } else { + h.stats.IncAtomicGossipReceivedError() + } + continue } + h.stats.IncEthTxsGossipReceivedNew() } return nil } diff --git a/plugin/evm/vm.go b/plugin/evm/vm.go index ae2313cb70..bf74d53e82 100644 --- a/plugin/evm/vm.go +++ b/plugin/evm/vm.go @@ -231,7 +231,8 @@ type VM struct { builder *blockBuilder - gossiper Gossiper + gossiper Gossiper + gossipStats GossipStats baseCodec codec.Registry codec codec.Manager @@ -551,6 +552,7 @@ func (vm *VM) initializeChain(lastAcceptedHash common.Hash) error { // // NOTE: gossip network must be initialized first otherwise ETH tx gossip will // not work. + vm.gossipStats = NewGossipStats() vm.gossiper = vm.createGossiper() vm.builder = vm.NewBlockBuilder(vm.toEngine) vm.builder.awaitSubmittedTxs()