-
Notifications
You must be signed in to change notification settings - Fork 24
/
Copy pathsentinel.go
383 lines (348 loc) · 10.3 KB
/
sentinel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
package sentinel
import (
"errors"
"fmt"
"net"
"strings"
"sync"
"time"
"github.com/gomodule/redigo/redis"
)
// Sentinel provides a way to add high availability (HA) to Redis Pool using
// preconfigured addresses of Sentinel servers and name of master which Sentinels
// monitor. It works with Redis >= 2.8.12 (mostly because of ROLE command that
// was introduced in that version, it's possible though to support old versions
// using INFO command).
//
// Example of the simplest usage to contact master "mymaster":
//
// func newSentinelPool() *redis.Pool {
// sntnl := &sentinel.Sentinel{
// Addrs: []string{":26379", ":26380", ":26381"},
// MasterName: "mymaster",
// Dial: func(addr string) (redis.Conn, error) {
// timeout := 500 * time.Millisecond
// c, err := redis.DialTimeout("tcp", addr, timeout, timeout, timeout)
// if err != nil {
// return nil, err
// }
// return c, nil
// },
// }
// return &redis.Pool{
// MaxIdle: 3,
// MaxActive: 64,
// Wait: true,
// IdleTimeout: 240 * time.Second,
// Dial: func() (redis.Conn, error) {
// masterAddr, err := sntnl.MasterAddr()
// if err != nil {
// return nil, err
// }
// c, err := redis.Dial("tcp", masterAddr)
// if err != nil {
// return nil, err
// }
// if _, err = c.Do("AUTH", "your-Password"); err != nil {
// c.Close()
// return nil, err
// }
// isMaster, err := sentinel.TestRole(c, "master")
// if err != nil {
// c.Close()
// return nil, err
// }
// if !isMaster {
// c.Close()
// return nil, fmt.Errorf("%s is not redis master", masterAddr)
// }
// return c, nil
// },
// }
// }
type Sentinel struct {
// Addrs is a slice with known Sentinel addresses.
Addrs []string
// MasterName is a name of Redis master Sentinel servers monitor.
MasterName string
// Dial is a user supplied function to connect to Sentinel on given address. This
// address will be chosen from Addrs slice.
// Note that as per the redis-sentinel client guidelines, a timeout is mandatory
// while connecting to Sentinels, and should not be set to 0.
Dial func(addr string) (redis.Conn, error)
// Pool is a user supplied function returning custom connection pool to Sentinel.
// This can be useful to tune options if you are not satisfied with what default
// Sentinel pool offers. See defaultPool() method for default pool implementation.
// In most cases you only need to provide Dial function and let this be nil.
Pool func(addr string) *redis.Pool
mu sync.RWMutex
pools map[string]*redis.Pool
addr string
}
// NoSentinelsAvailable is returned when all sentinels in the list are exhausted
// (or none configured), and contains the last error returned by Dial (which
// may be nil)
type NoSentinelsAvailable struct {
lastError error
}
func (ns NoSentinelsAvailable) Error() string {
if ns.lastError != nil {
return fmt.Sprintf("redigo: no sentinels available; last error: %s", ns.lastError.Error())
}
return fmt.Sprintf("redigo: no sentinels available")
}
// putToTop puts Sentinel address to the top of address list - this means
// that all next requests will use Sentinel on this address first.
//
// From Sentinel guidelines:
//
// The first Sentinel replying to the client request should be put at the
// start of the list, so that at the next reconnection, we'll try first
// the Sentinel that was reachable in the previous connection attempt,
// minimizing latency.
func (s *Sentinel) putToTop(addr string) {
s.mu.Lock()
defer s.mu.Unlock()
for i, a := range s.Addrs {
if a == addr {
s.Addrs[0], s.Addrs[i] = s.Addrs[i], s.Addrs[0]
break
}
}
}
// putToBottom puts Sentinel address to the bottom of address list.
// We call this method internally when see that some Sentinel failed to answer
// on application request so next time we start with another one.
func (s *Sentinel) putToBottom(addr string) {
s.mu.Lock()
defer s.mu.Unlock()
for i, a := range s.Addrs {
if a == addr {
copy(s.Addrs[i:], s.Addrs[i+1:])
s.Addrs[len(s.Addrs)-1] = a
break
}
}
}
// defaultPool returns a connection pool to one Sentinel. This allows
// us to call concurrent requests to Sentinel using connection Do method.
func (s *Sentinel) defaultPool(addr string) *redis.Pool {
return &redis.Pool{
MaxIdle: 3,
MaxActive: 10,
Wait: true,
IdleTimeout: 240 * time.Second,
Dial: func() (redis.Conn, error) {
return s.Dial(addr)
},
TestOnBorrow: func(c redis.Conn, t time.Time) error {
_, err := c.Do("PING")
return err
},
}
}
func (s *Sentinel) poolForAddr(addr string) *redis.Pool {
s.mu.Lock()
defer s.mu.Unlock()
if pool, ok := s.pools[addr]; ok {
return pool
}
s.mu.Unlock()
newPool := s.newPool(addr)
s.mu.Lock()
if pool, ok := s.pools[addr]; ok {
return pool
}
if s.pools == nil {
s.pools = make(map[string]*redis.Pool)
}
s.pools[addr] = newPool
return newPool
}
func (s *Sentinel) newPool(addr string) *redis.Pool {
if s.Pool != nil {
return s.Pool(addr)
}
return s.defaultPool(addr)
}
// close connection pool to Sentinel.
// Lock must be hold by caller.
func (s *Sentinel) close() {
for _, pool := range s.pools {
pool.Close()
}
s.pools = nil
}
func (s *Sentinel) doUntilSuccess(f func(redis.Conn) (interface{}, error)) (interface{}, error) {
s.mu.RLock()
addrs := s.Addrs
s.mu.RUnlock()
var lastErr error
for _, addr := range addrs {
conn := s.poolForAddr(addr).Get()
reply, err := f(conn)
conn.Close()
if err != nil {
lastErr = err
s.putToBottom(addr)
continue
}
s.putToTop(addr)
return reply, nil
}
return nil, NoSentinelsAvailable{lastError: lastErr}
}
// MasterAddr returns an address of current Redis master instance.
func (s *Sentinel) MasterAddr() (string, error) {
return redis.String(s.doUntilSuccess(func(c redis.Conn) (interface{}, error) {
return queryForMaster(c, s.MasterName)
}))
}
// SlaveAddrs returns a slice with known slave addresses of current master instance.
func (s *Sentinel) SlaveAddrs() ([]string, error) {
return redis.Strings(s.doUntilSuccess(func(c redis.Conn) (interface{}, error) {
return queryForSlaveAddrs(c, s.MasterName)
}))
}
// Slave represents a Redis slave instance which is known by Sentinel.
type Slave struct {
ip string
port string
flags string
}
// Addr returns an address of slave.
func (s *Slave) Addr() string {
return net.JoinHostPort(s.ip, s.port)
}
// Available returns if slave is in working state at moment based on information in slave flags.
func (s *Slave) Available() bool {
return !strings.Contains(s.flags, "disconnected") && !strings.Contains(s.flags, "s_down")
}
// Slaves returns a slice with known slaves of master instance.
func (s *Sentinel) Slaves() ([]*Slave, error) {
res, err := s.doUntilSuccess(func(c redis.Conn) (interface{}, error) {
return queryForSlaves(c, s.MasterName)
})
if err != nil {
return nil, err
}
return res.([]*Slave), nil
}
// SentinelAddrs returns a slice of known Sentinel addresses Sentinel server aware of.
func (s *Sentinel) SentinelAddrs() ([]string, error) {
return redis.Strings(s.doUntilSuccess(func(c redis.Conn) (interface{}, error) {
return queryForSentinels(c, s.MasterName)
}))
}
// Discover allows to update list of known Sentinel addresses. From docs:
//
// A client may update its internal list of Sentinel nodes following this procedure:
// 1) Obtain a list of other Sentinels for this master using the command SENTINEL sentinels <master-name>.
// 2) Add every ip:port pair not already existing in our list at the end of the list.
func (s *Sentinel) Discover() error {
addrs, err := s.SentinelAddrs()
if err != nil {
return err
}
s.mu.Lock()
for _, addr := range addrs {
if !stringInSlice(addr, s.Addrs) {
s.Addrs = append(s.Addrs, addr)
}
}
s.mu.Unlock()
return nil
}
// Close closes current connection to Sentinel.
func (s *Sentinel) Close() error {
s.mu.Lock()
s.close()
s.mu.Unlock()
return nil
}
// TestRole wraps GetRole in a test to verify if the role matches an expected
// role string. If there was any error in querying the supplied connection,
// the function returns false. Works with Redis >= 2.8.12.
// It's not goroutine safe, but if you call this method on pooled connections
// then you are OK.
func TestRole(c redis.Conn, expectedRole string) (bool, error) {
role, err := getRole(c)
if err != nil {
return false, err
}
return role == expectedRole, err
}
// getRole is a convenience function supplied to query an instance (master or
// slave) for its role. It attempts to use the ROLE command introduced in
// redis 2.8.12.
func getRole(c redis.Conn) (string, error) {
res, err := c.Do("ROLE")
if err != nil {
return "", err
}
rres, ok := res.([]interface{})
if ok {
return redis.String(rres[0], nil)
}
return "", errors.New("redigo: can not transform ROLE reply to string")
}
func queryForMaster(conn redis.Conn, masterName string) (string, error) {
res, err := redis.Strings(conn.Do("SENTINEL", "get-master-addr-by-name", masterName))
if err != nil {
return "", err
}
if len(res) < 2 {
return "", errors.New("redigo: malformed get-master-addr-by-name reply")
}
masterAddr := net.JoinHostPort(res[0], res[1])
return masterAddr, nil
}
func queryForSlaveAddrs(conn redis.Conn, masterName string) ([]string, error) {
slaves, err := queryForSlaves(conn, masterName)
if err != nil {
return nil, err
}
slaveAddrs := make([]string, len(slaves))
for i, slave := range slaves {
slaveAddrs[i] = slave.Addr()
}
return slaveAddrs, nil
}
func queryForSlaves(conn redis.Conn, masterName string) ([]*Slave, error) {
res, err := redis.Values(conn.Do("SENTINEL", "slaves", masterName))
if err != nil {
return nil, err
}
slaves := make([]*Slave, len(res))
for i, a := range res {
sm, err := redis.StringMap(a, err)
if err != nil {
return nil, err
}
slaves[i] = &Slave{ip: sm["ip"], port: sm["port"], flags: sm["flags"]}
}
return slaves, nil
}
func queryForSentinels(conn redis.Conn, masterName string) ([]string, error) {
res, err := redis.Values(conn.Do("SENTINEL", "sentinels", masterName))
if err != nil {
return nil, err
}
sentinels := make([]string, len(res))
for i, a := range res {
sm, err := redis.StringMap(a, err)
if err != nil {
return nil, err
}
sentinels[i] = fmt.Sprintf("%s:%s", sm["ip"], sm["port"])
}
return sentinels, nil
}
func stringInSlice(str string, slice []string) bool {
for _, s := range slice {
if s == str {
return true
}
}
return false
}