forked from mna/redisc
-
Notifications
You must be signed in to change notification settings - Fork 0
/
conn.go
326 lines (295 loc) · 8.52 KB
/
conn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
package redisc
import (
"errors"
"fmt"
"strconv"
"strings"
"sync"
"github.com/garyburd/redigo/redis"
)
// Compile-time assertion that *Conn implements the redis.Conn interface.
var _ redis.Conn = (*Conn)(nil)
// Conn is a redis cluster connection. When returned by Get
// or Dial, it is not yet bound to any node in the cluster.
// Only when a call to Do, Send, Receive or Bind is made is a connection
// to a specific node established:
//
//   - if Do or Send is called first, the command's first parameter
//     is assumed to be the key, and its slot is used to find the node
//   - if Receive is called first, or if Do or Send is called first
//     but with no parameter for the command (or no command), a
//     random node is selected in the cluster
//   - if Bind is called first, the node corresponding to the slot of
//     the specified key(s) is selected
//
// Because Get and Dial return a redis.Conn interface,
// a type assertion can be used to call Bind or ReadOnly on this
// concrete Conn type:
//
//	redisConn := cluster.Get()
//	if conn, ok := redisConn.(*redisc.Conn); ok {
//		if err := conn.Bind("my-key"); err != nil {
//			// handle error
//		}
//	}
//
// Or call the package-level BindConn or ReadOnlyConn helper functions.
type Conn struct {
	// cluster supplies the node connection when binding (getConn) and is
	// notified of MOVED redirections (needsRefresh).
	cluster *Cluster
	// forceDial is forwarded to the cluster's getConn when binding.
	forceDial bool // immutable
	// redigo allows concurrent reader and writer (conn.Receive and
	// conn.Send/conn.Flush), a mutex is needed to protect concurrent
	// accesses.
	mu sync.Mutex
	// readOnly is set by ReadOnly and forwarded to getConn when binding.
	readOnly bool
	// boundAddr is the node address reported by getConn when the
	// connection was bound.
	boundAddr string
	// err is set by Close; once non-nil it is what bind, ReadOnly, Flush,
	// Err and Close report.
	err error
	// rc is the underlying connection to the bound node, nil until bound.
	rc redis.Conn
}
// RedirError describes a cluster redirection reported by a redis
// node. The Type field tells whether the node answered with a MOVED
// or with an ASK error.
type RedirError struct {
	// Type is the kind of redirection, MOVED or ASK.
	Type string

	// NewSlot is the slot number carried by the redirection.
	NewSlot int

	// Addr is the address of the node to redirect to.
	Addr string

	// raw is the error message the RedirError was built from; it is
	// what Error returns.
	raw string
}

