Skip to content

Commit

Permalink
Merge pull request #2482 from iotaledger/metrics-pipe-fix
Browse files Browse the repository at this point in the history
Unregister pipe metrics properly.
  • Loading branch information
kape1395 authored May 17, 2023
2 parents baa9288 + d10123f commit 53e90cf
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 18 deletions.
2 changes: 1 addition & 1 deletion components/webapi/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func NewEcho(params *ParametersWebAPI, metrics *metrics.ChainMetricsProvider, lo
if !ok {
return err
}
metrics.NewChainMetrics(chainID).WebAPIRequest(operation, status, time.Since(start))
metrics.GetChainMetrics(chainID).WebAPIRequest(operation, status, time.Since(start))

return err
}
Expand Down
2 changes: 1 addition & 1 deletion packages/chains/chains.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func (c *Chains) activateWithoutLocking(chainID isc.ChainID) error {
return fmt.Errorf("error when creating chain KV store: %w", err)
}

chainMetrics := c.chainMetricsProvider.NewChainMetrics(chainID)
chainMetrics := c.chainMetricsProvider.GetChainMetrics(chainID)

// Initialize WAL
chainLog := c.log.Named(chainID.ShortString())
Expand Down
41 changes: 29 additions & 12 deletions packages/metrics/chain.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package metrics

