Skip to content

Commit

Permalink
Extend metrics in the code base (#684)
Browse files Browse the repository at this point in the history
* Adding metrics to other services

* Adding block proposed and received metrics

* Enable custom ghcr images (#683)

* feat: allow devs to publish images from branches

* feat: publshing docker images on master commits, pre-release, and release

* fix: typo

* Update .github/workflows/on-master-commit.yaml

Co-authored-by: Pedro Gomes <[email protected]>

* Update .github/workflows/on-master-commit.yaml

Co-authored-by: Pedro Gomes <[email protected]>

* fix: remove duplication

* chore: update actions versions

* feat: orchestract pull request CI from one file

* feat: orchestract pull request CI from one file

* fix: typos

* fix: remove requirement for test to pass before testing docker build

* Update .github/workflows/on-master-commit.yaml

Co-authored-by: libotony <[email protected]>

* Update .github/workflows/on-pre-release.yaml

Co-authored-by: libotony <[email protected]>

* chore: add linter to pre-master publish

* feat: throw an error for invalid release tag

* feat: throw an error for invalid release tag

* fix: publish only rc for pre-release"

* fix: pre-release regex

* fix: remove duplication

* feat: verify tags/releases

* fix: validate step

* chore: update to use ref_name instead of ref

* Update .github/workflows/on-release.yaml

Co-authored-by: libotony <[email protected]>

* Update .github/workflows/on-pull-request.yaml

Co-authored-by: libotony <[email protected]>

* chore: updates for PR comments

---------

Co-authored-by: Pedro Gomes <[email protected]>
Co-authored-by: libotony <[email protected]>

* adding p2p, txpool, bft metrics

* lazyloading metrics

* adding lazyloading tests + quality of life funcs

* Fixing histogram buckets

* minor cleanups

* fix the no label in the gauge

---------

Co-authored-by: Darren Kelly <[email protected]>
Co-authored-by: libotony <[email protected]>
  • Loading branch information
3 people authored Apr 22, 2024
1 parent f500ae7 commit 06c6e78
Show file tree
Hide file tree
Showing 15 changed files with 292 additions and 49 deletions.
4 changes: 3 additions & 1 deletion api/utils/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ type HandlerFunc func(http.ResponseWriter, *http.Request) error
func MetricsWrapHandlerFunc(pathPrefix, endpoint string, f HandlerFunc) http.HandlerFunc {
fixedPath := strings.ReplaceAll(pathPrefix, "/", "_") // ensure no unexpected slashes
httpReqCounter := telemetry.CounterVec(fixedPath+"_request_count", []string{"path", "code", "method"})
httpReqDuration := telemetry.HistogramVecWithHTTPBuckets(fixedPath+"_duration_ms", []string{"path", "code", "method"})
httpReqDuration := telemetry.HistogramVec(
fixedPath+"_duration_ms", []string{"path", "code", "method"}, telemetry.BucketHTTPReqs,
)

return func(w http.ResponseWriter, r *http.Request) {
now := time.Now()
Expand Down
10 changes: 9 additions & 1 deletion bft/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"sort"
"sync/atomic"

lru "github.com/hashicorp/golang-lru"
"github.com/pkg/errors"
"github.com/vechain/thor/v2/block"
"github.com/vechain/thor/v2/builtin"
Expand All @@ -17,13 +16,20 @@ import (
"github.com/vechain/thor/v2/kv"
"github.com/vechain/thor/v2/muxdb"
"github.com/vechain/thor/v2/state"
"github.com/vechain/thor/v2/telemetry"
"github.com/vechain/thor/v2/thor"

lru "github.com/hashicorp/golang-lru"
)

const dataStoreName = "bft.engine"

var finalizedKey = []byte("finalized")

var (
metricsBlocksCommitted = telemetry.LazyLoadCounterVec("block_bft_committed_count", []string{"status"})
)

// BFTEngine tracks all votes of blocks, computes the finalized checkpoint.
// Not thread-safe!
type BFTEngine struct {
Expand Down Expand Up @@ -127,6 +133,7 @@ func (engine *BFTEngine) CommitBlock(header *block.Header, isPacking bool) error
return err
}
engine.finalized.Store(id)
metricsBlocksCommitted().AddWithLabel(1, map[string]string{"status": "finalized"})
}
}

Expand All @@ -142,6 +149,7 @@ func (engine *BFTEngine) CommitBlock(header *block.Header, isPacking bool) error
return err
}
engine.casts.Mark(checkpoint, state.Quality)
metricsBlocksCommitted().AddWithLabel(1, map[string]string{"status": "proposed"})
}

