From d920bc40b8d78fbda163d1a88c1a08e50a4dcba9 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Sun, 5 Jan 2025 14:54:10 -0800 Subject: [PATCH] [chore] Remove the need of calling startWorkerPool, simplify flushing Signed-off-by: Bogdan Drutu --- exporter/internal/queue/batcher.go | 73 ++++++++------------- exporter/internal/queue/default_batcher.go | 16 ++--- exporter/internal/queue/disabled_batcher.go | 11 +--- 3 files changed, 36 insertions(+), 64 deletions(-) diff --git a/exporter/internal/queue/batcher.go b/exporter/internal/queue/batcher.go index 2250b27d1b1..33b110f51bf 100644 --- a/exporter/internal/queue/batcher.go +++ b/exporter/internal/queue/batcher.go @@ -26,7 +26,6 @@ type Batcher interface { type BaseBatcher struct { batchCfg exporterbatcher.Config queue Queue[internal.Request] - maxWorkers int workerPool chan bool exportFunc func(ctx context.Context, req internal.Request) error stopWG sync.WaitGroup @@ -38,60 +37,46 @@ func NewBatcher(batchCfg exporterbatcher.Config, maxWorkers int, ) (Batcher, error) { if !batchCfg.Enabled { - return &DisabledBatcher{ - BaseBatcher{ - batchCfg: batchCfg, - queue: queue, - maxWorkers: maxWorkers, - exportFunc: exportFunc, - stopWG: sync.WaitGroup{}, - }, - }, nil + return &DisabledBatcher{BaseBatcher: newBaseBatcher(batchCfg, queue, exportFunc, maxWorkers)}, nil } - - return &DefaultBatcher{ - BaseBatcher: BaseBatcher{ - batchCfg: batchCfg, - queue: queue, - maxWorkers: maxWorkers, - exportFunc: exportFunc, - stopWG: sync.WaitGroup{}, - }, - }, nil + return &DefaultBatcher{BaseBatcher: newBaseBatcher(batchCfg, queue, exportFunc, maxWorkers)}, nil } -func (qb *BaseBatcher) startWorkerPool() { - if qb.maxWorkers == 0 { - return +func newBaseBatcher(batchCfg exporterbatcher.Config, + queue Queue[internal.Request], + exportFunc func(ctx context.Context, req internal.Request) error, + maxWorkers int, +) BaseBatcher { + var workerPool chan bool + if maxWorkers != 0 { + workerPool = make(chan bool, maxWorkers) + for i := 0; i < maxWorkers; i++ { + workerPool <- true + } } - qb.workerPool = make(chan bool, qb.maxWorkers) - for i := 0; i < qb.maxWorkers; i++ { - qb.workerPool <- true + return BaseBatcher{ + batchCfg: batchCfg, + queue: queue, + workerPool: workerPool, + exportFunc: exportFunc, + stopWG: sync.WaitGroup{}, } } -// flush exports the incoming batch synchronously. +// flush starts a goroutine that calls exportFunc. It blocks until a worker is available if necessary. func (qb *BaseBatcher) flush(batchToFlush batch) { - err := qb.exportFunc(batchToFlush.ctx, batchToFlush.req) - for _, idx := range batchToFlush.idxList { - qb.queue.OnProcessingFinished(idx, err) - } -} - -// flushAsync starts a goroutine that calls flushIfNecessary. It blocks until a worker is available. -func (qb *BaseBatcher) flushAsync(batchToFlush batch) { qb.stopWG.Add(1) - if qb.maxWorkers == 0 { - go func() { - defer qb.stopWG.Done() - qb.flush(batchToFlush) - }() - return + if qb.workerPool != nil { + <-qb.workerPool } - <-qb.workerPool go func() { defer qb.stopWG.Done() - qb.flush(batchToFlush) - qb.workerPool <- true + err := qb.exportFunc(batchToFlush.ctx, batchToFlush.req) + for _, idx := range batchToFlush.idxList { + qb.queue.OnProcessingFinished(idx, err) + } + if qb.workerPool != nil { + qb.workerPool <- true + } }() } diff --git a/exporter/internal/queue/default_batcher.go b/exporter/internal/queue/default_batcher.go index 7e4e9273ed9..0e060e1c94f 100644 --- a/exporter/internal/queue/default_batcher.go +++ b/exporter/internal/queue/default_batcher.go @@ -64,7 +64,7 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() { qb.currentBatch = nil qb.currentBatchMu.Unlock() for i := 0; i < len(reqList); i++ { - qb.flushAsync(batch{ + qb.flush(batch{ req: reqList[i], ctx: ctx, idxList: []uint64{idx}, @@ -108,8 +108,8 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() { qb.currentBatch = nil qb.currentBatchMu.Unlock() - // flushAsync() blocks until successfully started a goroutine for flushing. - qb.flushAsync(batchToFlush) + // flush() blocks until successfully started a goroutine for flushing. + qb.flush(batchToFlush) qb.resetTimer() } else { qb.currentBatchMu.Unlock() @@ -137,12 +137,6 @@ func (qb *DefaultBatcher) startTimeBasedFlushingGoroutine() { // Start starts the goroutine that reads from the queue and flushes asynchronously. func (qb *DefaultBatcher) Start(_ context.Context, _ component.Host) error { - // maxWorker being -1 means batcher is disabled. This is for testing queue sender metrics. - if qb.maxWorkers == -1 { - return nil - } - - qb.startWorkerPool() qb.shutdownCh = make(chan bool, 1) if qb.batchCfg.FlushTimeout == 0 { @@ -168,8 +162,8 @@ func (qb *DefaultBatcher) flushCurrentBatchIfNecessary() { qb.currentBatch = nil qb.currentBatchMu.Unlock() - // flushAsync() blocks until successfully started a goroutine for flushing. - qb.flushAsync(batchToFlush) + // flush() blocks until successfully started a goroutine for flushing. + qb.flush(batchToFlush) qb.resetTimer() } diff --git a/exporter/internal/queue/disabled_batcher.go b/exporter/internal/queue/disabled_batcher.go index 250b38e7640..ec154899c39 100644 --- a/exporter/internal/queue/disabled_batcher.go +++ b/exporter/internal/queue/disabled_batcher.go @@ -17,16 +17,9 @@ type DisabledBatcher struct { // Start starts the goroutine that reads from the queue and flushes asynchronously. func (qb *DisabledBatcher) Start(_ context.Context, _ component.Host) error { - // maxWorker being -1 means batcher is disabled. This is for testing queue sender metrics. - if qb.maxWorkers == -1 { - return nil - } - - qb.startWorkerPool() - // This goroutine reads and then flushes. // 1. Reading from the queue is blocked until the queue is non-empty or until the queue is stopped. - // 2. flushAsync() blocks until there are idle workers in the worker pool. + // 2. flush() blocks until there are idle workers in the worker pool. qb.stopWG.Add(1) go func() { defer qb.stopWG.Done() @@ -35,7 +28,7 @@ func (qb *DisabledBatcher) Start(_ context.Context, _ component.Host) error { if !ok { return } - qb.flushAsync(batch{ + qb.flush(batch{ req: req, ctx: context.Background(), idxList: []uint64{idx},