From fba676d6848f2da1d4425e5ad13592188d99dbc5 Mon Sep 17 00:00:00 2001 From: Yaroslav Kirillov Date: Fri, 7 Jul 2023 19:15:10 +0500 Subject: [PATCH] Refactor metric controller --- fd/file.d.go | 7 +- metric/controller.go | 179 ++++++++++++------- pipeline/antispam/antispammer.go | 16 +- pipeline/batch.go | 8 +- pipeline/pipeline.go | 40 ++--- plugin/action/mask/mask.go | 3 +- plugin/action/parse_re2/parse_re2.go | 4 +- plugin/action/throttle/limiters_map.go | 6 +- plugin/action/throttle/throttle.go | 3 +- plugin/input/dmesg/dmesg.go | 7 +- plugin/input/file/file.go | 7 +- plugin/input/file/provider.go | 7 +- plugin/input/http/http.go | 7 +- plugin/input/journalctl/journalctl.go | 15 +- plugin/input/journalctl/reader.go | 8 +- plugin/input/kafka/kafka.go | 9 +- plugin/output/clickhouse/clickhouse.go | 9 +- plugin/output/elasticsearch/elasticsearch.go | 9 +- plugin/output/gelf/gelf.go | 7 +- plugin/output/kafka/kafka.go | 5 +- plugin/output/postgres/postgres.go | 13 +- plugin/output/s3/s3.go | 7 +- plugin/output/splunk/splunk.go | 5 +- 23 files changed, 211 insertions(+), 170 deletions(-) diff --git a/fd/file.d.go b/fd/file.d.go index 2afa50643..0f557f256 100644 --- a/fd/file.d.go +++ b/fd/file.d.go @@ -32,8 +32,7 @@ type FileD struct { metricCtl *metric.Ctl // file_d metrics - - longPanicMetric *prometheus.CounterVec + longPanicMetric prometheus.Counter versionMetric *prometheus.CounterVec } @@ -63,10 +62,10 @@ func (f *FileD) Start() { func (f *FileD) initMetrics() { f.metricCtl = metric.NewCtl("file_d", f.registry) f.longPanicMetric = f.metricCtl.RegisterCounter("long_panic", "Count of panics in the LongPanic") - f.versionMetric = f.metricCtl.RegisterCounter("version", "", "version") + f.versionMetric = f.metricCtl.RegisterCounterVec("version", "", "version") f.versionMetric.WithLabelValues(buildinfo.Version).Inc() longpanic.SetOnPanicHandler(func(_ error) { - f.longPanicMetric.WithLabelValues().Inc() + f.longPanicMetric.Inc() }) } diff --git a/metric/controller.go b/metric/controller.go index cb58258c4..23af56d4c 100644 --- a/metric/controller.go +++ b/metric/controller.go @@ -19,90 +19,145 @@ type Ctl struct { subsystem string register *prometheus.Registry - counters map[string]*prometheus.CounterVec - counterMx *sync.Mutex + counters map[string]prometheus.Counter + counterMx *sync.RWMutex - gauges map[string]*prometheus.GaugeVec - gaugeMx *sync.Mutex + counterVecs map[string]*prometheus.CounterVec + counterVecMx *sync.RWMutex - histograms map[string]*prometheus.HistogramVec - histogramMx *sync.Mutex + gauges map[string]prometheus.Gauge + gaugeMx *sync.RWMutex + + gaugeVecs map[string]*prometheus.GaugeVec + gaugeVecMx *sync.RWMutex + + histograms map[string]prometheus.Histogram + histogramMx *sync.RWMutex + + histogramVecs map[string]*prometheus.HistogramVec + histogramVecMx *sync.RWMutex } func NewCtl(subsystem string, registry *prometheus.Registry) *Ctl { ctl := &Ctl{ - subsystem: subsystem, - counters: make(map[string]*prometheus.CounterVec), - counterMx: new(sync.Mutex), - gauges: make(map[string]*prometheus.GaugeVec), - gaugeMx: new(sync.Mutex), - histograms: make(map[string]*prometheus.HistogramVec), - histogramMx: new(sync.Mutex), - register: registry, + subsystem: subsystem, + register: registry, + counters: make(map[string]prometheus.Counter), + counterMx: new(sync.RWMutex), + counterVecs: make(map[string]*prometheus.CounterVec), + counterVecMx: new(sync.RWMutex), + gauges: make(map[string]prometheus.Gauge), + gaugeMx: new(sync.RWMutex), + gaugeVecs: 
make(map[string]*prometheus.GaugeVec), + gaugeVecMx: new(sync.RWMutex), + histograms: make(map[string]prometheus.Histogram), + histogramMx: new(sync.RWMutex), + histogramVecs: make(map[string]*prometheus.HistogramVec), + histogramVecMx: new(sync.RWMutex), } return ctl } -func (mc *Ctl) RegisterCounter(name, help string, labels ...string) *prometheus.CounterVec { - mc.counterMx.Lock() - defer mc.counterMx.Unlock() +func (mc *Ctl) RegisterCounter(name, help string) prometheus.Counter { + newCounter := func() prometheus.Counter { + return prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: PromNamespace, + Subsystem: mc.subsystem, + Name: name, + Help: help, + }) + } - if metric, hasCounter := mc.counters[name]; hasCounter { - return metric + return registerMetric(mc.register, name, mc.counters, mc.counterMx, newCounter) +} + +func (mc *Ctl) RegisterCounterVec(name, help string, labels ...string) *prometheus.CounterVec { + newCounterVec := func() *prometheus.CounterVec { + return prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: PromNamespace, + Subsystem: mc.subsystem, + Name: name, + Help: help, + }, labels) } - promCounter := prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: PromNamespace, - Subsystem: mc.subsystem, - Name: name, - Help: help, - }, labels) - - mc.counters[name] = promCounter - mc.register.Unregister(promCounter) - mc.register.MustRegister(promCounter) - return promCounter + return registerMetric(mc.register, name, mc.counterVecs, mc.counterVecMx, newCounterVec) } -func (mc *Ctl) RegisterGauge(name, help string, labels ...string) *prometheus.GaugeVec { - mc.gaugeMx.Lock() - defer mc.gaugeMx.Unlock() +func (mc *Ctl) RegisterGauge(name, help string) prometheus.Gauge { + newGauge := func() prometheus.Gauge { + return prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: PromNamespace, + Subsystem: mc.subsystem, + Name: name, + Help: help, + }) + } - if metric, hasGauge := mc.gauges[name]; hasGauge { - return metric + return registerMetric(mc.register, name, mc.gauges, mc.gaugeMx, newGauge) +} + +func (mc *Ctl) RegisterGaugeVec(name, help string, labels ...string) *prometheus.GaugeVec { + newGaugeVec := func() *prometheus.GaugeVec { + return prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: PromNamespace, + Subsystem: mc.subsystem, + Name: name, + Help: help, + }, labels) } - promGauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: PromNamespace, - Subsystem: mc.subsystem, - Name: name, - Help: help, - }, labels) - - mc.gauges[name] = promGauge - mc.register.Unregister(promGauge) - mc.register.MustRegister(promGauge) - return promGauge + return registerMetric(mc.register, name, mc.gaugeVecs, mc.gaugeVecMx, newGaugeVec) } -func (mc *Ctl) RegisterHistogram(name, help string, buckets []float64, labels ...string) *prometheus.HistogramVec { - mc.histogramMx.Lock() - defer mc.histogramMx.Unlock() +func (mc *Ctl) RegisterHistogram(name, help string, buckets []float64) prometheus.Histogram { + newHistogram := func() prometheus.Histogram { + return prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: PromNamespace, + Subsystem: mc.subsystem, + Name: name, + Help: help, + Buckets: buckets, + }) + } + + return registerMetric(mc.register, name, mc.histograms, mc.histogramMx, newHistogram) +} + +func (mc *Ctl) RegisterHistogramVec(name, help string, buckets []float64, labels ...string) *prometheus.HistogramVec { + newHistogramVec := func() *prometheus.HistogramVec { + return prometheus.NewHistogramVec(prometheus.HistogramOpts{ 
+ Namespace: PromNamespace, + Subsystem: mc.subsystem, + Name: name, + Help: help, + Buckets: buckets, + }, labels) + } - if metric, hasHistogram := mc.histograms[name]; hasHistogram { + return registerMetric(mc.register, name, mc.histogramVecs, mc.histogramVecMx, newHistogramVec) +} + +func registerMetric[T prometheus.Collector](register *prometheus.Registry, name string, + metrics map[string]T, mx *sync.RWMutex, newMetric func() T, +) T { + mx.RLock() + metric, has := metrics[name] + mx.RUnlock() + if has { return metric } - promHistogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: PromNamespace, - Subsystem: mc.subsystem, - Name: name, - Help: help, - Buckets: buckets, - }, labels) - - mc.histograms[name] = promHistogram - mc.register.Unregister(promHistogram) - mc.register.MustRegister(promHistogram) - return promHistogram + mx.Lock() + defer mx.Unlock() + metric, has = metrics[name] + if !has { + metric = newMetric() + + metrics[name] = metric + register.Unregister(metric) + register.MustRegister(metric) + } + + return metric } diff --git a/pipeline/antispam/antispammer.go b/pipeline/antispam/antispammer.go index 6d2f36f71..912205633 100644 --- a/pipeline/antispam/antispammer.go +++ b/pipeline/antispam/antispammer.go @@ -27,8 +27,8 @@ type Antispammer struct { logger *zap.Logger // antispammer metrics - activeMetric *prometheus.GaugeVec - banMetric *prometheus.GaugeVec + activeMetric prometheus.Gauge + banMetric prometheus.Gauge exceptionMetric *prometheus.CounterVec } @@ -66,14 +66,14 @@ func NewAntispammer(o Options) *Antispammer { banMetric: o.MetricsController.RegisterGauge("antispam_banned", "How many times a source was banned", ), - exceptionMetric: o.MetricsController.RegisterCounter("antispam_exceptions", + exceptionMetric: o.MetricsController.RegisterCounterVec("antispam_exceptions", "How many times an exception match with an event", "name", ), } // not enabled by default - a.activeMetric.WithLabelValues().Set(0) + a.activeMetric.Set(0) return a } @@ -119,8 +119,8 @@ func (a *Antispammer) IsSpam(id uint64, name string, isNewSource bool, event []b x := src.counter.Inc() if x == int32(a.threshold) { src.counter.Swap(int32(a.unbanIterations * a.threshold)) - a.activeMetric.WithLabelValues().Set(1) - a.banMetric.WithLabelValues().Inc() + a.activeMetric.Set(1) + a.banMetric.Inc() a.logger.Warn("source has been banned", zap.Uint64("id", id), zap.String("name", name)) } @@ -147,7 +147,7 @@ func (a *Antispammer) Maintenance() { } if isMore && x < a.threshold { - a.banMetric.WithLabelValues().Dec() + a.banMetric.Dec() a.logger.Info("source has been unbanned", zap.Uint64("id", sourceID)) } @@ -163,7 +163,7 @@ func (a *Antispammer) Maintenance() { } if allUnbanned { - a.activeMetric.WithLabelValues().Set(0) + a.activeMetric.Set(0) } else { a.logger.Info("there are banned sources") } diff --git a/pipeline/batch.go b/pipeline/batch.go index 2337ce079..f36926f0d 100644 --- a/pipeline/batch.go +++ b/pipeline/batch.go @@ -80,8 +80,8 @@ type Batcher struct { outSeq int64 commitSeq int64 - batchOutFnSeconds *prometheus.HistogramVec - commitWaitingSeconds *prometheus.HistogramVec + batchOutFnSeconds prometheus.Histogram + commitWaitingSeconds prometheus.Histogram } type ( @@ -138,7 +138,7 @@ func (b *Batcher) work() { for batch := range b.fullBatches { now := time.Now() b.opts.OutFn(&data, batch) - b.batchOutFnSeconds.WithLabelValues().Observe(time.Since(now).Seconds()) + b.batchOutFnSeconds.Observe(time.Since(now).Seconds()) events = b.commitBatch(events, batch) @@ 
-164,7 +164,7 @@ func (b *Batcher) commitBatch(events []*Event, batch *Batch) []*Event { b.cond.Wait() } b.commitSeq++ - b.commitWaitingSeconds.WithLabelValues().Observe(time.Since(now).Seconds()) + b.commitWaitingSeconds.Observe(time.Since(now).Seconds()) for _, e := range events { b.opts.Controller.Commit(e) diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index 49b58d25f..0a8794191 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -115,15 +115,15 @@ type Pipeline struct { maxSize int // all pipeline`s metrics - inUseEventsMetric *prometheus.GaugeVec - eventPoolCapacityMetric *prometheus.GaugeVec - inputEventsCountMetric *prometheus.CounterVec - inputEventSizeMetric *prometheus.CounterVec - outputEventsCountMetric *prometheus.CounterVec - outputEventSizeMetric *prometheus.CounterVec - readOpsEventsSizeMetric *prometheus.CounterVec - wrongEventCRIFormatMetric *prometheus.CounterVec - maxEventSizeExceededMetric *prometheus.CounterVec + inUseEventsMetric prometheus.Gauge + eventPoolCapacityMetric prometheus.Gauge + inputEventsCountMetric prometheus.Counter + inputEventSizeMetric prometheus.Counter + outputEventsCountMetric prometheus.Counter + outputEventSizeMetric prometheus.Counter + readOpsEventsSizeMetric prometheus.Counter + wrongEventCRIFormatMetric prometheus.Counter + maxEventSizeExceededMetric prometheus.Counter } type Settings struct { @@ -204,7 +204,7 @@ func (p *Pipeline) IncReadOps() { } func (p *Pipeline) IncMaxEventSizeExceeded() { - p.maxEventSizeExceededMetric.WithLabelValues().Inc() + p.maxEventSizeExceededMetric.Inc() } func (p *Pipeline) registerMetrics() { @@ -221,7 +221,7 @@ func (p *Pipeline) registerMetrics() { } func (p *Pipeline) setDefaultMetrics() { - p.eventPoolCapacityMetric.WithLabelValues().Set(float64(p.settings.Capacity)) + p.eventPoolCapacityMetric.Set(float64(p.settings.Capacity)) } // SetupHTTPHandlers creates handlers for plugin endpoints and pipeline info. @@ -370,7 +370,7 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes } else if dec == decoder.CRI { row, err = decoder.DecodeCRI(bytes) if err != nil { - p.wrongEventCRIFormatMetric.WithLabelValues().Inc() + p.wrongEventCRIFormatMetric.Inc() p.Error(fmt.Sprintf("wrong cri format offset=%d, length=%d, err=%s, source=%d:%s, cri=%s", offset, length, err.Error(), sourceID, sourceName, bytes)) return EventSeqIDError } @@ -568,14 +568,14 @@ func (p *Pipeline) AddAction(info *ActionPluginStaticInfo) { } labels = append(labels, info.MetricLabels...) 
- count := mCtl.RegisterCounter( + count := mCtl.RegisterCounterVec( info.MetricName+"_events_count_total", fmt.Sprintf("how many events processed by pipeline %q and #%d action", p.Name, len(p.actionInfos)-1), labels..., ) countWrapper := p.metricHolder.AddCounterVec(count) - size := mCtl.RegisterCounter( + size := mCtl.RegisterCounterVec( info.MetricName+"_events_size_total", fmt.Sprintf("total size of events processed by pipeline %q and #%d action", p.Name, len(p.actionInfos)-1), labels..., @@ -714,17 +714,17 @@ func (p *Pipeline) incMetrics(inputEvents, inputSize, outputEvents, outputSize, deltaReads, } - p.inputEventsCountMetric.WithLabelValues().Add(myDeltas.deltaInputEvents) - p.inputEventSizeMetric.WithLabelValues().Add(myDeltas.deltaInputSize) - p.outputEventsCountMetric.WithLabelValues().Add(myDeltas.deltaOutputEvents) - p.outputEventSizeMetric.WithLabelValues().Add(myDeltas.deltaOutputSize) - p.readOpsEventsSizeMetric.WithLabelValues().Add(myDeltas.deltaReads) + p.inputEventsCountMetric.Add(myDeltas.deltaInputEvents) + p.inputEventSizeMetric.Add(myDeltas.deltaInputSize) + p.outputEventsCountMetric.Add(myDeltas.deltaOutputEvents) + p.outputEventSizeMetric.Add(myDeltas.deltaOutputSize) + p.readOpsEventsSizeMetric.Add(myDeltas.deltaReads) return myDeltas } func (p *Pipeline) setMetrics(inUseEvents atomic.Int64) { - p.inUseEventsMetric.WithLabelValues().Set(float64(inUseEvents.Load())) + p.inUseEventsMetric.Set(float64(inUseEvents.Load())) } func (p *Pipeline) maintenance() { diff --git a/plugin/action/mask/mask.go b/plugin/action/mask/mask.go index fa3c4b8da..5f1e51c11 100644 --- a/plugin/action/mask/mask.go +++ b/plugin/action/mask/mask.go @@ -52,7 +52,6 @@ type Plugin struct { logger *zap.Logger // plugin metrics - maskAppliedMetric *prometheus.CounterVec } @@ -168,7 +167,7 @@ func (p *Plugin) makeMetric(ctl *metric.Ctl, name, help string, labels ...string labelNames = append(labelNames, label) } - return ctl.RegisterCounter(name, help, labelNames...) + return ctl.RegisterCounterVec(name, help, labelNames...) } func compileMasks(masks []Mask, logger *zap.Logger) []Mask { diff --git a/plugin/action/parse_re2/parse_re2.go b/plugin/action/parse_re2/parse_re2.go index 7dcfc48a3..0914207e2 100644 --- a/plugin/action/parse_re2/parse_re2.go +++ b/plugin/action/parse_re2/parse_re2.go @@ -20,7 +20,7 @@ type Plugin struct { re *regexp.Regexp // plugin metrics - eventNotMatchingPatternMetric *prometheus.CounterVec + eventNotMatchingPatternMetric prometheus.Counter } // ! 
config-params @@ -73,7 +73,7 @@ func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult { sm := p.re.FindSubmatch(jsonNode.AsBytes()) if len(sm) == 0 { - p.eventNotMatchingPatternMetric.WithLabelValues().Inc() + p.eventNotMatchingPatternMetric.Inc() return pipeline.ActionPass } diff --git a/plugin/action/throttle/limiters_map.go b/plugin/action/throttle/limiters_map.go index e41d02440..a8322745d 100644 --- a/plugin/action/throttle/limiters_map.go +++ b/plugin/action/throttle/limiters_map.go @@ -71,7 +71,7 @@ type limitersMapConfig struct { limiterCfg *limiterConfig - mapSizeMetric *prometheus.GaugeVec + mapSizeMetric prometheus.Gauge } // limitersMap is auxiliary type for storing the map of strings to limiters with additional info for cleanup @@ -92,7 +92,7 @@ type limitersMap struct { limiterCfg *limiterConfig - mapSizeMetric *prometheus.GaugeVec + mapSizeMetric prometheus.Gauge } func newLimitersMap(lmCfg limitersMapConfig, redisOpts *redis.Options) *limitersMap { @@ -214,7 +214,7 @@ func (l *limitersMap) maintenance(ctx context.Context) { delete(l.lims, key) } mapSize := float64(len(l.lims)) - l.mapSizeMetric.WithLabelValues().Set(mapSize) + l.mapSizeMetric.Set(mapSize) l.mu.Unlock() } } diff --git a/plugin/action/throttle/throttle.go b/plugin/action/throttle/throttle.go index a578a3f73..9f0f1884e 100644 --- a/plugin/action/throttle/throttle.go +++ b/plugin/action/throttle/throttle.go @@ -45,7 +45,8 @@ type Plugin struct { limiterBuf []byte rules []*rule - limitersMapSizeMetric *prometheus.GaugeVec + // plugin metrics + limitersMapSizeMetric prometheus.Gauge } // ! config-params diff --git a/plugin/input/dmesg/dmesg.go b/plugin/input/dmesg/dmesg.go index bba3957f9..c521a6fb9 100644 --- a/plugin/input/dmesg/dmesg.go +++ b/plugin/input/dmesg/dmesg.go @@ -29,8 +29,7 @@ type Plugin struct { logger *zap.SugaredLogger // plugin metrics - - offsetErrorsMetric *prometheus.CounterVec + offsetErrorsMetric prometheus.Counter } // ! 
config-params @@ -66,7 +65,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.InputPluginPa p.state = &state{} if err := offset.LoadYAML(p.config.OffsetsFile, p.state); err != nil { - p.offsetErrorsMetric.WithLabelValues().Inc() + p.offsetErrorsMetric.Inc() p.logger.Error("can't load offset file: %s", err.Error()) } @@ -127,7 +126,7 @@ func (p *Plugin) Commit(event *pipeline.Event) { p.state.TS = event.Offset if err := offset.SaveYAML(p.config.OffsetsFile, p.state); err != nil { - p.offsetErrorsMetric.WithLabelValues().Inc() + p.offsetErrorsMetric.Inc() p.logger.Errorf("can't save offset file: %s", err.Error()) } } diff --git a/plugin/input/file/file.go b/plugin/input/file/file.go index e452e4673..885e22093 100644 --- a/plugin/input/file/file.go +++ b/plugin/input/file/file.go @@ -55,9 +55,8 @@ type Plugin struct { jobProvider *jobProvider // plugin metrics - - possibleOffsetCorruptionMetric *prometheus.CounterVec - alreadyWrittenEventsSkippedMetric *prometheus.CounterVec + possibleOffsetCorruptionMetric prometheus.Counter + alreadyWrittenEventsSkippedMetric prometheus.Counter } type persistenceMode int @@ -251,7 +250,7 @@ func (p *Plugin) PassEvent(event *pipeline.Event) bool { // and file-d went down after commit pass := event.Offset > savedOffset if !pass { - p.alreadyWrittenEventsSkippedMetric.WithLabelValues().Inc() + p.alreadyWrittenEventsSkippedMetric.Inc() return false } diff --git a/plugin/input/file/provider.go b/plugin/input/file/provider.go index 1ad54f3e3..f8a76750e 100644 --- a/plugin/input/file/provider.go +++ b/plugin/input/file/provider.go @@ -55,8 +55,7 @@ type jobProvider struct { logger *zap.SugaredLogger // provider metrics - - possibleOffsetCorruptionMetric *prometheus.CounterVec + possibleOffsetCorruptionMetric prometheus.Counter } type Job struct { @@ -100,7 +99,7 @@ type symlinkInfo struct { inode inodeID } -func NewJobProvider(config *Config, possibleOffsetCorruptionMetric *prometheus.CounterVec, sugLogger *zap.SugaredLogger) *jobProvider { +func NewJobProvider(config *Config, possibleOffsetCorruptionMetric prometheus.Counter, sugLogger *zap.SugaredLogger) *jobProvider { jp := &jobProvider{ config: config, offsetDB: newOffsetDB(config.OffsetsFile, config.OffsetsFileTmp), @@ -198,7 +197,7 @@ func (jp *jobProvider) commit(event *pipeline.Event) { } if value == 0 && event.Offset >= 16*1024*1024 { - jp.possibleOffsetCorruptionMetric.WithLabelValues().Inc() + jp.possibleOffsetCorruptionMetric.Inc() jp.logger.Errorf("it maybe an offset corruption: committing=%d, current=%d, event id=%d, source=%d:%s", event.Offset, value, event.SeqID, event.SourceID, event.SourceName) } diff --git a/plugin/input/http/http.go b/plugin/input/http/http.go index 759997574..344465382 100644 --- a/plugin/input/http/http.go +++ b/plugin/input/http/http.go @@ -93,8 +93,7 @@ type Plugin struct { logger *zap.SugaredLogger // plugin metrics - - httpErrorMetric *prometheus.CounterVec + httpErrorMetric prometheus.Counter } // ! 
config-params @@ -229,7 +228,7 @@ func (p *Plugin) serve(w http.ResponseWriter, r *http.Request) { } if err != nil && err != io.EOF { - p.httpErrorMetric.WithLabelValues().Inc() + p.httpErrorMetric.Inc() logger.Errorf("http input read error: %s", err.Error()) break } @@ -249,7 +248,7 @@ func (p *Plugin) serve(w http.ResponseWriter, r *http.Request) { _, err := w.Write(result) if err != nil { - p.httpErrorMetric.WithLabelValues().Inc() + p.httpErrorMetric.Inc() logger.Errorf("can't write response: %s", err.Error()) } } diff --git a/plugin/input/journalctl/journalctl.go b/plugin/input/journalctl/journalctl.go index 785f59768..6786c39e2 100644 --- a/plugin/input/journalctl/journalctl.go +++ b/plugin/input/journalctl/journalctl.go @@ -21,10 +21,9 @@ type Plugin struct { offInfo *offsetInfo // plugin metrics - - offsetErrorsMetric *prometheus.CounterVec - journalCtlStopErrorMetric *prometheus.CounterVec - readerErrorsMetric *prometheus.CounterVec + offsetErrorsMetric prometheus.Counter + journalCtlStopErrorMetric prometheus.Counter + readerErrorsMetric prometheus.Counter } type Config struct { @@ -84,7 +83,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.InputPluginPa p.offInfo = &offsetInfo{} if err := offset.LoadYAML(p.config.OffsetsFile, p.offInfo); err != nil { - p.offsetErrorsMetric.WithLabelValues().Inc() + p.offsetErrorsMetric.Inc() p.params.Logger.Error("can't load offset file: %s", err.Error()) } @@ -110,12 +109,12 @@ func (p *Plugin) registerMetrics(ctl *metric.Ctl) { func (p *Plugin) Stop() { err := p.reader.stop() if err != nil { - p.journalCtlStopErrorMetric.WithLabelValues().Inc() + p.journalCtlStopErrorMetric.Inc() p.params.Logger.Error("can't stop journalctl cmd: %s", err.Error()) } if err := offset.SaveYAML(p.config.OffsetsFile, p.offInfo); err != nil { - p.offsetErrorsMetric.WithLabelValues().Inc() + p.offsetErrorsMetric.Inc() p.params.Logger.Error("can't save offset file: %s", err.Error()) } } @@ -124,7 +123,7 @@ func (p *Plugin) Commit(event *pipeline.Event) { p.offInfo.set(pipeline.CloneString(event.Root.Dig("__CURSOR").AsString())) if err := offset.SaveYAML(p.config.OffsetsFile, p.offInfo); err != nil { - p.offsetErrorsMetric.WithLabelValues().Inc() + p.offsetErrorsMetric.Inc() p.params.Logger.Error("can't save offset file: %s", err.Error()) } } diff --git a/plugin/input/journalctl/reader.go b/plugin/input/journalctl/reader.go index 31fe1476b..3d71bce4c 100644 --- a/plugin/input/journalctl/reader.go +++ b/plugin/input/journalctl/reader.go @@ -27,7 +27,7 @@ type journalReader struct { args []string // reader metrics - readerErrorsMetric *prometheus.CounterVec + readerErrorsMetric prometheus.Counter } //nolint:unused @@ -50,13 +50,13 @@ func (r *journalReader) readLines(rd io.Reader, config *journalReaderConfig) { break } if err != nil { - r.readerErrorsMetric.WithLabelValues().Inc() + r.readerErrorsMetric.Inc() config.logger.Error(err) continue } _, err = config.output.Write(bytes) if err != nil { - r.readerErrorsMetric.WithLabelValues().Inc() + r.readerErrorsMetric.Inc() config.logger.Error(err) } @@ -68,7 +68,7 @@ func (r *journalReader) readLines(rd io.Reader, config *journalReaderConfig) { } //nolint:deadcode,unused -func newJournalReader(config *journalReaderConfig, readerErrorsCounter *prometheus.CounterVec) *journalReader { +func newJournalReader(config *journalReaderConfig, readerErrorsCounter prometheus.Counter) *journalReader { res := &journalReader{ config: config, readerErrorsMetric: readerErrorsCounter, diff --git 
a/plugin/input/kafka/kafka.go b/plugin/input/kafka/kafka.go index a19c55551..2276a73df 100644 --- a/plugin/input/kafka/kafka.go +++ b/plugin/input/kafka/kafka.go @@ -50,9 +50,8 @@ type Plugin struct { idByTopic map[string]int // plugin metrics - - commitErrorsMetric *prometheus.CounterVec - consumeErrorsMetric *prometheus.CounterVec + commitErrorsMetric prometheus.Counter + consumeErrorsMetric prometheus.Counter } // ! config-params @@ -129,7 +128,7 @@ func (p *Plugin) consume(ctx context.Context) { for { err := p.consumerGroup.Consume(ctx, p.config.Topics, p) if err != nil { - p.consumeErrorsMetric.WithLabelValues().Inc() + p.consumeErrorsMetric.Inc() p.logger.Errorf("can't consume from kafka: %s", err.Error()) } @@ -145,7 +144,7 @@ func (p *Plugin) Stop() { func (p *Plugin) Commit(event *pipeline.Event) { if p.session == nil { - p.commitErrorsMetric.WithLabelValues().Inc() + p.commitErrorsMetric.Inc() p.logger.Errorf("no kafka consumer session for event commit") return } diff --git a/plugin/output/clickhouse/clickhouse.go b/plugin/output/clickhouse/clickhouse.go index 04b756bee..6c0d5a7c7 100644 --- a/plugin/output/clickhouse/clickhouse.go +++ b/plugin/output/clickhouse/clickhouse.go @@ -54,9 +54,8 @@ type Plugin struct { requestID atomic.Int64 // plugin metrics - - insertErrorsMetric *prometheus.CounterVec - queriesCountMetric *prometheus.CounterVec + insertErrorsMetric prometheus.Counter + queriesCountMetric prometheus.Counter } type Setting struct { @@ -463,7 +462,7 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) { if err == nil { break } - p.insertErrorsMetric.WithLabelValues().Inc() + p.insertErrorsMetric.Inc() time.Sleep(p.config.Retention_) p.logger.Error("an attempt to insert a batch failed", zap.Error(err)) } @@ -475,7 +474,7 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) { } func (p *Plugin) do(clickhouse Clickhouse, queryInput proto.Input) error { - defer p.queriesCountMetric.WithLabelValues().Inc() + defer p.queriesCountMetric.Inc() ctx, cancel := context.WithTimeout(p.ctx, p.config.InsertTimeout_) defer cancel() diff --git a/plugin/output/elasticsearch/elasticsearch.go b/plugin/output/elasticsearch/elasticsearch.go index ff7c0ff61..9361d0694 100644 --- a/plugin/output/elasticsearch/elasticsearch.go +++ b/plugin/output/elasticsearch/elasticsearch.go @@ -51,9 +51,8 @@ type Plugin struct { mu *sync.Mutex // plugin metrics - - sendErrorMetric *prometheus.CounterVec - indexingErrorsMetric *prometheus.CounterVec + sendErrorMetric prometheus.Counter + indexingErrorsMetric prometheus.Counter } // ! 
config-params @@ -257,7 +256,7 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) { for { if err := p.send(data.outBuf); err != nil { - p.sendErrorMetric.WithLabelValues().Inc() + p.sendErrorMetric.Inc() p.logger.Errorf("can't send to the elastic, will try other endpoint: %s", err.Error()) } else { break @@ -307,7 +306,7 @@ func (p *Plugin) send(body []byte) error { } if errors != 0 { - p.indexingErrorsMetric.WithLabelValues().Add(float64(errors)) + p.indexingErrorsMetric.Add(float64(errors)) } p.controller.Error("some events from batch aren't written") diff --git a/plugin/output/gelf/gelf.go b/plugin/output/gelf/gelf.go index 05a39e18c..f8d30b907 100644 --- a/plugin/output/gelf/gelf.go +++ b/plugin/output/gelf/gelf.go @@ -45,8 +45,7 @@ type Plugin struct { controller pipeline.OutputPluginController // plugin metrics - - sendErrorMetric *prometheus.CounterVec + sendErrorMetric prometheus.Counter } // ! config-params @@ -250,7 +249,7 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) { gelf, err := newClient(p.config.Endpoint, p.config.ConnectionTimeout_, p.config.WriteTimeout_, false, nil) if err != nil { - p.sendErrorMetric.WithLabelValues().Inc() + p.sendErrorMetric.Inc() p.logger.Errorf("can't connect to gelf endpoint address=%s: %s", p.config.Endpoint, err.Error()) time.Sleep(time.Second) continue @@ -260,7 +259,7 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) { _, err := data.gelf.send(outBuf) if err != nil { - p.sendErrorMetric.WithLabelValues().Inc() + p.sendErrorMetric.Inc() p.logger.Errorf("can't send data to gelf address=%s, err: %s", p.config.Endpoint, err.Error()) _ = data.gelf.close() data.gelf = nil diff --git a/plugin/output/kafka/kafka.go b/plugin/output/kafka/kafka.go index 30da23fce..083fc3369 100644 --- a/plugin/output/kafka/kafka.go +++ b/plugin/output/kafka/kafka.go @@ -38,8 +38,7 @@ type Plugin struct { batcher *pipeline.Batcher // plugin metrics - - sendErrorMetric *prometheus.CounterVec + sendErrorMetric prometheus.Counter } // ! config-params @@ -208,7 +207,7 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) { for _, e := range errs { p.logger.Errorf("can't write batch: %s", e.Err.Error()) } - p.sendErrorMetric.WithLabelValues().Add(float64(len(errs))) + p.sendErrorMetric.Add(float64(len(errs))) p.controller.Error("some events from batch were not written") } } diff --git a/plugin/output/postgres/postgres.go b/plugin/output/postgres/postgres.go index fc6a730ba..6e45e318e 100644 --- a/plugin/output/postgres/postgres.go +++ b/plugin/output/postgres/postgres.go @@ -71,10 +71,9 @@ type Plugin struct { pool PgxIface // plugin metrics - - discardedEventMetric *prometheus.CounterVec - duplicatedEventMetric *prometheus.CounterVec - writtenEventMetric *prometheus.CounterVec + discardedEventMetric prometheus.Counter + duplicatedEventMetric prometheus.Counter + writtenEventMetric prometheus.Counter } type ConfigColumn struct { @@ -272,7 +271,7 @@ func (p *Plugin) out(_ *pipeline.WorkerData, batch *pipeline.Batch) { switch { case errors.Is(err, ErrEventDoesntHaveField), errors.Is(err, ErrEventFieldHasWrongType), errors.Is(err, ErrTimestampFromDistantPastOrFuture): - p.discardedEventMetric.WithLabelValues().Inc() + p.discardedEventMetric.Inc() if p.config.StrictFields || p.config.Strict { p.logger.Fatal(err) } @@ -286,7 +285,7 @@ func (p *Plugin) out(_ *pipeline.WorkerData, batch *pipeline.Batch) { // passes here only if event valid. 
if _, ok := uniqueEventsMap[uniqueID]; ok { - p.duplicatedEventMetric.WithLabelValues().Inc() + p.duplicatedEventMetric.Inc() p.logger.Infof("event duplicated. Fields: %v, values: %v", pgFields, fieldValues) } else { if uniqueID != "" { @@ -324,7 +323,7 @@ func (p *Plugin) out(_ *pipeline.WorkerData, batch *pipeline.Batch) { time.Sleep(p.config.Retention_) continue } - p.writtenEventMetric.WithLabelValues().Add(float64(len(uniqueEventsMap))) + p.writtenEventMetric.Add(float64(len(uniqueEventsMap))) break } diff --git a/plugin/output/s3/s3.go b/plugin/output/s3/s3.go index 9147eb736..c2bd673c1 100644 --- a/plugin/output/s3/s3.go +++ b/plugin/output/s3/s3.go @@ -153,8 +153,7 @@ type Plugin struct { compressor compressor // plugin metrics - - sendErrorMetric *prometheus.CounterVec + sendErrorMetric prometheus.Counter uploadFileMetric *prometheus.CounterVec } @@ -270,7 +269,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP func (p *Plugin) registerMetrics(ctl *metric.Ctl) { p.sendErrorMetric = ctl.RegisterCounter("output_s3_send_error", "Total s3 send errors") - p.uploadFileMetric = ctl.RegisterCounter("output_s3_upload_file", "Total files upload", "bucket_name") + p.uploadFileMetric = ctl.RegisterCounterVec("output_s3_upload_file", "Total files upload", "bucket_name") } func (p *Plugin) StartWithMinio(config pipeline.AnyConfig, params *pipeline.OutputPluginParams, factory objStoreFactory) { @@ -563,7 +562,7 @@ func (p *Plugin) uploadToS3(compressedDTO fileDTO) error { ) if err != nil { - p.sendErrorMetric.WithLabelValues().Inc() + p.sendErrorMetric.Inc() return fmt.Errorf("could not upload file: %s into bucket: %s, error: %s", compressedDTO.fileName, compressedDTO.bucketName, err.Error()) } return nil diff --git a/plugin/output/splunk/splunk.go b/plugin/output/splunk/splunk.go index 25c8f7270..3d86213a9 100644 --- a/plugin/output/splunk/splunk.go +++ b/plugin/output/splunk/splunk.go @@ -36,8 +36,7 @@ type Plugin struct { controller pipeline.OutputPluginController // plugin metrics - - sendErrorMetric *prometheus.CounterVec + sendErrorMetric prometheus.Counter } // ! config-params @@ -165,7 +164,7 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) { for { err := p.send(outBuf) if err != nil { - p.sendErrorMetric.WithLabelValues().Inc() + p.sendErrorMetric.Inc() p.logger.Errorf("can't send data to splunk address=%s: %s", p.config.Endpoint, err.Error()) time.Sleep(time.Second)
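
---
Reviewer note: below is a minimal usage sketch of the refactored metric.Ctl API, not part of the patch. The import path (github.com/ozontech/file.d/metric) and the metric names are assumptions for illustration. The gist of the change: unlabeled metrics now return scalar prometheus.Counter/Gauge/Histogram values with no WithLabelValues() boilerplate, labeled metrics move to the explicit Register*Vec constructors, and the generic registerMetric helper caches collectors by name, so registering the same name twice returns the same instance.

	package main

	import (
		"github.com/prometheus/client_golang/prometheus"

		"github.com/ozontech/file.d/metric" // assumed import path
	)

	func main() {
		ctl := metric.NewCtl("example_subsystem", prometheus.NewRegistry())

		// Unlabeled metric: a plain prometheus.Counter, used directly.
		sendErrors := ctl.RegisterCounter("send_errors", "Total send errors")
		sendErrors.Inc()

		// Labeled metric: the *Vec constructor takes label names.
		uploads := ctl.RegisterCounterVec("upload_file", "Total files uploaded", "bucket_name")
		uploads.WithLabelValues("my-bucket").Inc()

		// Histograms take buckets; RegisterHistogramVec additionally takes labels.
		latency := ctl.RegisterHistogram("batch_seconds", "Batch flush latency", prometheus.DefBuckets)
		latency.Observe(0.42)

		// Re-registering an existing name returns the cached collector,
		// so callers never hit a double-registration panic.
		same := ctl.RegisterCounter("send_errors", "Total send errors")
		_ = same // same underlying counter as sendErrors
	}

The switch from Mutex to RWMutex plus the double-checked lookup in registerMetric keeps repeated registrations on the cheap read path; only the first registration of a given name takes the write lock.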