// Error implements the error interface. The returned message is the
// one received from redis, unmodified.
func (e *RedirError) Error() string {
	return e.raw
}
// isRedisErr reports whether err is a redis.Error whose message
// starts with the whitespace-delimited token typ. Any other kind of
// error, including nil, yields false.
func isRedisErr(err error, typ string) bool {
	if redisErr, isRedis := err.(redis.Error); isRedis {
		fields := strings.Fields(redisErr.Error())
		if len(fields) == 0 {
			// empty or all-whitespace message: there is no leading token
			return false
		}
		return fields[0] == typ
	}
	return false
}
// IsTryAgain reports whether err is a redis cluster error of type
// TRYAGAIN. Such an error means the command itself is valid, but the
// cluster is in an unstable state and cannot complete the request
// at the moment.
func IsTryAgain(err error) bool {
	const errType = "TRYAGAIN"
	return isRedisErr(err, errType)
}
// IsCrossSlot reports whether err is a redis cluster error of type
// CROSSSLOT. Such an error means a command was sent with keys that
// belong to different slots.
func IsCrossSlot(err error) bool {
	const errType = "CROSSSLOT"
	return isRedisErr(err, errType)
}
// ParseRedir converts err into a *RedirError. It returns nil when err
// is nil, is not a redis.Error, or is not a well-formed MOVED or ASK
// message, i.e. exactly three whitespace-separated fields: the
// redirection type, a numeric slot and the target address.
func ParseRedir(err error) *RedirError {
	redisErr, isRedis := err.(redis.Error)
	if !isRedis {
		return nil
	}

	msg := redisErr.Error()
	fields := strings.Fields(msg)
	if len(fields) != 3 {
		return nil
	}

	switch fields[0] {
	case "MOVED", "ASK":
		// a redirection, keep parsing
	default:
		return nil
	}

	slot, convErr := strconv.Atoi(fields[1])
	if convErr != nil {
		return nil
	}

	return &RedirError{
		Type:    fields[0],
		NewSlot: slot,
		Addr:    fields[2],
		raw:     msg,
	}
}
// bind attaches the connection to a cluster node unless it is broken
// or already bound. The node is the one the cluster returns for slot
// (a slot of -1 asks for a random node).
//
// It returns the underlying redis connection, ok set to true only if
// this very call performed the binding, and the connection's error
// or the error returned while obtaining the node connection.
func (c *Conn) bind(slot int) (rc redis.Conn, ok bool, err error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.err != nil {
		// broken or closed: report the stored error, never (re)bind
		return c.rc, false, c.err
	}
	if c.rc != nil {
		// already bound: hand back the existing connection
		return c.rc, false, nil
	}

	conn, addr, connErr := c.cluster.getConn(slot, c.forceDial, c.readOnly)
	if connErr != nil {
		return nil, false, connErr
	}
	c.rc, c.boundAddr = conn, addr
	return conn, true, nil
}
// cmdSlot returns the cluster slot for a command, or -1 when the
// command has no argument. The first argument is always treated as
// the key; cmd itself is not consulted.
//
// The key is rendered following the conversions redigo documents for
// command arguments: strings and byte slices as-is, int and int64 in
// base 10, float64 with strconv.FormatFloat(v, 'g', -1, 64), bool as
// "1" or "0", nil as the empty string, and anything else with
// fmt.Sprint. Formatting every argument with "%s" instead turns a
// non-string key such as the int 42 into "%!s(int=42)", so the slot
// would be computed on a string that differs from the key on the wire.
func cmdSlot(cmd string, args []interface{}) int {
	if len(args) == 0 {
		return -1
	}

	var key string
	switch v := args[0].(type) {
	case string:
		key = v
	case []byte:
		key = string(v)
	case int:
		key = strconv.Itoa(v)
	case int64:
		key = strconv.FormatInt(v, 10)
	case float64:
		key = strconv.FormatFloat(v, 'g', -1, 64)
	case bool:
		if v {
			key = "1"
		} else {
			key = "0"
		}
	case nil:
		key = ""
	default:
		key = fmt.Sprint(v)
	}
	return Slot(key)
}
// BindConn is a convenience helper: when c has a Bind method with
// the same signature as (*Conn).Bind, it calls that method with keys
// and returns its result. Otherwise it returns an error.
func BindConn(c redis.Conn, keys ...string) error {
	type binder interface {
		Bind(...string) error
	}

	b, ok := c.(binder)
	if !ok {
		return errors.New("redisc: no Bind method")
	}
	return b.Bind(keys...)
}
// Bind attaches the connection to the cluster node serving the slot
// of the given keys. All keys must belong to the same slot, otherwise
// an error is returned and the connection is left unbound. Binding a
// connection that is already bound is an error too. Without any key,
// the connection is bound to a random node.
func (c *Conn) Bind(keys ...string) error {
	slot := -1
	for _, key := range keys {
		keySlot := Slot(key)
		if slot == -1 || keySlot == slot {
			slot = keySlot
			continue
		}
		return errors.New("redisc: keys do not belong to the same slot")
	}

	switch _, bound, err := c.bind(slot); {
	case err != nil:
		return err
	case !bound:
		// bind did nothing because a node was already selected
		return errors.New("redisc: connection already bound to a node")
	}
	return nil
}
// ReadOnlyConn is a convenience helper: when c has a ReadOnly method
// with the same signature as (*Conn).ReadOnly, it calls that method
// and returns its result. Otherwise it returns an error.
func ReadOnlyConn(c redis.Conn) error {
	type readOnlyer interface {
		ReadOnly() error
	}

	ro, ok := c.(readOnlyer)
	if !ok {
		return errors.New("redisc: no ReadOnly method")
	}
	return ro.ReadOnly()
}
// ReadOnly flags the connection as read-only. Once it gets bound to
// a cluster node, such a connection attempts to connect to a replica
// rather than the master, and automatically emits a READONLY command
// so that the replica agrees to serve read commands. Keep in mind that
// a replica may return stale data, and that write commands sent on a
// read-only connection fail with a MOVED error.
// See http://redis.io/commands/readonly for more details.
//
// ReadOnly must be called before the connection is bound: if a call
// to Do, Send, Receive or Bind already bound it to a node, an error
// is returned. The connection's error, if any, is returned as well.
func (c *Conn) ReadOnly() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	switch {
	case c.err != nil:
		return c.err
	case c.rc != nil:
		// too late, a node was already selected
		return errors.New("redisc: connection already bound to a node")
	}
	c.readOnly = true
	return nil
}
// Do sends a command to the server and returns the reply it received.
// A connection that is not bound to a cluster node yet gets bound by
// this call, following the rules documented on the Conn type.
//
// Redirections are not followed: the reply and error of the
// underlying connection are returned unchanged. A MOVED error is
// additionally reported to the cluster through needsRefresh.
func (c *Conn) Do(cmd string, args ...interface{}) (interface{}, error) {
	slot := cmdSlot(cmd, args)
	rc, _, bindErr := c.bind(slot)
	if bindErr != nil {
		return nil, bindErr
	}

	reply, err := rc.Do(cmd, args...)
	if redir := ParseRedir(err); redir != nil && redir.Type == "MOVED" {
		c.cluster.needsRefresh(redir)
	}
	return reply, err
}
// Send writes the command to the client's output buffer. A connection
// that is not bound to a cluster node yet gets bound by this call,
// following the rules documented on the Conn type.
func (c *Conn) Send(cmd string, args ...interface{}) error {
	slot := cmdSlot(cmd, args)
	rc, _, bindErr := c.bind(slot)
	if bindErr != nil {
		return bindErr
	}
	return rc.Send(cmd, args...)
}
// Receive reads a single reply from the server. A connection that is
// not bound to a cluster node yet gets bound to a random node by this
// call, following the rules documented on the Conn type.
//
// Redirections are not followed: the reply and error of the
// underlying connection are returned unchanged. A MOVED error is
// additionally reported to the cluster through needsRefresh.
func (c *Conn) Receive() (interface{}, error) {
	const anySlot = -1 // no key to derive a slot from

	rc, _, bindErr := c.bind(anySlot)
	if bindErr != nil {
		return nil, bindErr
	}

	reply, err := rc.Receive()
	if redir := ParseRedir(err); redir != nil && redir.Type == "MOVED" {
		c.cluster.needsRefresh(redir)
	}
	return reply, err
}
// Flush flushes the output buffer to the server. It returns the
// connection's error if one is set, and does nothing when the
// connection is not bound to a node yet.
func (c *Conn) Flush() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.err != nil {
		return c.err
	}
	if c.rc == nil {
		// not bound to a node yet
		return nil
	}
	return c.rc.Flush()
}
// Err returns a non-nil value when the connection is broken.
// Applications should close broken connections. The connection's own
// error takes precedence; otherwise the error of the underlying
// connection, if the connection is bound, is reported.
func (c *Conn) Err() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	switch {
	case c.err != nil:
		return c.err
	case c.rc != nil:
		return c.rc.Err()
	default:
		return nil
	}
}
// Close closes the connection. The first call marks the connection
// as closed and returns the result of closing the underlying
// connection, if any. If the connection already carries an error,
// for instance because it was closed before, that error is returned
// and nothing else is done.
func (c *Conn) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if prev := c.err; prev != nil {
		return prev
	}
	c.err = errors.New("redisc: closed")
	return c.closeLocked()
}
// closeLocked closes the underlying connection, if any, and returns
// the error of that close. Close calls it while holding c.mu.
func (c *Conn) closeLocked() (err error) {
	if c.rc == nil {
		// never bound, nothing to close
		return nil
	}
	if c.readOnly {
		// this may be a pooled connection, so make sure the readOnly
		// flag is reset; best effort, the result is deliberately ignored
		_, _ = c.rc.Do("READWRITE")
	}
	return c.rc.Close()
}