diff --git a/batcher.go b/batcher.go index 3f6adf8..cf050dd 100644 --- a/batcher.go +++ b/batcher.go @@ -84,7 +84,7 @@ func (b *Batcher[T]) start(ctx context.Context, // If the batcher is cancelled and the buffer is not empty, we want to flush the // remaining items with the maximum batch size, so we skip until we reach max size or the buffer is empty. - skipFlush := isCancelled && len(b.buffer) > 0 && !isMaxSize + skipFlush := (isCancelled && len(b.buffer) > 0 && !isMaxSize) || len(items) == 0 if !skipFlush { // We need to copy the slice to make sure that the slice that is passed is valid even if asynchronously diff --git a/batcher_test.go b/batcher_test.go index 4fa5af5..bac4bcb 100644 --- a/batcher_test.go +++ b/batcher_test.go @@ -282,4 +282,17 @@ func TestBatcherCancellation(t *testing.T) { assert.ErrorIs(t, batcher.Push(0), ErrBatcherStopped) assert.Equal(t, 0, batcher.CurrentBufferSize()) }) + + t.Run("no empty flush after cancellation", func(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) + + _, flusher := startWithMockFlusher(ctx, t, New[int]()) + + cancel() + + time.Sleep(time.Millisecond * 5) + + assert.Equal(t, 0, flusher.StartCount()) + }) }