Skip to content

Commit

Permalink
Refactor metric controller
Browse files Browse the repository at this point in the history
  • Loading branch information
Yaroslav Kirillov committed Jul 7, 2023
1 parent 9187373 commit fba676d
Show file tree
Hide file tree
Showing 23 changed files with 211 additions and 170 deletions.
7 changes: 3 additions & 4 deletions fd/file.d.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ type FileD struct {
metricCtl *metric.Ctl

// file_d metrics

longPanicMetric *prometheus.CounterVec
longPanicMetric prometheus.Counter
versionMetric *prometheus.CounterVec
}

Expand Down Expand Up @@ -63,10 +62,10 @@ func (f *FileD) Start() {
func (f *FileD) initMetrics() {
f.metricCtl = metric.NewCtl("file_d", f.registry)
f.longPanicMetric = f.metricCtl.RegisterCounter("long_panic", "Count of panics in the LongPanic")
f.versionMetric = f.metricCtl.RegisterCounter("version", "", "version")
f.versionMetric = f.metricCtl.RegisterCounterVec("version", "", "version")
f.versionMetric.WithLabelValues(buildinfo.Version).Inc()
longpanic.SetOnPanicHandler(func(_ error) {
f.longPanicMetric.WithLabelValues().Inc()
f.longPanicMetric.Inc()
})
}

Expand Down
179 changes: 117 additions & 62 deletions metric/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,90 +19,145 @@ type Ctl struct {
subsystem string
register *prometheus.Registry

counters map[string]*prometheus.CounterVec
counterMx *sync.Mutex
counters map[string]prometheus.Counter
counterMx *sync.RWMutex

gauges map[string]*prometheus.GaugeVec
gaugeMx *sync.Mutex
counterVecs map[string]*prometheus.CounterVec
counterVecMx *sync.RWMutex

histograms map[string]*prometheus.HistogramVec
histogramMx *sync.Mutex
gauges map[string]prometheus.Gauge
gaugeMx *sync.RWMutex

gaugeVecs map[string]*prometheus.GaugeVec
gaugeVecMx *sync.RWMutex

histograms map[string]prometheus.Histogram
histogramMx *sync.RWMutex

histogramVecs map[string]*prometheus.HistogramVec
histogramVecMx *sync.RWMutex
}

func NewCtl(subsystem string, registry *prometheus.Registry) *Ctl {
ctl := &Ctl{
subsystem: subsystem,
counters: make(map[string]*prometheus.CounterVec),
counterMx: new(sync.Mutex),
gauges: make(map[string]*prometheus.GaugeVec),
gaugeMx: new(sync.Mutex),
histograms: make(map[string]*prometheus.HistogramVec),
histogramMx: new(sync.Mutex),
register: registry,
subsystem: subsystem,
register: registry,
counters: make(map[string]prometheus.Counter),
counterMx: new(sync.RWMutex),
counterVecs: make(map[string]*prometheus.CounterVec),
counterVecMx: new(sync.RWMutex),
gauges: make(map[string]prometheus.Gauge),
gaugeMx: new(sync.RWMutex),
gaugeVecs: make(map[string]*prometheus.GaugeVec),
gaugeVecMx: new(sync.RWMutex),
histograms: make(map[string]prometheus.Histogram),
histogramMx: new(sync.RWMutex),
histogramVecs: make(map[string]*prometheus.HistogramVec),
histogramVecMx: new(sync.RWMutex),
}
return ctl
}

func (mc *Ctl) RegisterCounter(name, help string, labels ...string) *prometheus.CounterVec {
mc.counterMx.Lock()
defer mc.counterMx.Unlock()
func (mc *Ctl) RegisterCounter(name, help string) prometheus.Counter {
newCounter := func() prometheus.Counter {
return prometheus.NewCounter(prometheus.CounterOpts{
Namespace: PromNamespace,
Subsystem: mc.subsystem,
Name: name,
Help: help,
})
}

if metric, hasCounter := mc.counters[name]; hasCounter {
return metric
return registerMetric(mc.register, name, mc.counters, mc.counterMx, newCounter)
}

func (mc *Ctl) RegisterCounterVec(name, help string, labels ...string) *prometheus.CounterVec {
newCounterVec := func() *prometheus.CounterVec {
return prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: PromNamespace,
Subsystem: mc.subsystem,
Name: name,
Help: help,
}, labels)
}

promCounter := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: PromNamespace,
Subsystem: mc.subsystem,
Name: name,
Help: help,
}, labels)

mc.counters[name] = promCounter
mc.register.Unregister(promCounter)
mc.register.MustRegister(promCounter)
return promCounter
return registerMetric(mc.register, name, mc.counterVecs, mc.counterVecMx, newCounterVec)
}

func (mc *Ctl) RegisterGauge(name, help string, labels ...string) *prometheus.GaugeVec {
mc.gaugeMx.Lock()
defer mc.gaugeMx.Unlock()
func (mc *Ctl) RegisterGauge(name, help string) prometheus.Gauge {
newGauge := func() prometheus.Gauge {
return prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: PromNamespace,
Subsystem: mc.subsystem,
Name: name,
Help: help,
})
}

if metric, hasGauge := mc.gauges[name]; hasGauge {
return metric
return registerMetric(mc.register, name, mc.gauges, mc.gaugeMx, newGauge)
}

func (mc *Ctl) RegisterGaugeVec(name, help string, labels ...string) *prometheus.GaugeVec {
newGaugeVec := func() *prometheus.GaugeVec {
return prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: PromNamespace,
Subsystem: mc.subsystem,
Name: name,
Help: help,
}, labels)
}

promGauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: PromNamespace,
Subsystem: mc.subsystem,
Name: name,
Help: help,
}, labels)

mc.gauges[name] = promGauge
mc.register.Unregister(promGauge)
mc.register.MustRegister(promGauge)
return promGauge
return registerMetric(mc.register, name, mc.gaugeVecs, mc.gaugeVecMx, newGaugeVec)
}

func (mc *Ctl) RegisterHistogram(name, help string, buckets []float64, labels ...string) *prometheus.HistogramVec {
mc.histogramMx.Lock()
defer mc.histogramMx.Unlock()
func (mc *Ctl) RegisterHistogram(name, help string, buckets []float64) prometheus.Histogram {
newHistogram := func() prometheus.Histogram {
return prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: PromNamespace,
Subsystem: mc.subsystem,
Name: name,
Help: help,
Buckets: buckets,
})
}

return registerMetric(mc.register, name, mc.histograms, mc.histogramMx, newHistogram)
}

func (mc *Ctl) RegisterHistogramVec(name, help string, buckets []float64, labels ...string) *prometheus.HistogramVec {
newHistogramVec := func() *prometheus.HistogramVec {
return prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: PromNamespace,
Subsystem: mc.subsystem,
Name: name,
Help: help,
Buckets: buckets,
}, labels)
}

if metric, hasHistogram := mc.histograms[name]; hasHistogram {
return registerMetric(mc.register, name, mc.histogramVecs, mc.histogramVecMx, newHistogramVec)
}

func registerMetric[T prometheus.Collector](register *prometheus.Registry, name string,
metrics map[string]T, mx *sync.RWMutex, newMetric func() T,
) T {
mx.RLock()
metric, has := metrics[name]
mx.RUnlock()
if has {
return metric
}

promHistogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: PromNamespace,
Subsystem: mc.subsystem,
Name: name,
Help: help,
Buckets: buckets,
}, labels)

mc.histograms[name] = promHistogram
mc.register.Unregister(promHistogram)
mc.register.MustRegister(promHistogram)
return promHistogram
mx.Lock()
defer mx.Unlock()
metric, has = metrics[name]
if !has {
metric = newMetric()

metrics[name] = metric
register.Unregister(metric)
register.MustRegister(metric)
}

return metric
}
16 changes: 8 additions & 8 deletions pipeline/antispam/antispammer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ type Antispammer struct {
logger *zap.Logger

// antispammer metrics
activeMetric *prometheus.GaugeVec
banMetric *prometheus.GaugeVec
activeMetric prometheus.Gauge
banMetric prometheus.Gauge
exceptionMetric *prometheus.CounterVec
}

Expand Down Expand Up @@ -66,14 +66,14 @@ func NewAntispammer(o Options) *Antispammer {
banMetric: o.MetricsController.RegisterGauge("antispam_banned",
"How many times a source was banned",
),
exceptionMetric: o.MetricsController.RegisterCounter("antispam_exceptions",
exceptionMetric: o.MetricsController.RegisterCounterVec("antispam_exceptions",
"How many times an exception match with an event",
"name",
),
}

// not enabled by default
a.activeMetric.WithLabelValues().Set(0)
a.activeMetric.Set(0)

return a
}
Expand Down Expand Up @@ -119,8 +119,8 @@ func (a *Antispammer) IsSpam(id uint64, name string, isNewSource bool, event []b
x := src.counter.Inc()
if x == int32(a.threshold) {
src.counter.Swap(int32(a.unbanIterations * a.threshold))
a.activeMetric.WithLabelValues().Set(1)
a.banMetric.WithLabelValues().Inc()
a.activeMetric.Set(1)
a.banMetric.Inc()
a.logger.Warn("source has been banned",
zap.Uint64("id", id), zap.String("name", name))
}
Expand All @@ -147,7 +147,7 @@ func (a *Antispammer) Maintenance() {
}

if isMore && x < a.threshold {
a.banMetric.WithLabelValues().Dec()
a.banMetric.Dec()
a.logger.Info("source has been unbanned", zap.Uint64("id", sourceID))
}

Expand All @@ -163,7 +163,7 @@ func (a *Antispammer) Maintenance() {
}

if allUnbanned {
a.activeMetric.WithLabelValues().Set(0)
a.activeMetric.Set(0)
} else {
a.logger.Info("there are banned sources")
}
Expand Down
8 changes: 4 additions & 4 deletions pipeline/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ type Batcher struct {
outSeq int64
commitSeq int64

batchOutFnSeconds *prometheus.HistogramVec
commitWaitingSeconds *prometheus.HistogramVec
batchOutFnSeconds prometheus.Histogram
commitWaitingSeconds prometheus.Histogram
}

type (
Expand Down Expand Up @@ -138,7 +138,7 @@ func (b *Batcher) work() {
for batch := range b.fullBatches {
now := time.Now()
b.opts.OutFn(&data, batch)
b.batchOutFnSeconds.WithLabelValues().Observe(time.Since(now).Seconds())
b.batchOutFnSeconds.Observe(time.Since(now).Seconds())

events = b.commitBatch(events, batch)

Expand All @@ -164,7 +164,7 @@ func (b *Batcher) commitBatch(events []*Event, batch *Batch) []*Event {
b.cond.Wait()
}
b.commitSeq++
b.commitWaitingSeconds.WithLabelValues().Observe(time.Since(now).Seconds())
b.commitWaitingSeconds.Observe(time.Since(now).Seconds())

for _, e := range events {
b.opts.Controller.Commit(e)
Expand Down
Loading

0 comments on commit fba676d

Please sign in to comment.