return nil
Expand Down
65 changes: 65 additions & 0 deletions cmd/thor/node/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package node

import (
"time"

"github.com/vechain/thor/v2/telemetry"
)

var (
metricBlockProposedCount = telemetry.LazyLoadCounterVec("block_proposed_count", []string{"status"})
metricBlockProposedTxs = telemetry.LazyLoadCounterVec("block_proposed_tx_count", []string{"status"})
metricBlockProposedDuration = telemetry.LazyLoadHistogramVec(
"block_proposed_duration_ms", []string{"status"}, telemetry.Bucket10s,
)

metricBlockReceivedCount = telemetry.LazyLoadCounterVec("block_received_count", []string{"status"})
metricBlockReceivedProcessedTxs = telemetry.LazyLoadCounterVec("block_received_processed_tx_count", []string{"status"})
metricBlockReceivedDuration = telemetry.LazyLoadHistogramVec(
"block_received_duration_ms", []string{"status"}, telemetry.Bucket10s,
)

metricChainForkCount = telemetry.LazyLoadCounter("chain_fork_count")
metricChainForkSize = telemetry.LazyLoadGauge("chain_fork_size")
)

func evalBlockReceivedMetrics(f func() error) error {
startTime := time.Now()

if err := f(); err != nil {
status := map[string]string{
"status": "failed",
}
metricBlockReceivedCount().AddWithLabel(1, status)
metricBlockReceivedDuration().ObserveWithLabels(time.Since(startTime).Milliseconds(), status)
return err
}

status := map[string]string{
"status": "received",
}
metricBlockReceivedCount().AddWithLabel(1, status)
metricBlockReceivedDuration().ObserveWithLabels(time.Since(startTime).Milliseconds(), status)
return nil
}

// evalBlockProposeMetrics captures block proposing metrics
func evalBlockProposeMetrics(f func() error) error {
startTime := time.Now()

if err := f(); err != nil {
status := map[string]string{
"status": "failed",
}
metricBlockProposedCount().AddWithLabel(1, status)
metricBlockProposedDuration().ObserveWithLabels(time.Since(startTime).Milliseconds(), status)
return err
}

status := map[string]string{
"status": "proposed",
}
metricBlockProposedCount().AddWithLabel(1, status)
metricBlockProposedDuration().ObserveWithLabels(time.Since(startTime).Milliseconds(), status)
return nil
}
39 changes: 22 additions & 17 deletions cmd/thor/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,30 +273,32 @@ func (n *Node) txStashLoop(ctx context.Context) {
}