import (
"sync"
"sync/atomic"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/samber/lo"

iotago "github.com/iotaledger/iota.go/v3"
"github.com/iotaledger/iota.go/v3/nodeclient"
Expand Down Expand Up @@ -152,6 +154,9 @@ func newChainMetrics(provider *ChainMetricsProvider, chainID isc.ChainID) *chain

// ChainMetricsProvider holds all metrics for all chains per chain
type ChainMetricsProvider struct {
chainsLock *sync.RWMutex
chainsRegistered map[isc.ChainID]*chainMetrics

// We use Func variant of a metric here, thus we register them
// explicitly when they are created. Therefore we need a registry here.
pipeLenRegistry *prometheus.Registry
Expand Down Expand Up @@ -184,7 +189,6 @@ type ChainMetricsProvider struct {
mempoolMissingReqs *prometheus.GaugeVec

// messages
chainsRegistered []isc.ChainID
messagesL1 *prometheus.CounterVec
lastL1MessageTime *prometheus.GaugeVec
messagesL1Chain *prometheus.CounterVec
Expand Down Expand Up @@ -239,6 +243,9 @@ func NewChainMetricsProvider() *ChainMetricsProvider {
recCountBuckets := prometheus.ExponentialBucketsRange(1, 1000, 16)

m := &ChainMetricsProvider{
chainsLock: &sync.RWMutex{},
chainsRegistered: map[isc.ChainID]*chainMetrics{},

//
// blockWAL
//
Expand Down Expand Up @@ -379,7 +386,6 @@ func NewChainMetricsProvider() *ChainMetricsProvider {
//
// messages // TODO: Review, if they are used/needed.
//
chainsRegistered: make([]isc.ChainID, 0),

messagesL1: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "iota_wasp",
Expand Down Expand Up @@ -573,8 +579,16 @@ func NewChainMetricsProvider() *ChainMetricsProvider {
return m
}

func (m *ChainMetricsProvider) NewChainMetrics(chainID isc.ChainID) IChainMetrics {
return newChainMetrics(m, chainID)
func (m *ChainMetricsProvider) GetChainMetrics(chainID isc.ChainID) IChainMetrics {
m.chainsLock.Lock()
defer m.chainsLock.Unlock()

if cm, ok := m.chainsRegistered[chainID]; ok {
return cm
}
cm := newChainMetrics(m, chainID)
m.chainsRegistered[chainID] = cm
return cm
}

func (m *ChainMetricsProvider) PrometheusCollectorsBlockWAL() []prometheus.Collector {
Expand Down Expand Up @@ -667,21 +681,24 @@ func (m *ChainMetricsProvider) PrometheusCollectorsWebAPI() []prometheus.Collect
}

func (m *ChainMetricsProvider) RegisterChain(chainID isc.ChainID) {
m.chainsRegistered = append(m.chainsRegistered, chainID)
m.GetChainMetrics(chainID)
}

func (m *ChainMetricsProvider) UnregisterChain(chainID isc.ChainID) {
for i := 0; i < len(m.chainsRegistered); i++ {
if m.chainsRegistered[i] == chainID {
// remove the found chain from the slice and return
m.chainsRegistered = append(m.chainsRegistered[:i], m.chainsRegistered[i+1:]...)
return
}
m.chainsLock.Lock()
defer m.chainsLock.Unlock()

if cm, ok := m.chainsRegistered[chainID]; ok {
cm.cleanup()
delete(m.chainsRegistered, chainID)
}
}

func (m *ChainMetricsProvider) RegisteredChains() []isc.ChainID {
return m.chainsRegistered
m.chainsLock.RLock()
defer m.chainsLock.RUnlock()

return lo.Keys(m.chainsRegistered)
}

func (m *ChainMetricsProvider) InMilestone() IMessageMetric[*nodeclient.MilestoneInfo] {
Expand Down
38 changes: 37 additions & 1 deletion packages/metrics/chain_pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package metrics

import (
"fmt"
"sync"

"github.com/prometheus/client_golang/prometheus"

Expand All @@ -26,6 +27,7 @@ type chainPipeMetrics struct {
provider *ChainMetricsProvider
lenMetrics map[string]prometheus.Collector
maxMetrics map[string]*chainPipeMaxCollector
regLock *sync.RWMutex
}

type chainPipeMaxCollector struct {
Expand All @@ -39,9 +41,30 @@ func newChainPipeMetric(provider *ChainMetricsProvider, chainID isc.ChainID) *ch
provider: provider,
lenMetrics: map[string]prometheus.Collector{},
maxMetrics: map[string]*chainPipeMaxCollector{},
regLock: &sync.RWMutex{},
}
}

func (m *chainPipeMetrics) cleanup() {
m.regLock.Lock()
defer m.regLock.Unlock()

reg := m.provider.pipeLenRegistry
if reg == nil {
return
}

for _, collector := range m.lenMetrics {
reg.Unregister(collector)
}
m.lenMetrics = map[string]prometheus.Collector{}

for _, maxCollector := range m.maxMetrics {
reg.Unregister(maxCollector.collector)
}
m.maxMetrics = map[string]*chainPipeMaxCollector{}
}

func (m *chainPipeMetrics) makeLabels(pipeName string) prometheus.Labels {
return prometheus.Labels{
labelNameChain: m.chainID.String(),
Expand All @@ -50,6 +73,9 @@ func (m *chainPipeMetrics) makeLabels(pipeName string) prometheus.Labels {
}

func (m *chainPipeMetrics) TrackPipeLen(name string, lenFunc func() int) {
m.regLock.Lock()
defer m.regLock.Unlock()

reg := m.provider.pipeLenRegistry
if reg == nil {
return
Expand All @@ -65,13 +91,17 @@ func (m *chainPipeMetrics) TrackPipeLen(name string, lenFunc func() int) {
Help: "Length of a pipe",
ConstLabels: m.makeLabels(name),
}, func() float64 { return float64(lenFunc()) })
m.lenMetrics[name] = collector

if err := reg.Register(collector); err != nil {
panic(fmt.Errorf("failed to register pipe len metric: %w", err))
panic(fmt.Errorf("failed to register pipe %v len metric for chain %v: %w", name, m.chainID, err))
}
}

func (m *chainPipeMetrics) TrackPipeLenMax(name string, key string, lenFunc func() int) {
m.regLock.Lock()
defer m.regLock.Unlock()

reg := m.provider.pipeLenRegistry
if reg == nil {
return
Expand All @@ -96,6 +126,9 @@ func (m *chainPipeMetrics) TrackPipeLenMax(name string, key string, lenFunc func
}
return float64(max)
})
if err := reg.Register(collector); err != nil {
panic(fmt.Errorf("failed to register pipe %v max len metric for chain %v: %w", name, m.chainID, err))
}
maxCollector = &chainPipeMaxCollector{
collector: collector,
valueFuncs: valueFuncs,
Expand All @@ -106,6 +139,9 @@ func (m *chainPipeMetrics) TrackPipeLenMax(name string, key string, lenFunc func
}

func (m *chainPipeMetrics) ForgetPipeLenMax(name string, key string) {
m.regLock.Lock()
defer m.regLock.Unlock()

reg := m.provider.pipeLenRegistry
if reg == nil {
return
Expand Down
4 changes: 2 additions & 2 deletions packages/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ func createOnLedgerRequest() isc.OnLedgerRequest {

func TestMessageMetrics(t *testing.T) {
ncm := NewChainMetricsProvider()
cncm1 := ncm.NewChainMetrics(isc.RandomChainID())
cncm2 := ncm.NewChainMetrics(isc.RandomChainID())
cncm1 := ncm.GetChainMetrics(isc.RandomChainID())
cncm2 := ncm.GetChainMetrics(isc.RandomChainID())

// IN State output
outputID1 := &InStateOutput{OutputID: iotago.OutputID{1}}
Expand Down
2 changes: 1 addition & 1 deletion packages/webapi/services/evm.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (e *EVMService) getEVMBackend(chainID isc.ChainID) (*chainServer, error) {
srv, err := jsonrpc.NewServer(
jsonrpc.NewEVMChain(backend, e.publisher, e.log.Named("EVMChain")),
jsonrpc.NewAccountManager(nil),
e.metrics.NewChainMetrics(chainID),
e.metrics.GetChainMetrics(chainID),
)
if err != nil {
return nil, err
Expand Down

0 comments on commit 53e90cf

Please sign in to comment.