forked from golang/groupcache
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathgrpc.go
190 lines (163 loc) · 4.22 KB
/
grpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
package groupcache
import (
"context"
"net"
"reflect"
"sort"
"sync"
"github.com/jmuk/groupcache/consistenthash"
"github.com/jmuk/groupcache/groupcachepb"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
const defaultReplicas = 50
type GRPCPool struct {
groupcachepb.UnimplementedGroupCacheServer
self string
opts grpcOpt
s *grpc.Server
mu sync.Mutex
peersList []string
peers *consistenthash.Map
conns map[string]*grpcGetter
}
type grpcOpt struct {
serverOpts []grpc.ServerOption
listener net.Listener
dialOpts []grpc.DialOption
replicas int
hashFn consistenthash.Hash
}
type GRPCPoolOption func(opt *grpcOpt)
func WithServerOptions(serverOpts ...grpc.ServerOption) GRPCPoolOption {
return func(opts *grpcOpt) {
opts.serverOpts = serverOpts
}
}
func WithListener(lis net.Listener) GRPCPoolOption {
return func(opts *grpcOpt) {
opts.listener = lis
}
}
func WithDialOptions(dialOpts ...grpc.DialOption) GRPCPoolOption {
return func(opts *grpcOpt) {
opts.dialOpts = dialOpts
}
}
func WithReplicas(replicas int) GRPCPoolOption {
return func(opts *grpcOpt) {
opts.replicas = replicas
}
}
func WithHash(hashFn consistenthash.Hash) GRPCPoolOption {
return func(opts *grpcOpt) {
opts.hashFn = hashFn
}
}
func NewGRPCPool(self string, opts ...GRPCPoolOption) (*GRPCPool, error) {
pool := &GRPCPool{
self: self,
opts: grpcOpt{
replicas: defaultReplicas,
},
conns: map[string]*grpcGetter{},
}
for _, opt := range opts {
opt(&pool.opts)
}
pool.peers = consistenthash.New(pool.opts.replicas, pool.opts.hashFn)
pool.s = grpc.NewServer(pool.opts.serverOpts...)
pool.s.RegisterService(&groupcachepb.GroupCache_ServiceDesc, pool)
lis := pool.opts.listener
if lis == nil {
// extract the port number.
_, port, err := net.SplitHostPort(self)
if err == nil {
lis, err = net.Listen("tcp", ":"+port)
if err != nil {
return nil, err
}
} else {
lis, err = net.Listen("tcp", self)
if err != nil {
return nil, err
}
}
}
go pool.s.Serve(lis)
RegisterPeerPicker(func() PeerPicker { return pool })
return pool, nil
}
func (p *GRPCPool) Shutdown() {
p.s.GracefulStop()
}
func (p *GRPCPool) newListener() (net.Listener, error) {
if p.opts.listener != nil {
return p.opts.listener, nil
}
_, port, err := net.SplitHostPort(p.self)
if err != nil {
return net.Listen("tcp", p.self)
}
return net.Listen("tcp", ":"+port)
}
// Set updates the pool's list of peers.
// Each peer value should be an endpoint of gRPC, e.g. "localhost:8080".
func (p *GRPCPool) Set(peers ...string) {
p.mu.Lock()
defer p.mu.Unlock()
sort.Strings(peers)
if reflect.DeepEqual(peers, p.peersList) {
return
}
p.peers = consistenthash.New(p.opts.replicas, p.opts.hashFn)
p.peers.Add(peers...)
p.peersList = peers
}
func (p *GRPCPool) PickPeer(key string) (ProtoGetter, bool) {
p.mu.Lock()
defer p.mu.Unlock()
if p.peers.IsEmpty() {
return nil, false
}
if peer := p.peers.Get(key); peer != p.self {
c, ok := p.conns[peer]
if !ok {
conn, err := grpc.Dial(peer, p.opts.dialOpts...)
if err != nil {
// maybe PickPeer should return an error as well.
// TODO: log the error at least.
return nil, false
}
c = &grpcGetter{groupcachepb.NewGroupCacheClient(conn)}
p.conns[peer] = c
}
return c, true
}
return nil, false
}
// Get implements the gRPC method of GroupCacheServer.
func (p *GRPCPool) Get(ctx context.Context, req *groupcachepb.GetRequest) (*groupcachepb.GetResponse, error) {
group := GetGroup(req.Group)
if group == nil {
return nil, status.Errorf(codes.NotFound, "group %s not found", req.Group)
}
group.Stats.ServerRequests.Add(1)
var value []byte
// Do not just call Get(), as it may forward the request to another server
// in case that the list of peers has been changed -- it is possible in k8s
// environment.
if err := group.get(ctx, req.Key, AllocatingByteSliceSink(&value), false /*usePeers*/); err != nil {
return nil, err
}
return &groupcachepb.GetResponse{
Value: value,
}, nil
}
type grpcGetter struct {
client groupcachepb.GroupCacheClient
}
func (g *grpcGetter) Get(ctx context.Context, in *groupcachepb.GetRequest) (*groupcachepb.GetResponse, error) {
return g.client.Get(ctx, in)
}