From af55249c00873ae06ff976156d1b0a5cafb667b2 Mon Sep 17 00:00:00 2001 From: "huangqing.zhu" Date: Wed, 14 Aug 2024 14:44:27 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E6=95=B4=E7=90=86=E4=BB=A3?= =?UTF-8?q?=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd_safe.go | 20 ++++++++------------ connect.go | 3 ++- go.mod | 2 -- redisson.go | 3 +++ util.go | 31 +++++++++++++++++++++++++++++++ 5 files changed, 44 insertions(+), 15 deletions(-) diff --git a/cmd_safe.go b/cmd_safe.go index 90be9db..41d0e11 100644 --- a/cmd_safe.go +++ b/cmd_safe.go @@ -31,20 +31,16 @@ func (c *client) SafeMGet(ctx context.Context, keys ...string) SliceCmd { if len(slot2Keys) == 1 { return c.MGet(ctx, keys...) } - var wg sync.WaitGroup + var mx sync.Mutex var scs = make(map[uint16]SliceCmd) - wg.Add(len(slot2Keys)) - for i, sameSlotKeys := range slot2Keys { - go func(_i uint16, _keys []string) { - ret := c.MGet(WithSkipCheck(context.Background()), _keys...) - mx.Lock() - scs[_i] = ret - mx.Unlock() - wg.Done() - }(i, sameSlotKeys) - } - wg.Wait() + + parallelK(c.maxp, slot2Keys, func(k uint16) { + ret := c.MGet(WithSkipCheck(context.Background()), slot2Keys[k]...) + mx.Lock() + scs[k] = ret + mx.Unlock() + }) var res = make([]any, len(keys)) for i, ret := range scs { diff --git a/connect.go b/connect.go index 6110332..d6a96ea 100644 --- a/connect.go +++ b/connect.go @@ -8,6 +8,7 @@ import ( "github.com/redis/rueidis" "github.com/redis/rueidis/rueidiscompat" "regexp" + "runtime" "strings" ) @@ -159,7 +160,7 @@ func (c *client) Close() error { } func Connect(v ConfInterface) (Cmdable, error) { - c := &client{v: v, handler: newBaseHandler(v)} + c := &client{v: v, handler: newBaseHandler(v), maxp: runtime.GOMAXPROCS(0)} err := c.connect() if err != nil { for i := 0; i < len(reconnectErrors); i++ { diff --git a/go.mod b/go.mod index 8e6a826..2e6f17c 100644 --- a/go.mod +++ b/go.mod @@ -29,5 +29,3 @@ require ( google.golang.org/protobuf v1.28.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) - -//replace github.com/sandwich-go/rueidis => ../rueidis diff --git a/redisson.go b/redisson.go index f4ecff7..b69a6c9 100644 --- a/redisson.go +++ b/redisson.go @@ -30,6 +30,7 @@ type client struct { adapter rueidiscompat.Cmdable ttl time.Duration builder builder + maxp int once sync.Once } @@ -58,6 +59,7 @@ func (c *client) ForEachNodes(ctx context.Context, f func(context.Context, Cmdab cmd: v, adapter: rueidiscompat.NewAdapter(v), builder: c.builder, + maxp: c.maxp, }) if err != nil { errs.Push(err) @@ -79,6 +81,7 @@ func (c *client) Cache(ttl time.Duration) CacheCmdable { adapter: c.adapter, ttl: ttl, builder: c.builder, + maxp: c.maxp, } return cp } diff --git a/util.go b/util.go index c585718..6363946 100644 --- a/util.go +++ b/util.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/redis/rueidis" "strconv" + "sync" "time" ) @@ -215,3 +216,33 @@ func toFloat64(val any) (float64, error) { return 0, fmt.Errorf("redis: unexpected type=%T for Float64", t) } } + +func worker[V any](wg *sync.WaitGroup, ch chan V, fn func(V)) { + for v := range ch { + fn(v) + } + wg.Done() +} + +func closeThenParallel[V any](maxp int, ch chan V, fn func(V)) { + close(ch) + concurrency := len(ch) + if concurrency > maxp { + concurrency = maxp + } + var wg sync.WaitGroup + wg.Add(concurrency) + for i := 1; i < concurrency; i++ { + go worker(&wg, ch, fn) + } + worker(&wg, ch, fn) + wg.Wait() +} + +func parallelK[K comparable, V any](maxp int, p map[K]V, fn func(K)) { + ch := make(chan K, len(p)) + for k := range p { + ch <- k + } + closeThenParallel(maxp, ch, fn) +}