// util.go
package memberlist
import (
"Compress/lzw"
"bytes"
"encoding/binary"
"fmt"
"io"
"math"
"math/rand"
"time"
"github.com/hashicorp/go-msgpack/codec"
)
// PushPullScaleThreshold is the cluster size above which the push/pull
// sync interval begins to scale up.
const PushPullScaleThreshold = 32
const (
	// lzwLitWidth is the literal code width for LZW compression (valid values are 2-8)
lzwLitWidth = 8
)
// Decode msgpack-decodes buf into out.
func Decode(buf []byte, out interface{}) error {
r := bytes.NewReader(buf)
hd := codec.MsgpackHandle{}
dec := codec.NewDecoder(r, &hd)
return dec.Decode(out)
}
// Encode msgpack-encodes in, prefixing the buffer with the message type byte.
func Encode(msgType MessageType, in interface{}) (*bytes.Buffer, error) {
buf := bytes.NewBuffer(nil)
buf.WriteByte(uint8(msgType))
hd := codec.MsgpackHandle{}
enc := codec.NewEncoder(buf, &hd)
err := enc.Encode(in)
return buf, err
}
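// exampleEncodeDecode is a minimal sketch (not part of the original API)
// showing how Encode and Decode fit together: Encode prepends the message
// type byte, so the caller skips it before decoding. The payload struct and
// the MessageType(0) value are placeholders for illustration only.
func exampleEncodeDecode() error {
	type payload struct {
		SeqNo uint32
	}
	in := payload{SeqNo: 42}
	buf, err := Encode(MessageType(0), &in)
	if err != nil {
		return err
	}
	var out payload
	// buf.Bytes()[0] holds the message type; the msgpack body starts at [1:].
	return Decode(buf.Bytes()[1:], &out)
}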
// RandomOffset returns a random offset in [0, n); it returns 0 when n is 0.
func RandomOffset(n int) int {
if n == 0 {
return 0
}
return int(rand.Uint32() % uint32(n))
}
// SuspicionTimeout computes the timeout that should be used when a node is
// suspected; it scales logarithmically with cluster size.
func SuspicionTimeout(SuspicionMult, n int, interval time.Duration) time.Duration {
nodeScale := math.Max(1.0, math.Log10(math.Max(1.0, float64(n))))
// multiply by 1000 to keep some precision because time.Duration is an int64 type
timeout := time.Duration(SuspicionMult) * time.Duration(nodeScale*1000) * interval / 1000
return timeout
}
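// exampleSuspicionTimeout is a rough worked example with illustrative values:
// for SuspicionMult=4, n=100 and interval=1s, nodeScale = log10(100) = 2 and
// the timeout works out to 4 * 2000 * 1s / 1000 = 8s.
func exampleSuspicionTimeout() time.Duration {
	return SuspicionTimeout(4, 100, 1*time.Second) // 8s
}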
// ShuffleNodes randomly shuffles the input Nodes using the Fisher-Yates shuffle
func ShuffleNodes(nodes []*NodeState) {
n := len(nodes)
rand.Shuffle(n, func(i, j int) {
nodes[i], nodes[j] = nodes[j], nodes[i]
})
}
// PushPullScale scales the push/pull sync interval with cluster size so that
// a growing cluster does not saturate the network.
func PushPullScale(interval time.Duration, n int) time.Duration {
	// At or below the threshold the interval is left unchanged
if n <= PushPullScaleThreshold {
return interval
}
multiplier := math.Ceil(math.Log2(float64(n))-math.Log2(PushPullScaleThreshold)) + 1.0
return time.Duration(multiplier) * interval
}
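// examplePushPullScale is a rough worked example with illustrative values:
// at or below 32 nodes the interval is unchanged, at 64 nodes the multiplier
// is ceil(log2(64)-log2(32))+1 = 2, and at 1000 nodes it is 6, so a 30s base
// interval becomes 3m.
func examplePushPullScale() time.Duration {
	return PushPullScale(30*time.Second, 1000) // 3m0s
}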
// MoveDeadNodes moves dead or left nodes that have been in that state for
// longer than gossipToTheDeadTime to the end of the slice, and returns the
// number of nodes to keep (the index of the first moved node).
func MoveDeadNodes(nodes []*NodeState, gossipToTheDeadTime time.Duration) int {
numDead := 0
n := len(nodes)
	// Scan only the portion of the slice that has not yet been swapped to the back
for i := 0; i < n-numDead; i++ {
if !nodes[i].DeadOrLeft() {
continue
}
		// Keep the node if its dead timeout has not elapsed yet
if time.Since(nodes[i].StateChange) <= gossipToTheDeadTime {
continue
}
		// Move the expired node to the end of the unscanned region, then
		// re-examine index i since a different node was swapped into it
		nodes[i], nodes[n-numDead-1] = nodes[n-numDead-1], nodes[i]
		numDead++
		i--
}
return n - numDead
}
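// examplePruneDeadNodes is a sketch of the typical caller pattern (the helper
// itself is illustrative, not part of the original file): after MoveDeadNodes
// the nodes to keep are packed at the front, so the slice is truncated to drop
// the expired dead/left nodes at the tail.
func examplePruneDeadNodes(nodes []*NodeState, gossipToTheDeadTime time.Duration) []*NodeState {
	keep := MoveDeadNodes(nodes, gossipToTheDeadTime)
	return nodes[:keep]
}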
// KRandomNodes selects up to k random nodes, skipping any node for which the
// exclude function returns true. It may return fewer than k nodes.
func KRandomNodes(k int, nodes []*NodeState, exclude func(*NodeState) bool) []Node {
	// A typical exclude callback keeps StateAlive, StateSuspect, and StateDead
	// nodes that have not yet exceeded their time limit, and filters out the rest.
n := len(nodes)
kNodes := make([]Node, 0, k)
OUTER:
	// Probe up to 3*n times. For large n this is wasteful since k << n, but
	// for small n we want the search to be exhaustive.
for i := 0; i < 3*n && len(kNodes) < k; i++ {
idx := RandomOffset(n)
state := nodes[idx]
if exclude != nil && exclude(state) {
continue OUTER
}
		// Skip this node if it has already been selected
for j := 0; j < len(kNodes); j++ {
if state.Node.Name == kNodes[j].Name {
continue OUTER
}
}
kNodes = append(kNodes, state.Node)
}
return kNodes
}
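// exampleKRandomNodes is a sketch of picking up to three gossip targets while
// skipping dead or left nodes. The exclude callback here is illustrative;
// real callers typically also exclude the local node by name.
func exampleKRandomNodes(nodes []*NodeState) []Node {
	return KRandomNodes(3, nodes, func(s *NodeState) bool {
		return s.DeadOrLeft()
	})
}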
// MakeCompoundMessages takes a list of messages and packs
// them into one or multiple messages based on the limitations
// of compound messages (255 messages each).
func MakeCompoundMessages(msgs [][]byte) []*bytes.Buffer {
const maxMsgs = 255
bufs := make([]*bytes.Buffer, 0, (len(msgs)+(maxMsgs-1))/maxMsgs)
for ; len(msgs) > maxMsgs; msgs = msgs[maxMsgs:] {
bufs = append(bufs, MakeCompoundMessage(msgs[:maxMsgs]))
}
if len(msgs) > 0 {
bufs = append(bufs, MakeCompoundMessage(msgs))
}
return bufs
}
// MakeCompoundMessage packs multiple messages into a single compound message.
func MakeCompoundMessage(msgs [][]byte) *bytes.Buffer {
	// Wire format: CompoundMsg type byte + message count (uint8) +
	// one uint16 length per message + the message bodies
buf := bytes.NewBuffer(nil)
buf.WriteByte(uint8(CompoundMsg))
buf.WriteByte(uint8(len(msgs)))
for _, m := range msgs {
binary.Write(buf, binary.BigEndian, uint16(len(m)))
}
for _, m := range msgs {
buf.Write(m)
}
return buf
}
// DecodeCompoundMessage splits a compound message back into its individual parts.
func DecodeCompoundMessage(buf []byte) (trunc int, parts [][]byte, err error) {
	// trunc is the number of parts that were truncated (had no data).
	// The caller is expected to have stripped the leading CompoundMsg type byte;
	// the remaining layout is: message count (uint8) + one uint16 length per
	// message + the message bodies.
if len(buf) < 1 {
err = fmt.Errorf("复合消息长度未知")
return
}
	numParts := int(buf[0]) // number of messages in the compound
buf = buf[1:]
	// Check that there is enough data for all of the uint16 length headers
if len(buf) < numParts*2 {
err = fmt.Errorf("截断的长片")
return
}
	// Decode the length of each part
	lengths := make([]uint16, numParts)
for i := 0; i < numParts; i++ {
lengths[i] = binary.BigEndian.Uint16(buf[i*2 : i*2+2])
}
	buf = buf[numParts*2:] // remaining message bodies
	// Slice out each message
for idx, msgLen := range lengths {
if len(buf) < int(msgLen) {
trunc = numParts - idx
return
}
slice := buf[:msgLen]
buf = buf[msgLen:]
parts = append(parts, slice)
}
return
}
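// exampleCompoundRoundTrip is a sketch showing that MakeCompoundMessage and
// DecodeCompoundMessage are inverses once the leading CompoundMsg type byte
// is stripped. The helper itself is illustrative only.
func exampleCompoundRoundTrip() (int, [][]byte, error) {
	msgs := [][]byte{[]byte("a"), []byte("bb"), []byte("ccc")}
	buf := MakeCompoundMessage(msgs)
	// Skip the CompoundMsg type byte before decoding.
	return DecodeCompoundMessage(buf.Bytes()[1:])
}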
// CompressPayload compresses the input with LZW and wraps it in a Compress message.
func CompressPayload(inp []byte) (*bytes.Buffer, error) {
var buf bytes.Buffer
	compressor := lzw.NewWriter(&buf, lzw.LSB, lzwLitWidth)
	_, err := compressor.Write(inp)
	if err != nil {
		return nil, err
	}
	if err := compressor.Close(); err != nil {
return nil, err
}
c := Compress{
Algo: lzwAlgo,
Buf: buf.Bytes(),
}
return Encode(CompressMsg, &c)
}
// DeCompressPayload decodes a Compress message and decompresses its payload.
func DeCompressPayload(msg []byte) ([]byte, error) {
var c Compress
if err := Decode(msg, &c); err != nil {
return nil, err
}
return DeCompressBuffer(&c)
}
// DeCompressBuffer decompresses the buffer of a single Compress message,
// verifying that the algorithm is supported.
func DeCompressBuffer(c *Compress) ([]byte, error) {
// Verify the algorithm
if c.Algo != lzwAlgo {
return nil, fmt.Errorf("Cannot deCompress unknown algorithm %d", c.Algo)
}
	// Create an LZW reader to decompress the buffer
uncomp := lzw.NewReader(bytes.NewReader(c.Buf), lzw.LSB, lzwLitWidth)
defer uncomp.Close()
// Read all the data
var b bytes.Buffer
_, err := io.Copy(&b, uncomp)
if err != nil {
return nil, err
}
	// Return the decompressed bytes
return b.Bytes(), nil
}
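// exampleCompressRoundTrip is a sketch (illustrative helper only) showing the
// pairing of CompressPayload and DeCompressPayload: CompressPayload returns a
// full CompressMsg buffer (type byte + msgpack body), while DeCompressPayload
// expects only the body, so the first byte is skipped.
func exampleCompressRoundTrip(data []byte) ([]byte, error) {
	buf, err := CompressPayload(data)
	if err != nil {
		return nil, err
	}
	return DeCompressPayload(buf.Bytes()[1:])
}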