Skip to content

Commit

Permalink
refactor: 整理代码
Browse files Browse the repository at this point in the history
  • Loading branch information
huangqing.zhu committed Aug 14, 2024
1 parent 6077edb commit af55249
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 15 deletions.
20 changes: 8 additions & 12 deletions cmd_safe.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,16 @@ func (c *client) SafeMGet(ctx context.Context, keys ...string) SliceCmd {
if len(slot2Keys) == 1 {
return c.MGet(ctx, keys...)
}
var wg sync.WaitGroup

var mx sync.Mutex
var scs = make(map[uint16]SliceCmd)
wg.Add(len(slot2Keys))
for i, sameSlotKeys := range slot2Keys {
go func(_i uint16, _keys []string) {
ret := c.MGet(WithSkipCheck(context.Background()), _keys...)
mx.Lock()
scs[_i] = ret
mx.Unlock()
wg.Done()
}(i, sameSlotKeys)
}
wg.Wait()

parallelK(c.maxp, slot2Keys, func(k uint16) {
ret := c.MGet(WithSkipCheck(context.Background()), slot2Keys[k]...)
mx.Lock()
scs[k] = ret
mx.Unlock()
})

var res = make([]any, len(keys))
for i, ret := range scs {
Expand Down
3 changes: 2 additions & 1 deletion connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/redis/rueidis"
"github.com/redis/rueidis/rueidiscompat"
"regexp"
"runtime"
"strings"
)

Expand Down Expand Up @@ -159,7 +160,7 @@ func (c *client) Close() error {
}

func Connect(v ConfInterface) (Cmdable, error) {
c := &client{v: v, handler: newBaseHandler(v)}
c := &client{v: v, handler: newBaseHandler(v), maxp: runtime.GOMAXPROCS(0)}
err := c.connect()
if err != nil {
for i := 0; i < len(reconnectErrors); i++ {
Expand Down
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,3 @@ require (
google.golang.org/protobuf v1.28.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)

//replace github.com/sandwich-go/rueidis => ../rueidis
3 changes: 3 additions & 0 deletions redisson.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type client struct {
adapter rueidiscompat.Cmdable
ttl time.Duration
builder builder
maxp int

once sync.Once
}
Expand Down Expand Up @@ -58,6 +59,7 @@ func (c *client) ForEachNodes(ctx context.Context, f func(context.Context, Cmdab
cmd: v,
adapter: rueidiscompat.NewAdapter(v),
builder: c.builder,
maxp: c.maxp,
})
if err != nil {
errs.Push(err)
Expand All @@ -79,6 +81,7 @@ func (c *client) Cache(ttl time.Duration) CacheCmdable {
adapter: c.adapter,
ttl: ttl,
builder: c.builder,
maxp: c.maxp,
}
return cp
}
Expand Down
31 changes: 31 additions & 0 deletions util.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"github.com/redis/rueidis"
"strconv"
"sync"
"time"
)

Expand Down Expand Up @@ -215,3 +216,33 @@ func toFloat64(val any) (float64, error) {
return 0, fmt.Errorf("redis: unexpected type=%T for Float64", t)
}
}

func worker[V any](wg *sync.WaitGroup, ch chan V, fn func(V)) {
for v := range ch {
fn(v)
}
wg.Done()
}

func closeThenParallel[V any](maxp int, ch chan V, fn func(V)) {
close(ch)
concurrency := len(ch)
if concurrency > maxp {
concurrency = maxp
}
var wg sync.WaitGroup
wg.Add(concurrency)
for i := 1; i < concurrency; i++ {
go worker(&wg, ch, fn)
}
worker(&wg, ch, fn)
wg.Wait()
}

func parallelK[K comparable, V any](maxp int, p map[K]V, fn func(K)) {
ch := make(chan K, len(p))
for k := range p {
ch <- k
}
closeThenParallel(maxp, ch, fn)
}

0 comments on commit af55249

Please sign in to comment.