-
Notifications
You must be signed in to change notification settings - Fork 1
/
watcher.go
106 lines (100 loc) · 2.57 KB
/
watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package discovery
import (
"context"
"encoding/json"
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/client/v3"
"google.golang.org/grpc/resolver"
"time"
)
// Watcher follows the service entries stored under an etcd key prefix and
// publishes every change it observes on a channel (see Watch).
type Watcher struct {
// cli is the etcd client used for the initial Get and the subsequent Watch.
cli *clientv3.Client
// prefix is the etcd key prefix that is listed and watched.
prefix string
// notify carries service changes to consumers; changeNotify sends on it
// without blocking and Watch hands it out.
notify chan ServiceInfo
// addrId maps a service address (ServiceInfo.Addr) to the ID last reported
// for it, so that a removal can be published with the matching ID.
addrId map[string]int64
}
// NewWatcher creates an etcd client for the given endpoints and returns a
// Watcher that follows the keys stored under the service prefix.
//
// The watch loop is launched in its own goroutine before NewWatcher returns.
// If the etcd client cannot be created the error is logged and nil is
// returned, so callers must check the result before using it.
func NewWatcher(etcd []string, service string) *Watcher {
	cfg := clientv3.Config{
		Endpoints:   etcd,
		DialTimeout: 5 * time.Second,
	}

	client, err := clientv3.New(cfg)
	if err != nil {
		Log.Printf("watcher connect to etcd error:%v \n", err)
		return nil
	}

	watcher := new(Watcher)
	watcher.cli = client
	watcher.prefix = service
	watcher.notify = make(chan ServiceInfo, 5)
	watcher.addrId = make(map[string]int64, 5)

	// The loop runs for as long as the etcd watch channel stays open.
	go watcher.start()
	return watcher
}
// start publishes the services currently stored under w.prefix and then
// follows every later change to that prefix until the etcd watch channel is
// closed. It is meant to run in its own goroutine.
//
// Failures are logged instead of panicking, because a panic in this
// goroutine would terminate the whole process:
//   - if the initial listing fails, the error is logged and start returns;
//   - a value that is not valid ServiceInfo JSON is logged and skipped, so a
//     malformed entry is never published as an empty service.
func (w *Watcher) start() {
	// addrDict maps an etcd key to the address most recently stored under it.
	addrDict := make(map[string]resolver.Address)

	// Fetch every service that is already registered.
	resp, err := w.cli.Get(context.Background(), w.prefix, clientv3.WithPrefix())
	if err != nil {
		Log.Printf("watcher get prefix:%s error:%v \n", w.prefix, err)
		return
	}
	for _, kv := range resp.Kvs {
		var info ServiceInfo
		if err := json.Unmarshal(kv.Value, &info); err != nil {
			Log.Printf("kv.Value json.Unmarshal error:%v key:%s \n", err, kv.Key)
			continue
		}
		// A key that is present in etcd is a live service.
		info.Delete = false
		addrDict[string(kv.Key)] = resolver.Address{Addr: info.Addr, ServerName: info.Name}
		// Notify listeners.
		w.changeNotify(info)
	}

	Log.Printf("watch prefix:%s\n", w.prefix)
	// Start watching right after the revision of the snapshot above so that
	// no change made between the Get and the Watch is missed.
	rch := w.cli.Watch(context.Background(), w.prefix,
		clientv3.WithPrefix(),
		clientv3.WithPrevKV(),
		clientv3.WithRev(resp.Header.Revision+1),
	)
	for n := range rch {
		if err := n.Err(); err != nil {
			Log.Printf("watch prefix:%s error:%v \n", w.prefix, err)
			continue
		}
		for _, ev := range n.Events {
			key := string(ev.Kv.Key)
			switch ev.Type {
			case mvccpb.PUT:
				var info ServiceInfo
				if err := json.Unmarshal(ev.Kv.Value, &info); err != nil {
					Log.Printf("ev.Kv.Value json.Unmarshal error:%v \n", err)
					continue
				}
				addrDict[key] = resolver.Address{Addr: info.Addr, ServerName: info.Name}
				Log.Printf("[增加服务] key:%v server_name:%v addr:%v \n", key, info.Name, info.Addr)
				// Notify listeners.
				info.Delete = false
				w.changeNotify(info)
			case mvccpb.DELETE:
				addr, known := addrDict[key]
				if !known {
					// Nothing was ever published for this key, so there is
					// nothing to withdraw.
					Log.Printf("watch delete for unknown key:%v \n", key)
					continue
				}
				Log.Printf("[移除服务] key:%v server_name:%v addr:%v \n", key,
					addr.ServerName, addr.Addr)
				// Notify listeners.
				w.changeNotify(ServiceInfo{
					ID:     w.addrId[addr.Addr],
					Addr:   addr.Addr,
					Delete: true,
				})
				// ev.Kv.Key is always set on a delete event, whereas
				// ev.PrevKv may be nil, so the key is taken from ev.Kv.
				delete(addrDict, key)
			}
		}
	}
}
// changeNotify records the ID reported for srv.Addr and offers srv to the
// notify channel. The send never blocks: when the channel cannot accept the
// value right away, the change is logged and dropped.
func (w *Watcher) changeNotify(srv ServiceInfo) {
	Log.Printf("srv change:%v", srv)
	w.addrId[srv.Addr] = srv.ID

	select {
	case w.notify <- srv:
		return
	default:
	}
	Log.Printf("srv change no listen:%v", srv)
}
// Watch returns the channel on which service changes are published.
func (w *Watcher) Watch() chan ServiceInfo {
	changes := w.notify
	return changes
}