-
Notifications
You must be signed in to change notification settings - Fork 0
/
shard_test.go
350 lines (290 loc) · 9.23 KB
/
shard_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
package shard
import (
"fmt"
"math"
"sort"
"testing"
"time"
"github.com/coreos/go-etcd/etcd"
"github.com/csigo/ephemeral"
"github.com/csigo/portforward"
"github.com/csigo/test"
"github.com/golang/groupcache/consistenthash"
"github.com/satori/go.uuid"
"github.com/spaolacci/murmur3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
)
// Fixtures shared by the tests in this file.
const (
// svr1, svr2 and svr3 are the host:port names the test shard servers
// register under and are later expected back from HostPort and the peer list.
svr1 = "127.0.0.1:8080"
svr2 = "192.168.0.1:81"
svr3 = "10.0.0.1:4000"
// testEmRoot is the root path handed to every server and client the tests create.
testEmRoot = "/htc.com/csi/groupcache"
)
// Package-level etcd clients. Both are created in TestConnHashTestSuite before
// the suite runs and closed by it afterwards.
var (
// etcdCli connects straight to the port the launched etcd listens on.
etcdCli *etcd.Client
// etcdForwdCli connects through the forwarded port, so a test can interrupt
// its connection by stopping the forwarder.
etcdForwdCli *etcd.Client
)
// ConnHashTestSuite is the testify suite for the consistent-hash shard tests.
// It carries the etcd ports and the port-forwarder state that
// TestConnHashTestSuite sets up before running the suite.
type ConnHashTestSuite struct {
suite.Suite
// etcdOriPort is the port returned by the service launcher for etcd.
etcdOriPort int
// etcdPort is the local port that gets forwarded to etcdOriPort.
etcdPort int
// stopForward is the channel returned by the last forwarder call; the tests
// send a value on it to stop forwarding.
stopForward chan struct{}
// forwarder (re)starts forwarding and returns a fresh stop channel.
forwarder func() (chan struct{}, error)
}
// portForwarder binds the two addresses into a starter function. Every call of
// the returned function invokes portforward.PortForward(from, to) and passes
// its channel and error straight back to the caller.
func portForwarder(from, to string) func() (chan struct{}, error) {
	start := func() (chan struct{}, error) {
		stop, err := portforward.PortForward(from, to)
		return stop, err
	}
	return start
}
// TestConnHashTestSuite launches etcd, puts a port forwarder in front of it,
// creates the direct and the forwarded etcd clients, and then runs
// ConnHashTestSuite. It is skipped in short mode.
//
// Setup failures abort the test immediately: continuing without etcd or
// without a forwarder would only produce misleading failures, and sending on a
// nil stop channel would block forever. All cleanup is deferred so it also
// runs when the suite aborts.
func TestConnHashTestSuite(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping zk shard test in short mode.")
	}

	// Launch etcd.
	sl := test.NewServiceLauncher()
	etcdOriPort, stopEtcd, err := sl.Start(test.Etcd)
	if err != nil {
		t.Fatalf("launching etcd failed: %s", err)
	}
	// Registered first so that it runs last, after clients and forwarder are gone.
	defer func() {
		assert.NoError(t, stopEtcd())
	}()

	s := new(ConnHashTestSuite)
	s.etcdOriPort = etcdOriPort
	s.etcdPort = 3333 // used for port forward
	s.forwarder = portForwarder(
		fmt.Sprintf(":%d", s.etcdPort), fmt.Sprintf(":%d", s.etcdOriPort))
	s.stopForward, err = s.forwarder()
	if err != nil {
		t.Fatalf("starting port forward failed: %s", err)
	}

	// non-forward etcdCli
	etcdCli = etcd.NewClient([]string{fmt.Sprintf("http://localhost:%d", etcdOriPort)})
	// forwardable etcdCli
	etcdForwdCli = etcd.NewClient([]string{fmt.Sprintf("http://localhost:%d", s.etcdPort)})

	defer func() {
		// Clean up the forwarding. A test may have replaced stopForward while
		// restarting the forwarder; if that restart failed the channel can be
		// nil, and a send on a nil channel never returns.
		if s.stopForward != nil {
			s.stopForward <- struct{}{}
		}
		etcdCli.Close()
		etcdForwdCli.Close()
	}()

	suite.Run(t, s)
}
// TestNewConnHashTimeout checks that NewConsistentHashResServer, given a
// one-nanosecond timeout, returns ErrConnTimedOut together with a nil server.
func (c *ConnHashTestSuite) TestNewConnHashTimeout() {
	t := c.T()

	em, err := ephemeral.NewEtcdEphemeral(etcdCli)
	if err != nil {
		// Nothing below is meaningful without a usable ephemeral handle.
		t.Fatalf("should get the connection: %s", err)
	}
	defer em.Close()

	conn, err := NewConsistentHashResServer(em, testEmRoot, svr3,
		ConsistentHashMapReplicaNum, time.Nanosecond, dummy{})
	fmt.Printf("result of NewConsistentHashResServer %v, %v\n", conn, err)

	assert.Error(t, err, "should hit timeout error")
	// assert.Equal takes (expected, actual); keeping that order makes the
	// failure message label the two values correctly.
	assert.Equal(t, ErrConnTimedOut, err)
	assert.Nil(t, conn, "should be nil")
	fmt.Println("done")
}
// TestConsistentHashRes registers three shard servers (svr1, svr2, svr3) and a
// client through the forwarded etcd connection and checks, after each
// membership change, that
//   - every instance maps keys uid0..uid9 to the same server as a locally
//     built consistenthash.Map,
//   - the client reports the expected peer list, and
//   - keys are spread evenly enough across the shards.
//
// It then stops the port forwarder for ten seconds, restarts it, and checks
// that the client still reports all peers and an even distribution.
//
// Every resource is closed through defer so that an early t.Fatalf does not
// leave servers registered for the other tests sharing testEmRoot.
func (c *ConnHashTestSuite) TestConsistentHashRes() {
	t := c.T()

	// consistent server 1
	em1, err := ephemeral.NewEtcdEphemeral(etcdForwdCli)
	if err != nil {
		t.Fatalf("Connect to zk error for server1: %s", err)
	}
	defer em1.Close()
	conn1, err := NewConsistentHashResServer(em1, testEmRoot, svr1,
		ConsistentHashMapReplicaNum, time.Second, dummy{})
	if err != nil {
		t.Fatalf("consistent server 1 %s create failed:%s", svr1, err)
	}
	defer conn1.Close()
	assert.Equal(t, svr1, conn1.HostPort())

	// wait for the membership change to stabilize
	time.Sleep(1 * time.Second)
	assert.True(t, conn1.IsMyKey("any keys"), "should always be true since only one server only")

	// consistent server 2
	em2, err := ephemeral.NewEtcdEphemeral(etcdForwdCli)
	if err != nil {
		t.Fatalf("Connect to zk error for server2: %s", err)
	}
	defer em2.Close()
	conn2, err := NewConsistentHashResServer(em2, testEmRoot, svr2,
		ConsistentHashMapReplicaNum, time.Second, dummy{})
	if err != nil {
		t.Fatalf("consistent server 2 %s create failed:%s", svr2, err)
	}
	defer conn2.Close()
	assert.Equal(t, svr2, conn2.HostPort())

	// client
	emClient, err := ephemeral.NewEtcdEphemeral(etcdForwdCli)
	if err != nil {
		t.Fatalf("Connect to zk error for client: %s", err)
	}
	defer emClient.Close()
	client, err := NewConsistentHashResClient(emClient, testEmRoot,
		ConsistentHashMapReplicaNum, time.Second, dummy{})
	if err != nil {
		t.Fatalf("consistent client create failed:%s", err)
	}
	defer client.Close()
	assert.Equal(t, "", client.HostPort())

	// reference map holding server 1 and 2
	cmap := consistenthash.New(ConsistentHashMapReplicaNum, murmur3.Sum32)
	cmap.Add(svr1, svr2)

	// verify hashes are the same across all instances
	verifyAnswer(t, cmap, conn1, conn2, client)
	// verify peers
	verifyPeers(t, client.GetResources(), []string{svr1, svr2})
	// verify shard assignment distribution
	verifyShardDist(t, client, 2, 1000)

	// add another server
	em3, err := ephemeral.NewEtcdEphemeral(etcdForwdCli)
	if err != nil {
		t.Fatalf("Connect to zk error for server3: %s", err)
	}
	defer em3.Close()
	conn3, err := NewConsistentHashResServer(em3, testEmRoot, svr3,
		ConsistentHashMapReplicaNum, time.Second, dummy{})
	if err != nil {
		t.Fatalf("consistent server 3 %s create failed:%s", svr3, err)
	}
	defer conn3.Close()
	assert.Equal(t, svr3, conn3.HostPort())
	cmap.Add(svr3)

	// verify hashes are the same across all instances, including server 2,
	// which must also have picked up the new member
	verifyAnswer(t, cmap, conn1, conn2, conn3, client)
	// verify peers
	verifyPeers(t, client.GetResources(), []string{svr1, svr2, svr3})
	// verify shard assignment distribution
	verifyShardDist(t, client, 3, 1000)

	// Cut the forwarded connection for a while, then bring it back; sharding
	// must still work once the connection is restored.
	c.stopForward <- struct{}{}
	time.Sleep(10 * time.Second)

	// make conn alive
	c.stopForward, err = c.forwarder()
	if err != nil {
		t.Fatalf("restarting port forward failed: %s", err)
	}
	time.Sleep(time.Second) // wait one second for ready

	// verify peers
	verifyPeers(t, client.GetResources(), []string{svr1, svr2, svr3})
	// verify shard assignment distribution
	verifyShardDist(t, client, 3, 1000)
}
// verifyPeers reports a test error unless recv and expect contain the same
// strings, ignoring order.
//
// The comparison works on sorted copies, so neither argument is modified; recv
// may be a slice owned by the object under test. A length mismatch is reported
// once and ends the check, which also keeps the element comparison from
// indexing past the end of the shorter slice.
func verifyPeers(t *testing.T, recv []string, expect []string) {
	if len(recv) != len(expect) {
		t.Errorf("peers are different. Expecting %v, received %v", expect, recv)
		return
	}

	got := append([]string(nil), recv...)
	want := append([]string(nil), expect...)
	sort.Strings(got)
	sort.Strings(want)

	for i := range got {
		if got[i] != want[i] {
			t.Errorf("peers are different. Expecting %v, received %v", want, got)
			return
		}
	}
}
// verifyAnswer waits one second for membership to settle and then, for each of
// the keys uid0 through uid9, asks every given instance which server owns the
// key. Any answer that differs from cmap.Get for that key is reported as a
// test error naming the key, the expected owner, the instance and its answer.
func verifyAnswer(t *testing.T, cmap *consistenthash.Map, servers ...*ConsistentHashRes) {
	const keyCount = 10

	// give membership some time to stabilize
	time.Sleep(time.Second)

	for n := 0; n < keyCount; n++ {
		key := fmt.Sprintf("uid%d", n)
		want := cmap.Get(key)
		for _, srv := range servers {
			got, _ := srv.Get(key)
			if got == want {
				continue
			}
			t.Errorf("for %s expected %s, %s responded %s",
				key, want, srv.HostPort(), got)
		}
	}
}
// TestShardDistribution runs checkShardDispatch for three cluster sizes, each
// paired with its own number of lookups: 5 servers / 1000 keys,
// 10 servers / 10000 keys and 80 servers / 100000 keys.
func (c *ConnHashTestSuite) TestShardDistribution() {
	scenarios := []struct {
		servers int
		lookups int
	}{
		{servers: 5, lookups: 1000},
		{servers: 10, lookups: 10000},
		{servers: 80, lookups: 100000},
	}
	for _, sc := range scenarios {
		checkShardDispatch(c.T(), sc.servers, sc.lookups)
	}
}
// checkShardDispatch registers nServer shard servers named 192.168.0.1:0 ..
// 192.168.0.1:(nServer-1) on one shared ephemeral handle, creates a client on
// a second handle, and runs verifyShardDist with `iteration` lookups.
//
// Any failure to create a handle, a server or the client aborts the test:
// carrying on with a missing handle would only fail later in a less obvious
// place. Everything created here is closed when the function returns.
func checkShardDispatch(t *testing.T, nServer int, iteration int) {
	replica := ConsistentHashMapReplicaNum

	em, err := ephemeral.NewEtcdEphemeral(etcdForwdCli)
	if err != nil {
		t.Fatalf("Connect to zk error for server1: %s", err)
	}
	defer em.Close()

	fmt.Println("start creating shard host")
	for i := 0; i < nServer; i++ {
		name := fmt.Sprintf("192.168.0.1:%d", i)
		conn, err := NewConsistentHashResServer(em, testEmRoot, name,
			replica, 5*time.Second, dummy{})
		if err != nil {
			t.Fatalf("consistent server %s create failed:%s", name, err)
		}
		fmt.Printf("%d\t", i)
		// Deferred inside the loop on purpose: every server has to stay
		// registered until the distribution check below has finished.
		defer conn.Close()
		assert.Equal(t, name, conn.HostPort())
	}
	fmt.Println("finish creating shard host")

	// give the registrations a moment to settle
	time.Sleep(time.Second)

	emClient, err := ephemeral.NewEtcdEphemeral(etcdForwdCli)
	if err != nil {
		t.Fatalf("Connect to zk error for client: %s", err)
	}
	defer emClient.Close()

	client, err := NewConsistentHashResClient(emClient, testEmRoot,
		replica, 5*time.Second, dummy{})
	if err != nil {
		t.Fatalf("consistent client create failed:%s", err)
	}
	defer client.Close()
	assert.Equal(t, "", client.HostPort())

	verifyShardDist(t, client, nServer, iteration)
}
// verifyShardDist looks up n random keys through client, counts how many land
// on each shard, and fails the test when the standard deviation of the
// per-shard share (in percent) exceeds 6.
//
// nShard is the number of shards expected to take part. A shard that received
// no key at all never appears in the tally, so its deviation from the average
// (the full average share) is added explicitly; leaving it out would
// under-report the spread exactly when the distribution is at its worst.
func verifyShardDist(t *testing.T, client *ConsistentHashRes, nShard int, n int) {
	sdMax := float64(6)
	result := make(map[string]int)

	fmt.Println("========================")
	for i := 0; i < n; i++ {
		id := murmur3.Sum64(uuid.NewV4().Bytes())
		info, ok := client.Get(fmt.Sprintf("%d", id))
		assert.True(t, ok, "should get shard info")
		// A missing map entry reads as zero, so a plain increment is enough.
		result[info]++
	}

	avg := float64(100.0) / float64(nShard)
	var sum float64
	for key, val := range result {
		fmt.Printf("%s, count = %d\n", key, val)
		sum += math.Pow(((100.0 * float64(val) / float64(n)) - avg), 2)
	}
	// Shards with zero hits: share 0%, deviation avg.
	if missing := nShard - len(result); missing > 0 {
		sum += float64(missing) * avg * avg
	}

	sd := math.Sqrt(sum / float64(nShard))
	fmt.Printf("average: %.3f%% standard deviation: %.3f%%\n", avg, sd)
	if sd > sdMax {
		assert.Fail(t, fmt.Sprintf("standard deviation is too high %v", sd))
	}
}
func TestExtractIPort(t *testing.T) {
ipport := "192.180.1.1:1234"
v, err := ExtractIPPort(ipport + seperator + "xxxxx")
assert.NoError(t, err)
assert.Equal(t, ipport, v)
v, err = ExtractIPPort(ipport + seperator)
assert.NoError(t, err)
assert.Equal(t, ipport, v)
v, err = ExtractIPPort(ipport)
assert.NoError(t, err)
assert.Equal(t, ipport, v)
v, err = ExtractIPPort("xxxxxyyyyyy")
assert.NoError(t, err)
assert.Equal(t, "xxxxxyyyyyy", v)
v, err = ExtractIPPort("")
assert.NoError(t, err)
assert.Equal(t, "", v)
}
// dummy is a do-nothing value that the tests pass as the last argument of
// NewConsistentHashResServer and NewConsistentHashResClient. All of its
// methods are no-ops.
type dummy struct{}

// BumpAvg does nothing.
func (dummy) BumpAvg(key string, val float64) {}

// BumpSum does nothing.
func (dummy) BumpSum(key string, val float64) {}

// BumpHistogram does nothing.
func (dummy) BumpHistogram(key string, val float64) {}

// End does nothing.
func (dummy) End() {}

// BumpTime ignores key and returns another dummy, whose End is a no-op.
func (dummy) BumpTime(key string) interface {
	End()
} {
	var noop dummy
	return noop
}