copr: fix the issue that busy threshold may redirect batch copr to followers (#58193) (#58416)

close #58001
ti-chi-bot authored Dec 20, 2024
1 parent 6722c39 commit 7902d46
Showing 4 changed files with 70 additions and 46 deletions.
2 changes: 1 addition & 1 deletion store/copr/BUILD.bazel
@@ -79,7 +79,7 @@ go_test(
    embed = [":copr"],
    flaky = True,
    race = "on",
-   shard_count = 30,
+   shard_count = 31,
    deps = [
        "//kv",
        "//store/driver/backoff",
7 changes: 7 additions & 0 deletions store/copr/coprocessor.go
@@ -520,10 +520,17 @@ func (b *batchStoreTaskBuilder) handle(task *copTask) (err error) {
            // disable paging for batched task.
            b.tasks[idx].paging = false
            b.tasks[idx].pagingSize = 0
+           // The task and its batched tasks can be served only in the store we chose.
+           // If the task is redirected to another replica, the batched tasks may not meet region-miss or store-not-match errors.
+           // So disable the busy threshold for the task which carries batched tasks.
+           b.tasks[idx].busyThreshold = 0
        }
        if task.RowCountHint > 0 {
            b.tasks[idx].RowCountHint += task.RowCountHint
        }
+       batchedTask.task.paging = false
+       batchedTask.task.pagingSize = 0
+       batchedTask.task.busyThreshold = 0
        b.tasks[idx].batchTaskList[task.taskID] = batchedTask
    }
    handled = true
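
The comment above is the heart of the fix: a non-zero busy threshold lets the client serve a request from a different, less busy replica, which a task that carries batched tasks cannot tolerate, because the batch was resolved against one specific store. A minimal, self-contained Go sketch of that redirect behavior (replica, estimatedWait, and pickReplica are illustrative names, not client-go APIs; the real selection logic lives in the TiKV client):

package main

import (
	"fmt"
	"time"
)

// replica is a simplified stand-in for a TiKV replica with an estimated
// queueing delay; it is illustrative only, not the client-go type.
type replica struct {
	id            int
	estimatedWait time.Duration
}

// pickReplica sketches how a non-zero busy threshold can redirect a request:
// if the chosen store looks busier than the threshold, a less busy replica
// may be used instead. With a threshold of 0 the chosen store is always kept,
// which is what the change above relies on for tasks that carry batched tasks.
func pickReplica(chosen, other replica, busyThreshold time.Duration) replica {
	if busyThreshold > 0 && chosen.estimatedWait > busyThreshold && other.estimatedWait < chosen.estimatedWait {
		return other
	}
	return chosen
}

func main() {
	leader := replica{id: 1, estimatedWait: 2 * time.Second}
	follower := replica{id: 2, estimatedWait: 10 * time.Millisecond}
	fmt.Println(pickReplica(leader, follower, time.Second).id) // 2: redirected to the less busy replica
	fmt.Println(pickReplica(leader, follower, 0).id)           // 1: threshold disabled, stays on the chosen store
}
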
51 changes: 51 additions & 0 deletions store/copr/coprocessor_test.go
@@ -17,6 +17,7 @@ package copr
import (
    "context"
    "testing"
+   "time"

    "github.com/pingcap/kvproto/pkg/coprocessor"
    "github.com/pingcap/tidb/kv"
@@ -881,3 +882,53 @@ func TestSmallTaskConcurrencyLimit(t *testing.T) {
    require.Equal(t, smallConcPerCore, conc)
    require.Equal(t, smallTaskCount, count)
}

func TestBatchStoreCoprOnlySendToLeader(t *testing.T) {
    // nil --- 'g' --- 'n' --- 't' --- nil
    // <- 0 -> <- 1 -> <- 2 -> <- 3 ->
    mockClient, cluster, pdClient, err := testutils.NewMockTiKV("", nil)
    require.NoError(t, err)
    defer func() {
        pdClient.Close()
        err = mockClient.Close()
        require.NoError(t, err)
    }()
    _, _, _ = testutils.BootstrapWithMultiRegions(cluster, []byte("g"), []byte("n"), []byte("t"))
    pdCli := tikv.NewCodecPDClient(tikv.ModeTxn, pdClient)
    defer pdCli.Close()
    cache := NewRegionCache(tikv.NewRegionCache(pdCli))
    defer cache.Close()

    bo := backoff.NewBackofferWithVars(context.Background(), 3000, nil)
    req := &kv.Request{
        StoreBatchSize:     3,
        StoreBusyThreshold: time.Second,
    }
    ranges := buildCopRanges("a", "c", "d", "e", "h", "x", "y", "z")
    tasks, err := buildCopTasks(bo, ranges, &buildCopTaskOpt{
        req:      req,
        cache:    cache,
        rowHints: []int{1, 1, 3, 3},
    })
    require.Len(t, tasks, 1)
    require.Zero(t, tasks[0].busyThreshold)
    batched := tasks[0].batchTaskList
    require.Len(t, batched, 3)
    for _, task := range batched {
        require.Zero(t, task.task.busyThreshold)
    }

    req = &kv.Request{
        StoreBatchSize:     0,
        StoreBusyThreshold: time.Second,
    }
    tasks, err = buildCopTasks(bo, ranges, &buildCopTaskOpt{
        req:      req,
        cache:    cache,
        rowHints: []int{1, 1, 3, 3},
    })
    require.Len(t, tasks, 4)
    for _, task := range tasks {
        require.Equal(t, task.busyThreshold, time.Second)
    }
}
56 changes: 11 additions & 45 deletions store/copr/region_cache.go
@@ -16,9 +16,7 @@ package copr

import (
    "bytes"
-   "math"
    "strconv"
-   "time"

    "github.com/pingcap/errors"
    "github.com/pingcap/kvproto/pkg/coprocessor"
@@ -313,57 +311,25 @@ func (c *RegionCache) OnSendFailForBatchRegions(bo *Backoffer, store *tikv.Store

// BuildBatchTask fetches store and peer info for cop task, wrap it as `batchedCopTask`.
func (c *RegionCache) BuildBatchTask(bo *Backoffer, req *kv.Request, task *copTask, replicaRead kv.ReplicaReadType) (*batchedCopTask, error) {
-   var (
-       rpcContext *tikv.RPCContext
-       err        error
-   )
-   if replicaRead == kv.ReplicaReadFollower {
-       followerStoreSeed := uint32(0)
-       leastEstWaitTime := time.Duration(math.MaxInt64)
-       var (
-           firstFollowerPeer *uint64
-           followerContext   *tikv.RPCContext
-       )
-       for {
-           followerContext, err = c.GetTiKVRPCContext(bo.TiKVBackoffer(), task.region, options.GetTiKVReplicaReadType(replicaRead), followerStoreSeed)
-           if err != nil {
-               return nil, err
-           }
-           if firstFollowerPeer == nil {
-               firstFollowerPeer = &rpcContext.Peer.Id
-           } else if *firstFollowerPeer == rpcContext.Peer.Id {
-               break
-           }
-           estWaitTime := followerContext.Store.EstimatedWaitTime()
-           // the wait time of this follower is under given threshold, choose it.
-           if estWaitTime > req.StoreBusyThreshold {
-               continue
-           }
-           if rpcContext == nil {
-               rpcContext = followerContext
-           } else if estWaitTime < leastEstWaitTime {
-               leastEstWaitTime = estWaitTime
-               rpcContext = followerContext
-           }
-           followerStoreSeed++
-       }
-       // all replicas are busy, fallback to leader.
-       if rpcContext == nil {
-           replicaRead = kv.ReplicaReadLeader
-       }
+   if replicaRead != kv.ReplicaReadLeader {
+       return nil, nil
+   }

-   if replicaRead == kv.ReplicaReadLeader {
-       rpcContext, err = c.GetTiKVRPCContext(bo.TiKVBackoffer(), task.region, options.GetTiKVReplicaReadType(replicaRead), 0)
-       if err != nil {
-           return nil, err
-       }
+   rpcContext, err := c.GetTiKVRPCContext(bo.TiKVBackoffer(), task.region, options.GetTiKVReplicaReadType(replicaRead), 0)
+   if err != nil {
+       return nil, err
+   }

    // fallback to non-batch path
    if rpcContext == nil {
        return nil, nil
    }

    // when leader is busy, we don't batch the cop task to allow the load balance to work.
    if rpcContext.Store.EstimatedWaitTime() > req.StoreBusyThreshold {
        return nil, nil
    }

    return &batchedCopTask{
        task: task,
        region: coprocessor.RegionInfo{
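
Taken together, the added lines turn BuildBatchTask into a leader-only gate: follower reads are never batched, and a busy leader skips batching so the load balancer can still spread the work. A hedged consolidation of that post-fix flow, using simplified stand-in types (request, rpcContext, and getCtx are assumptions for illustration, not the real kv.Request / tikv.RPCContext APIs):

package main

import (
	"errors"
	"fmt"
	"time"
)

// Simplified stand-ins for the types used in the hunk above; the real
// tikv.RPCContext / kv.Request are richer, this is only an illustration.
type rpcContext struct{ estimatedWait time.Duration }

type request struct{ storeBusyThreshold time.Duration }

// buildBatchTask mirrors the post-fix control flow: only leader reads are
// batched, a missing RPC context falls back to the non-batch path, and a
// busy leader also skips batching so load balancing can still kick in.
func buildBatchTask(req request, isLeaderRead bool, getCtx func() (*rpcContext, error)) (bool, error) {
	if !isLeaderRead {
		return false, nil // batched copr is only built for leader reads now
	}
	ctx, err := getCtx()
	if err != nil {
		return false, err
	}
	if ctx == nil {
		return false, nil // fallback to non-batch path
	}
	if ctx.estimatedWait > req.storeBusyThreshold {
		return false, nil // leader is busy: don't batch, let load balance work
	}
	return true, nil
}

func main() {
	req := request{storeBusyThreshold: time.Second}
	ok, _ := buildBatchTask(req, true, func() (*rpcContext, error) {
		return &rpcContext{estimatedWait: 10 * time.Millisecond}, nil
	})
	fmt.Println(ok) // true: idle leader, the task can be batched
	ok, _ = buildBatchTask(req, false, func() (*rpcContext, error) { return nil, errors.New("unused") })
	fmt.Println(ok) // false: follower reads are never batched
}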
