-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcmd_safe.go
56 lines (50 loc) · 1.34 KB
/
cmd_safe.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package redisson
import (
"context"
"sync"
)
type SafeCmdable interface {
// SafeMGet
// Available since: 1.0.0
// Time complexity: O(N) where N is the number of keys to retrieve.
// ACL categories: @read @string @fast
// Like MGet, but safe in cluster mode.
// RESP2 / RESP3 Reply:
// - Array reply: a list of values at the specified keys.
SafeMGet(ctx context.Context, keys ...string) SliceCmd
}
func (c *client) SafeMGet(ctx context.Context, keys ...string) SliceCmd {
ctx = WithSkipCheck(ctx)
if len(keys) <= 1 {
return c.MGet(ctx, keys...)
}
var slot2Keys = make(map[uint16][]string)
var keyIndexes = make(map[string]int)
for i, key := range keys {
keySlot := slot(key)
slot2Keys[keySlot] = append(slot2Keys[keySlot], key)
keyIndexes[key] = i
}
if len(slot2Keys) == 1 {
return c.MGet(ctx, keys...)
}
var mx sync.Mutex
var scs = make(map[uint16]SliceCmd)
parallelK(c.maxp, slot2Keys, func(k uint16) {
ret := c.MGet(WithSkipCheck(context.Background()), slot2Keys[k]...)
mx.Lock()
scs[k] = ret
mx.Unlock()
})
var res = make([]any, len(keys))
for i, ret := range scs {
if err := ret.Err(); err != nil {
return newSliceCmdFromSlice(nil, err, keys...)
}
_values := ret.Val()
for _i, _key := range slot2Keys[i] {
res[keyIndexes[_key]] = _values[_i]
}
}
return newSliceCmdFromSlice(res, nil, keys...)
}