-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathheap_test.go
112 lines (95 loc) · 2.85 KB
/
heap_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package zkafka
import (
"slices"
"testing"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/stretchr/testify/require"
)
func Test_offsetHeap_PushPopPeek_WhenInsertSmaller(t *testing.T) {
defer recoverThenFail(t)
heap := offsetHeap{}
input1 := kafka.TopicPartition{Partition: 1, Offset: 1}
heap.Push(input1)
got, err := heap.Peek()
require.NoError(t, err)
require.Equal(t, input1, got, "expected the minimum offset, which is the only offset")
input2 := kafka.TopicPartition{Partition: 1, Offset: 0}
// insert smaller item into heap
heap.Push(input2)
got, err = heap.Peek()
require.NoError(t, err)
require.Equal(t, input2, got)
got = heap.Pop()
require.Equal(t, input2, got)
got = heap.Pop()
require.Equal(t, input1, got)
}
func Test_offsetHeap_PushPopPeek_WhenInsertBigger(t *testing.T) {
defer recoverThenFail(t)
heap := offsetHeap{}
input1 := kafka.TopicPartition{Partition: 1, Offset: 1}
heap.Push(input1)
got, err := heap.Peek()
require.NoError(t, err)
require.Equal(t, input1, got, "expected the minimum offset, which is the only offset")
input2 := kafka.TopicPartition{Partition: 1, Offset: 100}
// insert smaller item into heap
heap.Push(input2)
got, err = heap.Peek()
require.NoError(t, err)
require.Equal(t, input1, got)
got = heap.Pop()
require.Equal(t, input1, got)
}
func Test_offsetHeap_PopWhenEmptyResultsInPanic(t *testing.T) {
defer func() {
if r := recover(); r == nil {
require.Fail(t, "expected panic on pop when empty")
}
}()
heap := offsetHeap{}
_ = heap.Pop()
}
func Test_offsetHeap_PeekWhenEmpty(t *testing.T) {
defer recoverThenFail(t)
heap := offsetHeap{}
_, err := heap.Peek()
require.Error(t, err, "expected error on peek when empty")
}
// Test_offsetHeap_SeekPop_DoesntImpactHeapOrdering
// given 100 items in the heap.
// when n are seek popped (taken out of the middle of the heap)
// then when we heap.Pop we still get the minimum offsets
func Test_offsetHeap_SeekPop_DoesntImpactHeapOrdering(t *testing.T) {
defer recoverThenFail(t)
heap := offsetHeap{}
var offsets []kafka.TopicPartition
// build up a heap of size N
count := 100
for i := 0; i < count; i++ {
offset := kafka.TopicPartition{Partition: 1, Offset: kafka.Offset(i)}
offsets = append(offsets, offset)
heap.Push(offset)
}
require.Len(t, heap.data, count)
// remove M items from heap
offsetsToRemove := []int{95, 34, 12, 2, 44, 45}
for _, index := range offsetsToRemove {
heap.SeekPop(offsets[index])
}
remainingCount := count - len(offsetsToRemove)
require.Len(t, heap.data, remainingCount)
// Loop through the N items that were in the heap
// skip the ones known to be seekPopped out
i := 0
for i < count {
if slices.Contains(offsetsToRemove, i) {
i++
continue
}
got := heap.Pop()
want := offsets[i]
require.Equal(t, want, got, "Expect pop to still pop minimums even after seek pops")
i++
}
}