Skip to content

Commit

Permalink
Upgrade the PD client to adopt the latest refactor
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Dec 20, 2024
1 parent d9749cd commit 85d7ad9
Show file tree
Hide file tree
Showing 47 changed files with 237 additions and 172 deletions.
4 changes: 2 additions & 2 deletions br/pkg/backup/prepare_snap/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/pkg/util/engine"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/opt"
"go.uber.org/zap"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -103,7 +103,7 @@ type CliEnv struct {
}

func (c CliEnv) GetAllLiveStores(ctx context.Context) ([]*metapb.Store, error) {
stores, err := c.Cache.PDClient().GetAllStores(ctx, pd.WithExcludeTombstone())
stores, err := c.Cache.PDClient().GetAllStores(ctx, opt.WithExcludeTombstone())
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/conn/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/pkg/util/engine"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/opt"
)

// StoreBehavior is the action to do in GetAllTiKVStores when a non-TiKV
Expand All @@ -34,7 +34,7 @@ type StoreMeta interface {
// GetAllStores gets all stores from pd.
// The store may expire later. Caller is responsible for caching and taking care
// of store change.
GetAllStores(ctx context.Context, opts ...pd.GetStoreOption) ([]*metapb.Store, error)
GetAllStores(ctx context.Context, opts ...opt.GetStoreOption) ([]*metapb.Store, error)
}

// GetAllTiKVStores returns all TiKV stores registered to the PD client. The
Expand All @@ -45,7 +45,7 @@ func GetAllTiKVStores(
storeBehavior StoreBehavior,
) ([]*metapb.Store, error) {
// get all live stores.
stores, err := pdClient.GetAllStores(ctx, pd.WithExcludeTombstone())
stores, err := pdClient.GetAllStores(ctx, opt.WithExcludeTombstone())
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
10 changes: 6 additions & 4 deletions br/pkg/pdutil/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
"github.com/pingcap/tidb/pkg/util/codec"
pd "github.com/tikv/pd/client"
pdhttp "github.com/tikv/pd/client/http"
"github.com/tikv/pd/client/retry"
"github.com/tikv/pd/client/opt"
"github.com/tikv/pd/client/pkg/caller"
"github.com/tikv/pd/client/pkg/retry"
"go.uber.org/zap"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -154,11 +156,11 @@ func NewPdController(
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(maxMsgSize)),
}
pdClient, err := pd.NewClientWithContext(
ctx, pdAddrs, securityOption,
pd.WithGRPCDialOptions(maxCallMsgSize...),
ctx, caller.Component("br-pd-controller"), pdAddrs, securityOption,
opt.WithGRPCDialOptions(maxCallMsgSize...),
// If the time too short, we may scatter a region many times, because
// the interface `ScatterRegions` may time out.
pd.WithCustomTimeoutOption(60*time.Second),
opt.WithCustomTimeoutOption(60*time.Second),
)
if err != nil {
log.Error("fail to create pd client", zap.Error(err))
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/restore/internal/rawkv/rawkv_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/pingcap/tidb/pkg/util/hack"
"github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/rawkv"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/opt"
)

// RawkvClient is the interface for rawkv.client
Expand All @@ -29,7 +29,7 @@ func NewRawkvClient(ctx context.Context, pdAddrs []string, security config.Secur
ctx,
pdAddrs,
security,
pd.WithCustomTimeoutOption(10*time.Second))
opt.WithCustomTimeoutOption(10*time.Second))
}

type KVPair struct {
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/restore/snap_client/placement_rule_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/clients/router"
)

