Skip to content

Commit

Permalink
Merge branch 'master' into 410-metricholder
Browse files Browse the repository at this point in the history
# Conflicts:
#	pipeline/batch.go
#	pipeline/pipeline.go
  • Loading branch information
Yaroslav Kirillov committed Jul 7, 2023
2 parents fba676d + 8e78e63 commit 7dde5dd
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 8 deletions.
54 changes: 46 additions & 8 deletions pipeline/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@ import (
"go.uber.org/atomic"
)

// BatchStatus describes whether a Batch is ready to be sent and, if it is,
// which condition made it ready.
type BatchStatus byte

const (
// BatchStatusNotReady is the zero value: the batch is neither full nor
// timed out.
BatchStatusNotReady BatchStatus = iota
// BatchStatusMaxSizeExceeded means the batch hit its event-count limit or
// its total-bytes limit.
BatchStatusMaxSizeExceeded
// BatchStatusTimeoutExceeded means the batch is non-empty and has been
// open for longer than its timeout.
BatchStatusTimeoutExceeded
)

type Batch struct {
Events []*Event
// eventsSize contains total size of the Events in bytes
Expand All @@ -24,6 +32,7 @@ type Batch struct {
maxSizeCount int
// maxSizeBytes max size of events per batch in bytes
maxSizeBytes int
status BatchStatus
}

func newBatch(maxSizeCount int, maxSizeBytes int, timeout time.Duration) *Batch {
Expand All @@ -48,6 +57,7 @@ func newBatch(maxSizeCount int, maxSizeBytes int, timeout time.Duration) *Batch
// reset prepares the batch for reuse: it restarts the timeout clock, marks the
// batch as not ready, zeroes the accumulated size and truncates Events to
// length zero while keeping the slice's backing array.
func (b *Batch) reset() {
	b.startTime = time.Now()
	b.status = BatchStatusNotReady
	b.eventsSize = 0
	b.Events = b.Events[:0]
}

Expand All @@ -56,11 +66,17 @@ func (b *Batch) append(e *Event) {
b.eventsSize += e.Size
}

func (b *Batch) isReady() bool {
func (b *Batch) updateStatus() BatchStatus {
l := len(b.Events)
isFull := (b.maxSizeCount != 0 && l == b.maxSizeCount) || (b.maxSizeBytes != 0 && b.maxSizeBytes <= b.eventsSize)
isTimeout := l > 0 && time.Since(b.startTime) > b.timeout
return isFull || isTimeout
switch {
case (b.maxSizeCount != 0 && l == b.maxSizeCount) || (b.maxSizeBytes != 0 && b.maxSizeBytes <= b.eventsSize):
b.status = BatchStatusMaxSizeExceeded
case l > 0 && time.Since(b.startTime) > b.timeout:
b.status = BatchStatusTimeoutExceeded
default:
b.status = BatchStatusNotReady
}
return b.status
}

type Batcher struct {
Expand All @@ -80,8 +96,11 @@ type Batcher struct {
outSeq int64
commitSeq int64

batchOutFnSeconds prometheus.Histogram
commitWaitingSeconds prometheus.Histogram
batchOutFnSeconds prometheus.Observer
commitWaitingSeconds prometheus.Observer
workersInProgress prometheus.Gauge
batchesDoneByMaxSize prometheus.Counter
batchesDoneByTimeout prometheus.Counter
}

type (
Expand Down Expand Up @@ -116,6 +135,13 @@ func (b *Batcher) Start(_ context.Context) {
RegisterHistogram("batcher_out_fn_seconds", "", metric.SecondsBucketsLong)
b.commitWaitingSeconds = b.opts.MetricCtl.
RegisterHistogram("batcher_commit_waiting_seconds", "", metric.SecondsBucketsDetailed)
b.workersInProgress = b.opts.MetricCtl.
RegisterGauge("batcher_workers_in_progress", "")

jobsDone := b.opts.MetricCtl.
RegisterCounterVec("batcher_jobs_done_total", "", "status")
b.batchesDoneByMaxSize = jobsDone.WithLabelValues("max_size_exceeded")
b.batchesDoneByTimeout = jobsDone.WithLabelValues("timeout_exceeded")

b.freeBatches = make(chan *Batch, b.opts.Workers)
b.fullBatches = make(chan *Batch, b.opts.Workers)
Expand All @@ -136,6 +162,8 @@ func (b *Batcher) work() {
events := make([]*Event, 0)
data := WorkerData(nil)
for batch := range b.fullBatches {
b.workersInProgress.Inc()

now := time.Now()
b.opts.OutFn(&data, batch)
b.batchOutFnSeconds.Observe(time.Since(now).Seconds())
Expand All @@ -147,6 +175,16 @@ func (b *Batcher) work() {
t = time.Now()
b.opts.MaintenanceFn(&data)
}

b.workersInProgress.Dec()
switch batch.status {
case BatchStatusMaxSizeExceeded:
b.batchesDoneByMaxSize.Inc()
case BatchStatusTimeoutExceeded:
b.batchesDoneByTimeout.Inc()
default:
logger.Panic("unreachable")
}
}
}

Expand Down Expand Up @@ -201,9 +239,9 @@ func (b *Batcher) Add(event *Event) {
b.trySendBatchAndUnlock(batch)
}

// trySendBatch mu should be locked and it'll be unlocked after execution of this function
// trySendBatchAndUnlock must be called with b.mu held; b.mu is unlocked by the time it returns.
func (b *Batcher) trySendBatchAndUnlock(batch *Batch) {
if !batch.isReady() {
if batch.updateStatus() == BatchStatusNotReady {
b.mu.Unlock()
return
}
Expand Down
5 changes: 5 additions & 0 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ type Pipeline struct {
readOpsEventsSizeMetric prometheus.Counter
wrongEventCRIFormatMetric prometheus.Counter
maxEventSizeExceededMetric prometheus.Counter
eventPoolLatency prometheus.Observer
}

type Settings struct {
Expand Down Expand Up @@ -218,6 +219,8 @@ func (p *Pipeline) registerMetrics() {
p.readOpsEventsSizeMetric = m.RegisterCounter("read_ops_count", "Read OPS count")
p.wrongEventCRIFormatMetric = m.RegisterCounter("wrong_event_cri_format", "Wrong event CRI format counter")
p.maxEventSizeExceededMetric = m.RegisterCounter("max_event_size_exceeded", "Max event size exceeded counter")
p.eventPoolLatency = m.RegisterHistogram("event_pool_latency_seconds",
"How long we are wait an event from the pool", metric.SecondsBucketsDetailed)
}

func (p *Pipeline) setDefaultMetrics() {
Expand Down Expand Up @@ -394,7 +397,9 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes
p.inputEvents.Inc()
p.inputSize.Add(int64(length))

now := time.Now()
event := p.eventPool.get()
p.eventPoolLatency.Observe(time.Since(now).Seconds())

switch dec {
case decoder.JSON:
Expand Down

0 comments on commit 7dde5dd

Please sign in to comment.