diff --git a/exporter/internal/queue/batcher.go b/exporter/internal/queue/batcher.go index 2250b27d1b1..d320d2371e0 100644 --- a/exporter/internal/queue/batcher.go +++ b/exporter/internal/queue/batcher.go @@ -24,8 +24,9 @@ type Batcher interface { } type BaseBatcher struct { - batchCfg exporterbatcher.Config - queue Queue[internal.Request] + batchCfg exporterbatcher.Config + queue Queue[internal.Request] + // TODO: Remove when the -1 hack for testing is removed. maxWorkers int workerPool chan bool exportFunc func(ctx context.Context, req internal.Request) error @@ -38,60 +39,47 @@ func NewBatcher(batchCfg exporterbatcher.Config, maxWorkers int, ) (Batcher, error) { if !batchCfg.Enabled { - return &DisabledBatcher{ - BaseBatcher{ - batchCfg: batchCfg, - queue: queue, - maxWorkers: maxWorkers, - exportFunc: exportFunc, - stopWG: sync.WaitGroup{}, - }, - }, nil + return &DisabledBatcher{BaseBatcher: newBaseBatcher(batchCfg, queue, exportFunc, maxWorkers)}, nil } - - return &DefaultBatcher{ - BaseBatcher: BaseBatcher{ - batchCfg: batchCfg, - queue: queue, - maxWorkers: maxWorkers, - exportFunc: exportFunc, - stopWG: sync.WaitGroup{}, - }, - }, nil + return &DefaultBatcher{BaseBatcher: newBaseBatcher(batchCfg, queue, exportFunc, maxWorkers)}, nil } -func (qb *BaseBatcher) startWorkerPool() { - if qb.maxWorkers == 0 { - return +func newBaseBatcher(batchCfg exporterbatcher.Config, + queue Queue[internal.Request], + exportFunc func(ctx context.Context, req internal.Request) error, + maxWorkers int, +) BaseBatcher { + var workerPool chan bool + if maxWorkers > 0 { + workerPool = make(chan bool, maxWorkers) + for i := 0; i < maxWorkers; i++ { + workerPool <- true + } } - qb.workerPool = make(chan bool, qb.maxWorkers) - for i := 0; i < qb.maxWorkers; i++ { - qb.workerPool <- true + return BaseBatcher{ + batchCfg: batchCfg, + queue: queue, + maxWorkers: maxWorkers, + workerPool: workerPool, + exportFunc: exportFunc, + stopWG: sync.WaitGroup{}, } } -// flush exports the incoming batch synchronously. +// flush starts a goroutine that calls exportFunc. It blocks until a worker is available if necessary. func (qb *BaseBatcher) flush(batchToFlush batch) { - err := qb.exportFunc(batchToFlush.ctx, batchToFlush.req) - for _, idx := range batchToFlush.idxList { - qb.queue.OnProcessingFinished(idx, err) - } -} - -// flushAsync starts a goroutine that calls flushIfNecessary. It blocks until a worker is available. -func (qb *BaseBatcher) flushAsync(batchToFlush batch) { qb.stopWG.Add(1) - if qb.maxWorkers == 0 { - go func() { - defer qb.stopWG.Done() - qb.flush(batchToFlush) - }() - return + if qb.workerPool != nil { + <-qb.workerPool } - <-qb.workerPool go func() { defer qb.stopWG.Done() - qb.flush(batchToFlush) - qb.workerPool <- true + err := qb.exportFunc(batchToFlush.ctx, batchToFlush.req) + for _, idx := range batchToFlush.idxList { + qb.queue.OnProcessingFinished(idx, err) + } + if qb.workerPool != nil { + qb.workerPool <- true + } }() } diff --git a/exporter/internal/queue/default_batcher.go b/exporter/internal/queue/default_batcher.go index 7e4e9273ed9..dced12e9e2b 100644 --- a/exporter/internal/queue/default_batcher.go +++ b/exporter/internal/queue/default_batcher.go @@ -64,7 +64,7 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() { qb.currentBatch = nil qb.currentBatchMu.Unlock() for i := 0; i < len(reqList); i++ { - qb.flushAsync(batch{ + qb.flush(batch{ req: reqList[i], ctx: ctx, idxList: []uint64{idx}, @@ -108,8 +108,8 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() { qb.currentBatch = nil qb.currentBatchMu.Unlock() - // flushAsync() blocks until successfully started a goroutine for flushing. - qb.flushAsync(batchToFlush) + // flush() blocks until successfully started a goroutine for flushing. + qb.flush(batchToFlush) qb.resetTimer() } else { qb.currentBatchMu.Unlock() @@ -142,7 +142,6 @@ func (qb *DefaultBatcher) Start(_ context.Context, _ component.Host) error { return nil } - qb.startWorkerPool() qb.shutdownCh = make(chan bool, 1) if qb.batchCfg.FlushTimeout == 0 { @@ -168,8 +167,8 @@ func (qb *DefaultBatcher) flushCurrentBatchIfNecessary() { qb.currentBatch = nil qb.currentBatchMu.Unlock() - // flushAsync() blocks until successfully started a goroutine for flushing. - qb.flushAsync(batchToFlush) + // flush() blocks until successfully started a goroutine for flushing. + qb.flush(batchToFlush) qb.resetTimer() } diff --git a/exporter/internal/queue/disabled_batcher.go b/exporter/internal/queue/disabled_batcher.go index 250b38e7640..be89f58e011 100644 --- a/exporter/internal/queue/disabled_batcher.go +++ b/exporter/internal/queue/disabled_batcher.go @@ -22,11 +22,9 @@ func (qb *DisabledBatcher) Start(_ context.Context, _ component.Host) error { return nil } - qb.startWorkerPool() - // This goroutine reads and then flushes. // 1. Reading from the queue is blocked until the queue is non-empty or until the queue is stopped. - // 2. flushAsync() blocks until there are idle workers in the worker pool. + // 2. flush() blocks until there are idle workers in the worker pool. qb.stopWG.Add(1) go func() { defer qb.stopWG.Done() @@ -35,7 +33,7 @@ func (qb *DisabledBatcher) Start(_ context.Context, _ component.Host) error { if !ok { return } - qb.flushAsync(batch{ + qb.flush(batch{ req: req, ctx: context.Background(), idxList: []uint64{idx},