func generateTables() []*snapclient.CreatedTable {
Expand Down Expand Up @@ -104,8 +104,8 @@ func TestContextManagerOnlineNoStores(t *testing.T) {
require.NoError(t, err)
}

func generateRegions() []*pd.Region {
return []*pd.Region{
func generateRegions() []*router.Region {
return []*router.Region{
{
Meta: &metapb.Region{
Id: 0,
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/restore/split/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pingcap/tidb/pkg/util/intest"
pd "github.com/tikv/pd/client"
pdhttp "github.com/tikv/pd/client/http"
"github.com/tikv/pd/client/opt"
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -210,7 +211,7 @@ func (c *pdClient) tryScatterRegions(ctx context.Context, regionInfo []*RegionIn
logutil.Key("end", v.Region.EndKey),
zap.Uint64("id", v.Region.Id))
}
resp, err := c.client.ScatterRegions(ctx, regionsID, pd.WithSkipStoreLimit())
resp, err := c.client.ScatterRegions(ctx, regionsID, opt.WithSkipStoreLimit())
if err != nil {
return err
}
Expand Down
48 changes: 25 additions & 23 deletions br/pkg/restore/split/mock_pd_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ import (
"github.com/pingcap/tidb/pkg/store/pdtypes"
"github.com/pingcap/tidb/pkg/util/codec"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/clients/router"
pdhttp "github.com/tikv/pd/client/http"
"github.com/tikv/pd/client/opt"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -248,8 +250,8 @@ func (c *MockPDClientForSplit) ScanRegions(
_ context.Context,
key, endKey []byte,
limit int,
_ ...pd.GetRegionOption,
) ([]*pd.Region, error) {
_ ...opt.GetRegionOption,
) ([]*router.Region, error) {
c.mu.Lock()
defer c.mu.Unlock()

Expand All @@ -264,9 +266,9 @@ func (c *MockPDClientForSplit) ScanRegions(
}

regions := c.Regions.ScanRange(key, endKey, limit)
ret := make([]*pd.Region, 0, len(regions))
ret := make([]*router.Region, 0, len(regions))
for _, r := range regions {
ret = append(ret, &pd.Region{
ret = append(ret, &router.Region{
Meta: r.Meta,
Leader: r.Leader,
})
Expand All @@ -276,10 +278,10 @@ func (c *MockPDClientForSplit) ScanRegions(

func (c *MockPDClientForSplit) BatchScanRegions(
_ context.Context,
keyRanges []pd.KeyRange,
keyRanges []router.KeyRange,
limit int,
_ ...pd.GetRegionOption,
) ([]*pd.Region, error) {
_ ...opt.GetRegionOption,
) ([]*router.Region, error) {
c.mu.Lock()
defer c.mu.Unlock()

Expand All @@ -293,7 +295,7 @@ func (c *MockPDClientForSplit) BatchScanRegions(
c.scanRegions.beforeHook()
}

regions := make([]*pd.Region, 0, len(keyRanges))
regions := make([]*router.Region, 0, len(keyRanges))
var lastRegion *pdtypes.Region
for _, keyRange := range keyRanges {
if lastRegion != nil {
Expand All @@ -307,7 +309,7 @@ func (c *MockPDClientForSplit) BatchScanRegions(
rs := c.Regions.ScanRange(keyRange.StartKey, keyRange.EndKey, limit)
for _, r := range rs {
lastRegion = r
regions = append(regions, &pd.Region{
regions = append(regions, &router.Region{
Meta: r.Meta,
Leader: r.Leader,
})
Expand All @@ -316,13 +318,13 @@ func (c *MockPDClientForSplit) BatchScanRegions(
return regions, nil
}

func (c *MockPDClientForSplit) GetRegionByID(_ context.Context, regionID uint64, _ ...pd.GetRegionOption) (*pd.Region, error) {
func (c *MockPDClientForSplit) GetRegionByID(_ context.Context, regionID uint64, _ ...opt.GetRegionOption) (*router.Region, error) {
c.mu.Lock()
defer c.mu.Unlock()

for _, r := range c.Regions.Regions {
if r.Meta.Id == regionID {
return &pd.Region{
return &router.Region{
Meta: r.Meta,
Leader: r.Leader,
}, nil
Expand Down Expand Up @@ -370,7 +372,7 @@ func (c *MockPDClientForSplit) ScatterRegion(_ context.Context, regionID uint64)
return newRegionNotFullyReplicatedErr(regionID)
}

func (c *MockPDClientForSplit) ScatterRegions(_ context.Context, regionIDs []uint64, _ ...pd.RegionsOption) (*pdpb.ScatterRegionResponse, error) {
func (c *MockPDClientForSplit) ScatterRegions(_ context.Context, regionIDs []uint64, _ ...opt.RegionsOption) (*pdpb.ScatterRegionResponse, error) {
c.mu.Lock()
defer c.mu.Unlock()

Expand Down Expand Up @@ -517,7 +519,7 @@ func (fpdh *FakePDHTTPClient) DeletePlacementRule(_ context.Context, groupID str
type FakePDClient struct {
pd.Client
stores []*metapb.Store
regions []*pd.Region
regions []*router.Region

notLeader bool
retryTimes *int
Expand All @@ -540,21 +542,21 @@ func NewFakePDClient(stores []*metapb.Store, notLeader bool, retryTime *int) *Fa
}
}

func (fpdc *FakePDClient) SetRegions(regions []*pd.Region) {
func (fpdc *FakePDClient) SetRegions(regions []*router.Region) {
fpdc.regions = regions
}

func (fpdc *FakePDClient) GetAllStores(context.Context, ...pd.GetStoreOption) ([]*metapb.Store, error) {
func (fpdc *FakePDClient) GetAllStores(context.Context, ...opt.GetStoreOption) ([]*metapb.Store, error) {
return append([]*metapb.Store{}, fpdc.stores...), nil
}

func (fpdc *FakePDClient) ScanRegions(
ctx context.Context,
key, endKey []byte,
limit int,
opts ...pd.GetRegionOption,
) ([]*pd.Region, error) {
regions := make([]*pd.Region, 0, len(fpdc.regions))
opts ...opt.GetRegionOption,
) ([]*router.Region, error) {
regions := make([]*router.Region, 0, len(fpdc.regions))
fpdc.peerStoreId = fpdc.peerStoreId + 1
peerStoreId := (fpdc.peerStoreId + 1) / 2
for _, region := range fpdc.regions {
Expand All @@ -572,11 +574,11 @@ func (fpdc *FakePDClient) ScanRegions(

func (fpdc *FakePDClient) BatchScanRegions(
ctx context.Context,
ranges []pd.KeyRange,
ranges []router.KeyRange,
limit int,
opts ...pd.GetRegionOption,
) ([]*pd.Region, error) {
regions := make([]*pd.Region, 0, len(fpdc.regions))
opts ...opt.GetRegionOption,
) ([]*router.Region, error) {
regions := make([]*router.Region, 0, len(fpdc.regions))
fpdc.peerStoreId = fpdc.peerStoreId + 1
peerStoreId := (fpdc.peerStoreId + 1) / 2
for _, region := range fpdc.regions {
Expand Down Expand Up @@ -633,7 +635,7 @@ func (f *FakeSplitClient) AppendRegion(startKey, endKey []byte) {
})
}

func (f *FakeSplitClient) AppendPdRegion(region *pd.Region) {
func (f *FakeSplitClient) AppendPdRegion(region *router.Region) {
f.regions = append(f.regions, &RegionInfo{
Region: region.Meta,
Leader: region.Leader,
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/streamhelper/advancer_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/txnkv/txnlock"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/opt"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
Expand Down Expand Up @@ -89,7 +90,7 @@ func (c PDRegionScanner) RegionScan(ctx context.Context, key, endKey []byte, lim
}

func (c PDRegionScanner) Stores(ctx context.Context) ([]Store, error) {
res, err := c.Client.GetAllStores(ctx, pd.WithExcludeTombstone())
res, err := c.Client.GetAllStores(ctx, opt.WithExcludeTombstone())
if err != nil {
return nil, err
}
Expand Down
12 changes: 7 additions & 5 deletions br/pkg/streamhelper/basic_lib_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/txnkv/txnlock"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/clients/router"
"github.com/tikv/pd/client/opt"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -855,12 +857,12 @@ type mockPDClient struct {
fakeRegions []*region
}

func (p *mockPDClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int, _ ...pd.GetRegionOption) ([]*pd.Region, error) {
func (p *mockPDClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int, _ ...opt.GetRegionOption) ([]*router.Region, error) {
sort.Slice(p.fakeRegions, func(i, j int) bool {
return bytes.Compare(p.fakeRegions[i].rng.StartKey, p.fakeRegions[j].rng.StartKey) < 0
})

result := make([]*pd.Region, 0, len(p.fakeRegions))
result := make([]*router.Region, 0, len(p.fakeRegions))
for _, region := range p.fakeRegions {
if spans.Overlaps(kv.KeyRange{StartKey: key, EndKey: endKey}, region.rng) && len(result) < limit {
regionInfo := newMockRegion(region.id, region.rng.StartKey, region.rng.EndKey)
Expand All @@ -879,7 +881,7 @@ func (p *mockPDClient) GetStore(_ context.Context, storeID uint64) (*metapb.Stor
}, nil
}

func (p *mockPDClient) GetAllStores(ctx context.Context, opts ...pd.GetStoreOption) ([]*metapb.Store, error) {
func (p *mockPDClient) GetAllStores(ctx context.Context, opts ...opt.GetStoreOption) ([]*metapb.Store, error) {
// only used for GetRegionCache once in resolve lock
return []*metapb.Store{
{
Expand All @@ -893,14 +895,14 @@ func (p *mockPDClient) GetClusterID(ctx context.Context) uint64 {
return 1
}

func newMockRegion(regionID uint64, startKey []byte, endKey []byte) *pd.Region {
func newMockRegion(regionID uint64, startKey []byte, endKey []byte) *router.Region {
leader := &metapb.Peer{
Id: regionID,
StoreId: 1,
Role: metapb.PeerRole_Voter,
}

return &pd.Region{
return &router.Region{
Meta: &metapb.Region{
Id: regionID,
StartKey: startKey,
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/task/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/opt"
"google.golang.org/grpc/keepalive"
)

Expand All @@ -56,7 +57,7 @@ func (m mockPDClient) GetClusterID(_ context.Context) uint64 {
return 1
}

func (m mockPDClient) GetAllStores(ctx context.Context, opts ...pd.GetStoreOption) ([]*metapb.Store, error) {
func (m mockPDClient) GetAllStores(ctx context.Context, opts ...opt.GetStoreOption) ([]*metapb.Store, error) {
return []*metapb.Store{}, nil
}

Expand Down
4 changes: 2 additions & 2 deletions br/pkg/utils/storewatch/watching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/pingcap/tidb/br/pkg/conn/util"
"github.com/pingcap/tidb/br/pkg/utils/storewatch"
"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/opt"
)

type SequentialReturningStoreMeta struct {
Expand All @@ -20,7 +20,7 @@ func NewSequentialReturningStoreMeta(sequence [][]*metapb.Store) util.StoreMeta
return &SequentialReturningStoreMeta{sequence: sequence}
}

func (s *SequentialReturningStoreMeta) GetAllStores(ctx context.Context, opts ...pd.GetStoreOption) ([]*metapb.Store, error) {
func (s *SequentialReturningStoreMeta) GetAllStores(ctx context.Context, opts ...opt.GetStoreOption) ([]*metapb.Store, error) {
if len(s.sequence) == 0 {
return nil, fmt.Errorf("too many call to `GetAllStores` in test")
}
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/tidb/pkg/util/dbutil"
"github.com/pingcap/tidb/pkg/util/engine"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/opt"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -86,7 +87,7 @@ type VerChecker func(store *metapb.Store, ver *semver.Version) error

// CheckClusterVersion check TiKV version.
func CheckClusterVersion(ctx context.Context, client pd.Client, checker VerChecker) error {
stores, err := client.GetAllStores(ctx, pd.WithExcludeTombstone())
stores, err := client.GetAllStores(ctx, opt.WithExcludeTombstone())
if err != nil {
return errors.Trace(err)
}
Expand Down
Loading

0 comments on commit 85d7ad9

Please sign in to comment.