From 28f727de1f85a047881653391ba901c667ee87bf Mon Sep 17 00:00:00 2001 From: Guido Zuidhof Date: Thu, 21 Nov 2024 11:33:20 +0100 Subject: [PATCH 1/2] Never flush without items --- batcher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/batcher.go b/batcher.go index 3f6adf8..cf050dd 100644 --- a/batcher.go +++ b/batcher.go @@ -84,7 +84,7 @@ func (b *Batcher[T]) start(ctx context.Context, // If the batcher is cancelled and the buffer is not empty, we want to flush the // remaining items with the maximum batch size, so we skip until we reach max size or the buffer is empty. - skipFlush := isCancelled && len(b.buffer) > 0 && !isMaxSize + skipFlush := (isCancelled && len(b.buffer) > 0 && !isMaxSize) || len(items) == 0 if !skipFlush { // We need to copy the slice to make sure that the slice that is passed is valid even if asynchronously From b9c6205461f483e87826772e89dd31758fd4a272 Mon Sep 17 00:00:00 2001 From: Guido Zuidhof Date: Thu, 21 Nov 2024 11:39:07 +0100 Subject: [PATCH 2/2] Add test to reproduce issue --- batcher_test.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/batcher_test.go b/batcher_test.go index 4fa5af5..bac4bcb 100644 --- a/batcher_test.go +++ b/batcher_test.go @@ -282,4 +282,17 @@ func TestBatcherCancellation(t *testing.T) { assert.ErrorIs(t, batcher.Push(0), ErrBatcherStopped) assert.Equal(t, 0, batcher.CurrentBufferSize()) }) + + t.Run("no empty flush after cancellation", func(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) + + _, flusher := startWithMockFlusher(ctx, t, New[int]()) + + cancel() + + time.Sleep(time.Millisecond * 5) + + assert.Equal(t, 0, flusher.StartCount()) + }) }