// guardBlockProcessing adds lock on block processing and maintains block conflicts.
func (n *Node) guardBlockProcessing(blockNum uint32, process func(conflicts uint32) error) error {
n.processLock.Lock()
defer n.processLock.Unlock()

if blockNum > n.maxBlockNum {
if blockNum > n.maxBlockNum+1 {
// the block is surely unprocessable now
return errBlockTemporaryUnprocessable
func (n *Node) guardBlockProcessing(blockNum uint32, process func(conflicts uint32) error) func() error {
return func() error {
n.processLock.Lock()
defer n.processLock.Unlock()

if blockNum > n.maxBlockNum {
if blockNum > n.maxBlockNum+1 {
// the block is surely unprocessable now
return errBlockTemporaryUnprocessable
}
n.maxBlockNum = blockNum
return process(0)
}
n.maxBlockNum = blockNum
return process(0)
}

conflicts, err := n.repo.ScanConflicts(blockNum)
if err != nil {
return err
conflicts, err := n.repo.ScanConflicts(blockNum)
if err != nil {
return err
}
return process(conflicts)
}
return process(conflicts)
}

func (n *Node) processBlock(newBlock *block.Block, stats *blockStats) (bool, error) {
var isTrunk *bool

if err := n.guardBlockProcessing(newBlock.Header().Number(), func(conflicts uint32) error {
if err := evalBlockReceivedMetrics(n.guardBlockProcessing(newBlock.Header().Number(), func(conflicts uint32) error {
// Check whether the block was already there.
// It can be skipped if no conflicts.
if conflicts > 0 {
Expand Down Expand Up @@ -395,9 +397,10 @@ func (n *Node) processBlock(newBlock *block.Block, stats *blockStats) (bool, err
log.Debug("bandwidth updated", "gps", v)
}

metricBlockReceivedProcessedTxs().AddWithLabel(int64(len(receipts)), map[string]string{"status": "receivedBlock"})
stats.UpdateProcessed(1, len(receipts), execElapsed, commitElapsed, realElapsed, newBlock.Header().GasUsed())
return nil
}); err != nil {
})); err != nil {
switch {
case err == errKnownBlock || err == errBFTRejected:
stats.UpdateIgnored(1)
Expand Down Expand Up @@ -486,6 +489,8 @@ func (n *Node) processFork(newBlock *block.Block, oldBestBlockID thor.Bytes32) {
}

if n := len(sideIds); n >= 2 {
metricChainForkCount().Add(1)
metricChainForkSize().Gauge(int64(len(sideIds)))
log.Warn(fmt.Sprintf(
`⑂⑂⑂⑂⑂⑂⑂⑂ FORK HAPPENED ⑂⑂⑂⑂⑂⑂⑂⑂
side-chain: %v %v`,
Expand Down
5 changes: 3 additions & 2 deletions cmd/thor/node/packer_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (n *Node) pack(flow *packer.Flow) error {
}
}()

return n.guardBlockProcessing(flow.Number(), func(conflicts uint32) error {
return evalBlockProposeMetrics(n.guardBlockProcessing(flow.Number(), func(conflicts uint32) error {
var (
startTime = mclock.Now()
logEnabled = !n.skipLogs && !n.logDBFailed
Expand Down Expand Up @@ -191,6 +191,7 @@ func (n *Node) pack(flow *packer.Flow) error {
n.processFork(newBlock, oldBest.Header.ID())
commitElapsed := mclock.Now() - startTime - execElapsed

metricBlockProposedTxs().AddWithLabel(int64(len(receipts)), map[string]string{"status": "proposedBlock"})
n.comm.BroadcastBlock(newBlock)
log.Info("📦 new block packed",
"txs", len(receipts),
Expand All @@ -203,5 +204,5 @@ func (n *Node) pack(flow *packer.Flow) error {
log.Debug("bandwidth updated", "gps", v)
}
return nil
})
}))
}
9 changes: 9 additions & 0 deletions p2psrv/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package p2psrv

import "github.com/vechain/thor/v2/telemetry"

var (
metricConnectedPeers = telemetry.LazyLoadGauge("p2p_connected_peers_count")
metricDiscoveredNodes = telemetry.LazyLoadGauge("p2p_discovered_node_count")
metricDialNewNode = telemetry.LazyLoadCounter("p2p_dial_new_node_count")
)
2 changes: 2 additions & 0 deletions p2psrv/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,15 @@ func (nm *nodeMap) Add(node *discover.Node) {
nm.lock.Lock()
defer nm.lock.Unlock()
nm.m[node.ID] = node
metricConnectedPeers().Gauge(1)
}

func (nm *nodeMap) Remove(id discover.NodeID) *discover.Node {
nm.lock.Lock()
defer nm.lock.Unlock()
if node, ok := nm.m[id]; ok {
delete(nm.m, id)
metricConnectedPeers().Gauge(-1)
return node
}
return nil
Expand Down
2 changes: 2 additions & 0 deletions p2psrv/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ func (s *Server) discoverLoop(topic discv5.Topic) {
case v5node := <-discNodes:
node := discover.NewNode(discover.NodeID(v5node.ID), v5node.IP, v5node.UDP, v5node.TCP)
if _, found := s.discoveredNodes.Get(node.ID); !found {
metricDiscoveredNodes().Gauge(1)
s.discoveredNodes.Set(node.ID, node)
log.Debug("discovered node", "node", node)
}
Expand Down Expand Up @@ -300,6 +301,7 @@ func (s *Server) dialLoop() {
s.dialingNodes.Add(node)
// don't use goes.Go, since the dial process can't be interrupted
go func() {
metricDialNewNode().Add(1)
if err := s.tryDial(node); err != nil {
s.dialingNodes.Remove(node.ID)
log.Debug("failed to dial node", "err", err)
Expand Down
15 changes: 10 additions & 5 deletions telemetry/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,21 @@ import "net/http"
// noopTelemetry implements a no operations telemetry service
type noopTelemetry struct{}

func (n *noopTelemetry) GetOrCreateHistogramVecMeter(string, []string, []int64) HistogramVecMeter {
return &noopTelemetry{}
}

func defaultNoopTelemetry() Telemetry { return &noopTelemetry{} }

func (n *noopTelemetry) GetOrCreateHistogramMeter(string, []int64) HistogramMeter { return &noopMetric }

func (n *noopTelemetry) GetOrCreateHistogramVecMeter(string, []string, []int64) HistogramVecMeter {
return &noopMetric
}
func (n *noopTelemetry) GetOrCreateCountMeter(string) CountMeter { return &noopMetric }

func (n *noopTelemetry) GetOrCreateCountVecMeter(string, []string) CountVecMeter {
return &noopMetric
}

func (n *noopTelemetry) GetOrCreateGaugeMeter(string) GaugeMeter {
return &noopMetric
}
func (n *noopTelemetry) GetOrCreateGaugeVecMeter(string, []string) GaugeVecMeter {
return &noopMetric
}
Expand All @@ -29,6 +30,10 @@ var noopMetric = noopMeters{}

type noopMeters struct{}

func (n noopMeters) ObserveWithLabels(i int64, m map[string]string) {}

func (n noopMeters) Gauge(int64) {}

func (n noopMeters) GaugeWithLabel(int64, map[string]string) {}

func (n noopMeters) AddWithLabel(int64, map[string]string) {}
Expand Down
4 changes: 2 additions & 2 deletions telemetry/noop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ func TestNoopTelemetry(t *testing.T) {
Counter("count2").Add(1)
}

hist := Histogram("hist1")
histVect := HistogramVec("hist2", []string{"zeroOrOne"})
hist := Histogram("hist1", nil)
histVect := HistogramVec("hist2", []string{"zeroOrOne"}, nil)
for i := 0; i < rand.Intn(100)+1; i++ {
hist.Observe(int64(i))
histVect.ObserveWithLabels(int64(i), map[string]string{"thisIsNonsense": "butDoesntBreak"})
Expand Down
42 changes: 40 additions & 2 deletions telemetry/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ import (
"sync"

"github.com/ethereum/go-ethereum/log"
"github.com/prometheus/client_golang/prometheus/promhttp"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

const namespace = "node_telemetry"
Expand All @@ -27,6 +26,7 @@ type prometheusTelemetry struct {
histograms sync.Map
histogramVecs sync.Map
gaugeVecs sync.Map
gauges sync.Map
}

func newPrometheusTelemetry() Telemetry {
Expand All @@ -36,6 +36,7 @@ func newPrometheusTelemetry() Telemetry {
histograms: sync.Map{},
histogramVecs: sync.Map{},
gaugeVecs: sync.Map{},
gauges: sync.Map{},
}
}

Expand Down Expand Up @@ -91,6 +92,18 @@ func (o *prometheusTelemetry) GetOrCreateHistogramVecMeter(name string, labels [
return meter
}

func (o *prometheusTelemetry) GetOrCreateGaugeMeter(name string) GaugeMeter {
var meter GaugeMeter
mapItem, ok := o.gauges.Load(name)
if !ok {
meter = o.newGaugeMeter(name)
o.gauges.Store(name, meter)
} else {
meter = mapItem.(GaugeMeter)
}
return meter
}

func (o *prometheusTelemetry) GetOrCreateGaugeVecMeter(name string, labels []string) GaugeVecMeter {
var meter GaugeVecMeter
mapItem, ok := o.gaugeVecs.Load(name)
Expand Down Expand Up @@ -203,6 +216,23 @@ func (o *prometheusTelemetry) newCountVecMeter(name string, labels []string) Cou
}
}

func (o *prometheusTelemetry) newGaugeMeter(name string) GaugeMeter {
meter := prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Name: name,
},
)

err := prometheus.Register(meter)
if err != nil {
log.Warn("unable to register metric", "err", err)
}
return &promGaugeMeter{
gauge: meter,
}
}

func (o *prometheusTelemetry) newGaugeVecMeter(name string, labels []string) GaugeVecMeter {
meter := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Expand Down Expand Up @@ -237,6 +267,14 @@ func (c *promCountVecMeter) AddWithLabel(i int64, labels map[string]string) {
c.counter.With(labels).Add(float64(i))
}

type promGaugeMeter struct {
gauge prometheus.Gauge
}

func (c *promGaugeMeter) Gauge(i int64) {
c.gauge.Add(float64(i))
}

type promGaugeVecMeter struct {
gauge *prometheus.GaugeVec
}
Expand Down
Loading

0 comments on commit 06c6e78

Please sign in to comment.