-
Notifications
You must be signed in to change notification settings - Fork 0
/
bundle.go
104 lines (91 loc) · 1.94 KB
/
bundle.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package bundle
import (
"sync"
"time"
)
// Defaults that New applies when no option overrides them.
const (
// defaultSize is the initial batch size threshold assigned to a new Bundle.
defaultSize = 20
// defaultTimeout is the initial flush interval assigned to a new Bundle.
defaultTimeout = 10 * time.Second
// defaultPCap is the buffer capacity of the payload channel that New
// creates when no payload channel has been set by an option.
defaultPCap = 1000
)
// New builds a Bundle that hands batches of payloads to handler.
//
// The bundle starts from defaultSize and defaultTimeout, then every option in
// opt is applied in order. After the options have run, values that would make
// the bundle unusable are replaced with safe ones:
//
//   - a non-positive size falls back to defaultSize (a zero size would
//     otherwise panic with a division by zero below);
//   - a non-positive timeout falls back to defaultTimeout (it would otherwise
//     make the timer fire continuously);
//   - a missing or unbuffered payload channel is replaced by one with
//     defaultPCap slots, because batching inspects len() of the channel and
//     an unbuffered channel always reports zero;
//   - the signal channel always gets at least one slot, even when the batch
//     size exceeds the payload capacity.
//
// The returned bundle is idle until Start is called.
func New[T any](handler func([]T), opt ...Apply[T]) *Bundle[T] {
	b := &Bundle[T]{
		size:    defaultSize,
		timeout: defaultTimeout,
		close:   make(chan struct{}),
		handler: handler,
	}
	for _, apply := range opt {
		if apply == nil {
			continue // tolerate nil options instead of panicking on the call
		}
		apply(b)
	}

	if b.size <= 0 {
		b.size = defaultSize
	}
	if b.timeout <= 0 {
		b.timeout = defaultTimeout
	}
	// cap of a nil channel is 0, so this covers both "not set" and "unbuffered".
	if cap(b.payloads) == 0 {
		b.payloads = make(chan T, defaultPCap)
	}

	// One slot per full batch that fits in the payload buffer, but never zero.
	signals := cap(b.payloads) / b.size
	if signals < 1 {
		signals = 1
	}
	b.do = make(chan struct{}, signals)

	b.timer = time.NewTimer(b.timeout)
	return b
}
// Bundle collects payloads of type T and passes them to a handler in batches.
// A batch is flushed when size payloads have been added or when timeout
// elapses, whichever comes first. A Bundle must not be copied after creation
// because it contains a mutex and a wait group.
type Bundle[T any] struct {
	count     int            // payloads added since the last size-triggered signal
	size      int            // batch size threshold
	payloads  chan T         // buffered queue of pending payloads
	timeout   time.Duration  // maximum wait between flushes
	timer     *time.Timer    // fires when timeout elapses
	close     chan struct{}  // closed by Close to stop the worker
	do        chan struct{}  // carries "a full batch is ready" signals
	lock      sync.Mutex     // guards count
	handler   func([]T)      // callback invoked with every batch
	wg        sync.WaitGroup // tracks worker goroutines so Close can wait for them
	closeOnce sync.Once      // makes Close idempotent
}

// Add enqueues one payload and, once size payloads have accumulated since the
// previous signal, wakes the worker.
//
// Add blocks while the payload queue or the signal queue is full. It must not
// be called after, or concurrently with, Close: Close closes the payload
// channel, so such a call panics.
func (b *Bundle[T]) Add(payload T) {
	b.payloads <- payload

	b.lock.Lock()
	b.count++
	full := b.count >= b.size
	if full {
		b.count = 0
	}
	b.lock.Unlock()

	// Signal outside the critical section so a full signal queue does not
	// keep the mutex locked while this goroutine waits.
	if full {
		b.do <- struct{}{}
	}
}

// Start launches the background worker that flushes batches.
func (b *Bundle[T]) Start() {
	b.wg.Add(1)
	go func() {
		defer b.wg.Done()
		b.working()
	}()
}

// working is the worker loop: it flushes a batch on every size signal and on
// every timer tick, and returns once the close channel is closed.
func (b *Bundle[T]) working() {
	for {
		select {
		case <-b.close: // shutdown requested
			return
		case <-b.do: // a full batch is ready
			b.pack()
			// The timer may have fired while the batch was being handled;
			// stop it and discard any pending tick before re-arming so the
			// next timeout is measured from now.
			if !b.timer.Stop() {
				select {
				case <-b.timer.C:
				default:
				}
			}
			b.timer.Reset(b.timeout)
		case <-b.timer.C: // timeout elapsed; flush whatever is queued
			b.pack()
			b.timer.Reset(b.timeout)
		}
	}
}

// pack removes up to size queued payloads and passes them to the handler as
// one batch. It does nothing when the queue is empty. A non-positive size
// places no limit on the batch.
func (b *Bundle[T]) pack() {
	n := len(b.payloads)
	if b.size > 0 && n > b.size {
		n = b.size
	}
	if n <= 0 {
		return
	}

	batch := make([]T, 0, n)
	drained := false
	for !drained && len(batch) < n {
		// Receive without blocking and honour the closed flag: the queue may
		// have been emptied or closed since len was sampled, and a plain
		// receive would then block or yield zero values.
		select {
		case v, ok := <-b.payloads:
			if ok {
				batch = append(batch, v)
			} else {
				drained = true
			}
		default:
			drained = true
		}
	}

	if len(batch) > 0 {
		b.handler(batch)
	}
}

// Close stops the worker, waits for it to finish the batch it is handling,
// and then flushes every payload still queued. Calling Close more than once
// is safe; only the first call has an effect.
//
// Close must not be called from inside the handler, because it waits for the
// worker goroutine that runs the handler.
func (b *Bundle[T]) Close() {
	b.closeOnce.Do(func() {
		close(b.close) // ask the worker to stop
		b.wg.Wait()    // from here on nothing else reads the queue
		b.timer.Stop()
		close(b.do)
		close(b.payloads) // buffered payloads stay readable after close
		for len(b.payloads) > 0 {
			b.pack()
		}
	})
}