Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into 351-speedup-batcher
Browse files Browse the repository at this point in the history
  • Loading branch information
vadimalekseev committed Jul 4, 2023
2 parents d680078 + 5866254 commit 29604e1
Show file tree
Hide file tree
Showing 16 changed files with 73 additions and 42 deletions.
35 changes: 20 additions & 15 deletions metric/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,50 +3,55 @@ package metric
import (
"sync"

prom "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus"
)

const (
PromNamespace = "file_d"
)

var (
SecondsBucketsDetailed = prometheus.ExponentialBuckets(0.0005, 2, 16) // covers range from 500us to 16.384s
SecondsBucketsLong = prometheus.ExponentialBuckets(0.005, 2, 16) // covers range from 5ms to 163.84s
)

type Ctl struct {
subsystem string
register *prom.Registry
register *prometheus.Registry

counters map[string]*prom.CounterVec
counters map[string]*prometheus.CounterVec
counterMx *sync.Mutex

gauges map[string]*prom.GaugeVec
gauges map[string]*prometheus.GaugeVec
gaugeMx *sync.Mutex

histograms map[string]*prom.HistogramVec
histograms map[string]*prometheus.HistogramVec
histogramMx *sync.Mutex
}

func New(subsystem string, registry *prom.Registry) *Ctl {
func New(subsystem string, registry *prometheus.Registry) *Ctl {
ctl := &Ctl{
subsystem: subsystem,
counters: make(map[string]*prom.CounterVec),
counters: make(map[string]*prometheus.CounterVec),
counterMx: new(sync.Mutex),
gauges: make(map[string]*prom.GaugeVec),
gauges: make(map[string]*prometheus.GaugeVec),
gaugeMx: new(sync.Mutex),
histograms: make(map[string]*prom.HistogramVec),
histograms: make(map[string]*prometheus.HistogramVec),
histogramMx: new(sync.Mutex),
register: registry,
}
return ctl
}

func (mc *Ctl) RegisterCounter(name, help string, labels ...string) *prom.CounterVec {
func (mc *Ctl) RegisterCounter(name, help string, labels ...string) *prometheus.CounterVec {
mc.counterMx.Lock()
defer mc.counterMx.Unlock()

if metric, hasCounter := mc.counters[name]; hasCounter {
return metric
}

promCounter := prom.NewCounterVec(prom.CounterOpts{
promCounter := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: PromNamespace,
Subsystem: mc.subsystem,
Name: name,
Expand All @@ -59,15 +64,15 @@ func (mc *Ctl) RegisterCounter(name, help string, labels ...string) *prom.Counte
return promCounter
}

func (mc *Ctl) RegisterGauge(name, help string, labels ...string) *prom.GaugeVec {
func (mc *Ctl) RegisterGauge(name, help string, labels ...string) *prometheus.GaugeVec {
mc.gaugeMx.Lock()
defer mc.gaugeMx.Unlock()

if metric, hasGauge := mc.gauges[name]; hasGauge {
return metric
}

promGauge := prom.NewGaugeVec(prom.GaugeOpts{
promGauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: PromNamespace,
Subsystem: mc.subsystem,
Name: name,
Expand All @@ -80,15 +85,15 @@ func (mc *Ctl) RegisterGauge(name, help string, labels ...string) *prom.GaugeVec
return promGauge
}

func (mc *Ctl) RegisterHistogram(name, help string, buckets []float64, labels ...string) *prom.HistogramVec {
func (mc *Ctl) RegisterHistogram(name, help string, buckets []float64, labels ...string) *prometheus.HistogramVec {
mc.histogramMx.Lock()
defer mc.histogramMx.Unlock()

if metric, hasHistogram := mc.histograms[name]; hasHistogram {
return metric
}

promHistogram := prom.NewHistogramVec(prom.HistogramOpts{
promHistogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: PromNamespace,
Subsystem: mc.subsystem,
Name: name,
Expand Down
8 changes: 4 additions & 4 deletions pipeline/antispam/antispammer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/ozontech/file.d/cfg/matchrule"
"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/metric"
prom "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
"go.uber.org/zap"
)
Expand All @@ -27,9 +27,9 @@ type Antispammer struct {
logger *zap.Logger

// antispammer metrics
activeMetric *prom.GaugeVec
banMetric *prom.GaugeVec
exceptionMetric *prom.CounterVec
activeMetric *prometheus.GaugeVec
banMetric *prometheus.GaugeVec
exceptionMetric *prometheus.CounterVec
}

type source struct {
Expand Down
15 changes: 15 additions & 0 deletions pipeline/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (

"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/longpanic"
"github.com/ozontech/file.d/metric"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
)

Expand Down Expand Up @@ -80,6 +82,9 @@ type Batcher struct {

outSeq int64
commitSeq int64

batchOutFnSeconds *prometheus.HistogramVec
commitWaitingSeconds *prometheus.HistogramVec
}

type (
Expand All @@ -97,6 +102,7 @@ type (
BatchSizeBytes int
FlushTimeout time.Duration
MaintenanceInterval time.Duration
MetricCtl *metric.Ctl
}
)

Expand All @@ -109,6 +115,10 @@ func (b *Batcher) Start(_ context.Context) {
b.mu = &sync.Mutex{}
b.seqMu = &sync.Mutex{}
b.cond = sync.NewCond(b.seqMu)
b.batchOutFnSeconds = b.opts.MetricCtl.
RegisterHistogram("batcher_out_fn_seconds", "", metric.SecondsBucketsLong)
b.commitWaitingSeconds = b.opts.MetricCtl.
RegisterHistogram("batcher_commit_waiting_seconds", "", metric.SecondsBucketsDetailed)

b.freeBatches = make(chan *Batch, b.opts.Workers)
b.fullBatches = make(chan *Batch, b.opts.Workers)
Expand All @@ -129,7 +139,10 @@ func (b *Batcher) work() {
events := make([]*Event, 0)
data := WorkerData(nil)
for batch := range b.fullBatches {
now := time.Now()
b.opts.OutFn(&data, batch)
b.batchOutFnSeconds.WithLabelValues().Observe(time.Since(now).Seconds())

events = b.commitBatch(events, batch)

shouldRunMaintenance := b.opts.MaintenanceFn != nil && b.opts.MaintenanceInterval != 0 && time.Since(t) > b.opts.MaintenanceInterval
Expand All @@ -153,12 +166,14 @@ func (b *Batcher) commitBatch(events []*Event, batch *Batch) []*Event {
b.opts.Controller.ReleaseEvents(events)
events = eventOffsets

now := time.Now()
// let's restore the sequence of batches to make sure input will commit offsets incrementally
b.seqMu.Lock()
for b.commitSeq != batchSeq {
b.cond.Wait()
}
b.commitSeq++
b.commitWaitingSeconds.WithLabelValues().Observe(time.Since(now).Seconds())

for i := range events {
b.opts.Controller.Commit(events[i], false)
Expand Down
4 changes: 4 additions & 0 deletions pipeline/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"time"

"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/metric"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
insaneJSON "github.com/vitkovskii/insane-json"
"go.uber.org/atomic"
Expand Down Expand Up @@ -75,6 +77,7 @@ func TestBatcher(t *testing.T) {
Workers: 8,
BatchSizeCount: batchSize,
FlushTimeout: time.Second,
MetricCtl: metric.New("", prometheus.NewRegistry()),
})

ctx := context.TODO()
Expand Down Expand Up @@ -145,6 +148,7 @@ func TestBatcherMaxSize(t *testing.T) {
Workers: 8,
BatchSizeBytes: batchSize,
FlushTimeout: time.Minute,
MetricCtl: metric.New("", prometheus.NewRegistry()),
})

batcher.Start(context.Background())
Expand Down
8 changes: 4 additions & 4 deletions plugin/action/mask/mask.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/ozontech/file.d/fd"
"github.com/ozontech/file.d/metric"
"github.com/ozontech/file.d/pipeline"
prom "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus"
insaneJSON "github.com/vitkovskii/insane-json"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -53,7 +53,7 @@ type Plugin struct {

// plugin metrics

maskAppliedMetric *prom.CounterVec
maskAppliedMetric *prometheus.CounterVec
}

// ! config-params
Expand Down Expand Up @@ -135,7 +135,7 @@ type Mask struct {
MetricLabels []string `json:"metric_labels"` // *

// mask metric
appliedMetric *prom.CounterVec
appliedMetric *prometheus.CounterVec
}

func init() {
Expand All @@ -149,7 +149,7 @@ func factory() (pipeline.AnyPlugin, pipeline.AnyConfig) {
return &Plugin{}, &Config{}
}

func (p *Plugin) makeMetric(ctl *metric.Ctl, name, help string, labels ...string) *prom.CounterVec {
func (p *Plugin) makeMetric(ctl *metric.Ctl, name, help string, labels ...string) *prometheus.CounterVec {
if name == "" {
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions plugin/action/parse_re2/parse_re2.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/ozontech/file.d/fd"
"github.com/ozontech/file.d/metric"
"github.com/ozontech/file.d/pipeline"
prom "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus"
insaneJSON "github.com/vitkovskii/insane-json"
)

Expand All @@ -20,7 +20,7 @@ type Plugin struct {
re *regexp.Regexp

// plugin metrics
eventNotMatchingPatternMetric *prom.CounterVec
eventNotMatchingPatternMetric *prometheus.CounterVec
}

// ! config-params
Expand Down
6 changes: 3 additions & 3 deletions plugin/action/throttle/limiters_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"time"

"github.com/go-redis/redis"
prom "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
"go.uber.org/zap"

Expand Down Expand Up @@ -71,7 +71,7 @@ type limitersMapConfig struct {

limiterCfg *limiterConfig

mapSizeMetric *prom.GaugeVec
mapSizeMetric *prometheus.GaugeVec
}

// limitersMap is auxiliary type for storing the map of strings to limiters with additional info for cleanup
Expand All @@ -92,7 +92,7 @@ type limitersMap struct {

limiterCfg *limiterConfig

mapSizeMetric *prom.GaugeVec
mapSizeMetric *prometheus.GaugeVec
}

func newLimitersMap(lmCfg limitersMapConfig, redisOpts *redis.Options) *limitersMap {
Expand Down
4 changes: 2 additions & 2 deletions plugin/action/throttle/throttle.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"time"

"github.com/go-redis/redis"
prom "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"

"github.com/ozontech/file.d/cfg"
Expand Down Expand Up @@ -45,7 +45,7 @@ type Plugin struct {
limiterBuf []byte
rules []*rule

limitersMapSizeMetric *prom.GaugeVec
limitersMapSizeMetric *prometheus.GaugeVec
}

// ! config-params
Expand Down
6 changes: 3 additions & 3 deletions plugin/input/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/ozontech/file.d/fd"
"github.com/ozontech/file.d/metric"
"github.com/ozontech/file.d/pipeline"
prom "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -56,8 +56,8 @@ type Plugin struct {

// plugin metrics

possibleOffsetCorruptionMetric *prom.CounterVec
alreadyWrittenEventsSkippedMetric *prom.CounterVec
possibleOffsetCorruptionMetric *prometheus.CounterVec
alreadyWrittenEventsSkippedMetric *prometheus.CounterVec
}

type persistenceMode int
Expand Down
7 changes: 4 additions & 3 deletions plugin/output/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/ozontech/file.d/metric"
"github.com/ozontech/file.d/pipeline"
"github.com/ozontech/file.d/tls"
prom "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -55,8 +55,8 @@ type Plugin struct {

// plugin metrics

insertErrorsMetric *prom.CounterVec
queriesCountMetric *prom.CounterVec
insertErrorsMetric *prometheus.CounterVec
queriesCountMetric *prometheus.CounterVec
}

type Setting struct {
Expand Down Expand Up @@ -375,6 +375,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
BatchSizeCount: p.config.BatchSize_,
BatchSizeBytes: p.config.BatchSizeBytes_,
FlushTimeout: p.config.BatchFlushTimeout_,
MetricCtl: params.MetricCtl,
})

p.batcher.Start(p.ctx)
Expand Down
1 change: 1 addition & 0 deletions plugin/output/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
BatchSizeBytes: p.config.BatchSizeBytes_,
FlushTimeout: p.config.BatchFlushTimeout_,
MaintenanceInterval: time.Minute,
MetricCtl: params.MetricCtl,
})

ctx, cancel := context.WithCancel(context.Background())
Expand Down
1 change: 1 addition & 0 deletions plugin/output/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
BatchSizeCount: p.config.BatchSize_,
BatchSizeBytes: p.config.BatchSizeBytes_,
FlushTimeout: p.config.BatchFlushTimeout_,
MetricCtl: params.MetricCtl,
})

p.mu = &sync.RWMutex{}
Expand Down
Loading

0 comments on commit 29604e1

Please sign in to comment.