From 0e8065dae5e777777e77b9dd65cb43e32aa046cd Mon Sep 17 00:00:00 2001 From: JmPotato Date: Fri, 20 Dec 2024 10:50:30 +0800 Subject: [PATCH] Upgrade the PD client to adopt the latest refactor Signed-off-by: JmPotato --- br/pkg/backup/prepare_snap/env.go | 4 +- br/pkg/conn/util/util.go | 6 +- br/pkg/pdutil/pd.go | 10 ++-- br/pkg/restore/internal/rawkv/rawkv_client.go | 4 +- .../placement_rule_manager_test.go | 6 +- br/pkg/restore/split/client.go | 3 +- br/pkg/restore/split/mock_pd_client.go | 48 ++++++++-------- br/pkg/streamhelper/advancer_env.go | 3 +- br/pkg/streamhelper/basic_lib_for_test.go | 12 ++-- br/pkg/task/config_test.go | 3 +- br/pkg/utils/storewatch/watching_test.go | 4 +- br/pkg/version/version.go | 3 +- br/pkg/version/version_test.go | 3 +- br/tests/br_key_locked/codec.go | 14 +++-- br/tests/br_key_locked/locker.go | 3 +- br/tests/br_z_gc_safepoint/gc.go | 3 +- dumpling/export/dump.go | 3 +- go.mod | 6 +- go.sum | 12 ++-- lightning/pkg/importer/import.go | 9 ++- pkg/ddl/index.go | 4 +- pkg/ddl/ingest/backend_mgr.go | 8 +-- pkg/ddl/ingest/mock.go | 4 +- pkg/ddl/partition.go | 4 +- pkg/ddl/tests/tiflash/ddl_tiflash_test.go | 4 +- pkg/disttask/importinto/planner_test.go | 4 +- pkg/domain/domain.go | 3 +- pkg/domain/domain_sysvars.go | 11 ++-- pkg/domain/domain_test.go | 3 +- .../infosync/resource_manager_client.go | 3 +- pkg/domain/infosync/tiflash_manager.go | 3 +- pkg/executor/importer/table_import.go | 3 +- pkg/executor/importer/table_import_test.go | 8 ++- .../calibrate_resource_test.go | 3 +- pkg/lightning/backend/local/local.go | 17 +++--- pkg/lightning/backend/local/local_test.go | 8 ++- pkg/lightning/tikv/local_sst_writer_test.go | 3 +- .../handler/tikvhandler/tikv_handler.go | 5 +- pkg/store/copr/copr_test/coprocessor_test.go | 3 +- pkg/store/copr/mpp.go | 3 +- pkg/store/driver/tikv_driver.go | 10 ++-- pkg/store/mockstore/unistore/pd.go | 55 ++++++++++++------- pkg/store/mockstore/unistore/pd/client.go | 17 +++--- .../mockstore/unistore/tikv/mock_region.go | 31 ++++++----- pkg/ttl/cache/split_test.go | 18 +++--- pkg/util/metricsutil/common.go | 12 ++-- tests/realtikvtest/brietest/operator_test.go | 3 +- 47 files changed, 237 insertions(+), 172 deletions(-) diff --git a/br/pkg/backup/prepare_snap/env.go b/br/pkg/backup/prepare_snap/env.go index f0b2301f76477..cb50c2dea1fa3 100644 --- a/br/pkg/backup/prepare_snap/env.go +++ b/br/pkg/backup/prepare_snap/env.go @@ -29,7 +29,7 @@ import ( "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/pkg/util/engine" "github.com/tikv/client-go/v2/tikv" - pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/opt" "go.uber.org/zap" "google.golang.org/grpc" ) @@ -103,7 +103,7 @@ type CliEnv struct { } func (c CliEnv) GetAllLiveStores(ctx context.Context) ([]*metapb.Store, error) { - stores, err := c.Cache.PDClient().GetAllStores(ctx, pd.WithExcludeTombstone()) + stores, err := c.Cache.PDClient().GetAllStores(ctx, opt.WithExcludeTombstone()) if err != nil { return nil, err } diff --git a/br/pkg/conn/util/util.go b/br/pkg/conn/util/util.go index 9f8e1531acfd7..06b2dcd7df4fc 100644 --- a/br/pkg/conn/util/util.go +++ b/br/pkg/conn/util/util.go @@ -9,7 +9,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/pkg/util/engine" - pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/opt" ) // StoreBehavior is the action to do in GetAllTiKVStores when a non-TiKV @@ -34,7 +34,7 @@ type StoreMeta interface { // GetAllStores gets all stores from pd. // The store may expire later. Caller is responsible for caching and taking care // of store change. - GetAllStores(ctx context.Context, opts ...pd.GetStoreOption) ([]*metapb.Store, error) + GetAllStores(ctx context.Context, opts ...opt.GetStoreOption) ([]*metapb.Store, error) } // GetAllTiKVStores returns all TiKV stores registered to the PD client. The @@ -45,7 +45,7 @@ func GetAllTiKVStores( storeBehavior StoreBehavior, ) ([]*metapb.Store, error) { // get all live stores. - stores, err := pdClient.GetAllStores(ctx, pd.WithExcludeTombstone()) + stores, err := pdClient.GetAllStores(ctx, opt.WithExcludeTombstone()) if err != nil { return nil, errors.Trace(err) } diff --git a/br/pkg/pdutil/pd.go b/br/pkg/pdutil/pd.go index 31f20cd8af433..bc5682eefdc91 100644 --- a/br/pkg/pdutil/pd.go +++ b/br/pkg/pdutil/pd.go @@ -23,7 +23,9 @@ import ( "github.com/pingcap/tidb/pkg/util/codec" pd "github.com/tikv/pd/client" pdhttp "github.com/tikv/pd/client/http" - "github.com/tikv/pd/client/retry" + "github.com/tikv/pd/client/opt" + "github.com/tikv/pd/client/pkg/caller" + "github.com/tikv/pd/client/pkg/retry" "go.uber.org/zap" "google.golang.org/grpc" ) @@ -154,11 +156,11 @@ func NewPdController( grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(maxMsgSize)), } pdClient, err := pd.NewClientWithContext( - ctx, pdAddrs, securityOption, - pd.WithGRPCDialOptions(maxCallMsgSize...), + ctx, caller.Component("br-pd-controller"), pdAddrs, securityOption, + opt.WithGRPCDialOptions(maxCallMsgSize...), // If the time too short, we may scatter a region many times, because // the interface `ScatterRegions` may time out. - pd.WithCustomTimeoutOption(60*time.Second), + opt.WithCustomTimeoutOption(60*time.Second), ) if err != nil { log.Error("fail to create pd client", zap.Error(err)) diff --git a/br/pkg/restore/internal/rawkv/rawkv_client.go b/br/pkg/restore/internal/rawkv/rawkv_client.go index e353aac231849..4a005dca50322 100644 --- a/br/pkg/restore/internal/rawkv/rawkv_client.go +++ b/br/pkg/restore/internal/rawkv/rawkv_client.go @@ -11,7 +11,7 @@ import ( "github.com/pingcap/tidb/pkg/util/hack" "github.com/tikv/client-go/v2/config" "github.com/tikv/client-go/v2/rawkv" - pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/opt" ) // RawkvClient is the interface for rawkv.client @@ -29,7 +29,7 @@ func NewRawkvClient(ctx context.Context, pdAddrs []string, security config.Secur ctx, pdAddrs, security, - pd.WithCustomTimeoutOption(10*time.Second)) + opt.WithCustomTimeoutOption(10*time.Second)) } type KVPair struct { diff --git a/br/pkg/restore/snap_client/placement_rule_manager_test.go b/br/pkg/restore/snap_client/placement_rule_manager_test.go index 1e60aaa5b93d1..2b2cfe3084476 100644 --- a/br/pkg/restore/snap_client/placement_rule_manager_test.go +++ b/br/pkg/restore/snap_client/placement_rule_manager_test.go @@ -31,7 +31,7 @@ import ( "github.com/pingcap/tidb/pkg/tablecodec" "github.com/pingcap/tidb/pkg/util/codec" "github.com/stretchr/testify/require" - pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/clients/router" ) func generateTables() []*snapclient.CreatedTable { @@ -104,8 +104,8 @@ func TestContextManagerOnlineNoStores(t *testing.T) { require.NoError(t, err) } -func generateRegions() []*pd.Region { - return []*pd.Region{ +func generateRegions() []*router.Region { + return []*router.Region{ { Meta: &metapb.Region{ Id: 0, diff --git a/br/pkg/restore/split/client.go b/br/pkg/restore/split/client.go index 117809300dde7..e029db110aae0 100644 --- a/br/pkg/restore/split/client.go +++ b/br/pkg/restore/split/client.go @@ -34,6 +34,7 @@ import ( "github.com/pingcap/tidb/pkg/util/intest" pd "github.com/tikv/pd/client" pdhttp "github.com/tikv/pd/client/http" + "github.com/tikv/pd/client/opt" "go.uber.org/multierr" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -210,7 +211,7 @@ func (c *pdClient) tryScatterRegions(ctx context.Context, regionInfo []*RegionIn logutil.Key("end", v.Region.EndKey), zap.Uint64("id", v.Region.Id)) } - resp, err := c.client.ScatterRegions(ctx, regionsID, pd.WithSkipStoreLimit()) + resp, err := c.client.ScatterRegions(ctx, regionsID, opt.WithSkipStoreLimit()) if err != nil { return err } diff --git a/br/pkg/restore/split/mock_pd_client.go b/br/pkg/restore/split/mock_pd_client.go index 535b064700d41..a8fbf0cf5e57f 100644 --- a/br/pkg/restore/split/mock_pd_client.go +++ b/br/pkg/restore/split/mock_pd_client.go @@ -17,7 +17,9 @@ import ( "github.com/pingcap/tidb/pkg/store/pdtypes" "github.com/pingcap/tidb/pkg/util/codec" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/clients/router" pdhttp "github.com/tikv/pd/client/http" + "github.com/tikv/pd/client/opt" "google.golang.org/grpc/codes" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/status" @@ -248,8 +250,8 @@ func (c *MockPDClientForSplit) ScanRegions( _ context.Context, key, endKey []byte, limit int, - _ ...pd.GetRegionOption, -) ([]*pd.Region, error) { + _ ...opt.GetRegionOption, +) ([]*router.Region, error) { c.mu.Lock() defer c.mu.Unlock() @@ -264,9 +266,9 @@ func (c *MockPDClientForSplit) ScanRegions( } regions := c.Regions.ScanRange(key, endKey, limit) - ret := make([]*pd.Region, 0, len(regions)) + ret := make([]*router.Region, 0, len(regions)) for _, r := range regions { - ret = append(ret, &pd.Region{ + ret = append(ret, &router.Region{ Meta: r.Meta, Leader: r.Leader, }) @@ -276,10 +278,10 @@ func (c *MockPDClientForSplit) ScanRegions( func (c *MockPDClientForSplit) BatchScanRegions( _ context.Context, - keyRanges []pd.KeyRange, + keyRanges []router.KeyRange, limit int, - _ ...pd.GetRegionOption, -) ([]*pd.Region, error) { + _ ...opt.GetRegionOption, +) ([]*router.Region, error) { c.mu.Lock() defer c.mu.Unlock() @@ -293,7 +295,7 @@ func (c *MockPDClientForSplit) BatchScanRegions( c.scanRegions.beforeHook() } - regions := make([]*pd.Region, 0, len(keyRanges)) + regions := make([]*router.Region, 0, len(keyRanges)) var lastRegion *pdtypes.Region for _, keyRange := range keyRanges { if lastRegion != nil { @@ -307,7 +309,7 @@ func (c *MockPDClientForSplit) BatchScanRegions( rs := c.Regions.ScanRange(keyRange.StartKey, keyRange.EndKey, limit) for _, r := range rs { lastRegion = r - regions = append(regions, &pd.Region{ + regions = append(regions, &router.Region{ Meta: r.Meta, Leader: r.Leader, }) @@ -316,13 +318,13 @@ func (c *MockPDClientForSplit) BatchScanRegions( return regions, nil } -func (c *MockPDClientForSplit) GetRegionByID(_ context.Context, regionID uint64, _ ...pd.GetRegionOption) (*pd.Region, error) { +func (c *MockPDClientForSplit) GetRegionByID(_ context.Context, regionID uint64, _ ...opt.GetRegionOption) (*router.Region, error) { c.mu.Lock() defer c.mu.Unlock() for _, r := range c.Regions.Regions { if r.Meta.Id == regionID { - return &pd.Region{ + return &router.Region{ Meta: r.Meta, Leader: r.Leader, }, nil @@ -370,7 +372,7 @@ func (c *MockPDClientForSplit) ScatterRegion(_ context.Context, regionID uint64) return newRegionNotFullyReplicatedErr(regionID) } -func (c *MockPDClientForSplit) ScatterRegions(_ context.Context, regionIDs []uint64, _ ...pd.RegionsOption) (*pdpb.ScatterRegionResponse, error) { +func (c *MockPDClientForSplit) ScatterRegions(_ context.Context, regionIDs []uint64, _ ...opt.RegionsOption) (*pdpb.ScatterRegionResponse, error) { c.mu.Lock() defer c.mu.Unlock() @@ -517,7 +519,7 @@ func (fpdh *FakePDHTTPClient) DeletePlacementRule(_ context.Context, groupID str type FakePDClient struct { pd.Client stores []*metapb.Store - regions []*pd.Region + regions []*router.Region notLeader bool retryTimes *int @@ -540,11 +542,11 @@ func NewFakePDClient(stores []*metapb.Store, notLeader bool, retryTime *int) *Fa } } -func (fpdc *FakePDClient) SetRegions(regions []*pd.Region) { +func (fpdc *FakePDClient) SetRegions(regions []*router.Region) { fpdc.regions = regions } -func (fpdc *FakePDClient) GetAllStores(context.Context, ...pd.GetStoreOption) ([]*metapb.Store, error) { +func (fpdc *FakePDClient) GetAllStores(context.Context, ...opt.GetStoreOption) ([]*metapb.Store, error) { return append([]*metapb.Store{}, fpdc.stores...), nil } @@ -552,9 +554,9 @@ func (fpdc *FakePDClient) ScanRegions( ctx context.Context, key, endKey []byte, limit int, - opts ...pd.GetRegionOption, -) ([]*pd.Region, error) { - regions := make([]*pd.Region, 0, len(fpdc.regions)) + opts ...opt.GetRegionOption, +) ([]*router.Region, error) { + regions := make([]*router.Region, 0, len(fpdc.regions)) fpdc.peerStoreId = fpdc.peerStoreId + 1 peerStoreId := (fpdc.peerStoreId + 1) / 2 for _, region := range fpdc.regions { @@ -572,11 +574,11 @@ func (fpdc *FakePDClient) ScanRegions( func (fpdc *FakePDClient) BatchScanRegions( ctx context.Context, - ranges []pd.KeyRange, + ranges []router.KeyRange, limit int, - opts ...pd.GetRegionOption, -) ([]*pd.Region, error) { - regions := make([]*pd.Region, 0, len(fpdc.regions)) + opts ...opt.GetRegionOption, +) ([]*router.Region, error) { + regions := make([]*router.Region, 0, len(fpdc.regions)) fpdc.peerStoreId = fpdc.peerStoreId + 1 peerStoreId := (fpdc.peerStoreId + 1) / 2 for _, region := range fpdc.regions { @@ -633,7 +635,7 @@ func (f *FakeSplitClient) AppendRegion(startKey, endKey []byte) { }) } -func (f *FakeSplitClient) AppendPdRegion(region *pd.Region) { +func (f *FakeSplitClient) AppendPdRegion(region *router.Region) { f.regions = append(f.regions, &RegionInfo{ Region: region.Meta, Leader: region.Leader, diff --git a/br/pkg/streamhelper/advancer_env.go b/br/pkg/streamhelper/advancer_env.go index 26abd1e0605f8..b7f45b3d49566 100644 --- a/br/pkg/streamhelper/advancer_env.go +++ b/br/pkg/streamhelper/advancer_env.go @@ -16,6 +16,7 @@ import ( "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/txnkv/txnlock" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/opt" clientv3 "go.etcd.io/etcd/client/v3" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" @@ -89,7 +90,7 @@ func (c PDRegionScanner) RegionScan(ctx context.Context, key, endKey []byte, lim } func (c PDRegionScanner) Stores(ctx context.Context) ([]Store, error) { - res, err := c.Client.GetAllStores(ctx, pd.WithExcludeTombstone()) + res, err := c.Client.GetAllStores(ctx, opt.WithExcludeTombstone()) if err != nil { return nil, err } diff --git a/br/pkg/streamhelper/basic_lib_for_test.go b/br/pkg/streamhelper/basic_lib_for_test.go index 22a66c18e27af..22fa031854fbe 100644 --- a/br/pkg/streamhelper/basic_lib_for_test.go +++ b/br/pkg/streamhelper/basic_lib_for_test.go @@ -34,6 +34,8 @@ import ( "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/txnkv/txnlock" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/clients/router" + "github.com/tikv/pd/client/opt" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -855,12 +857,12 @@ type mockPDClient struct { fakeRegions []*region } -func (p *mockPDClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int, _ ...pd.GetRegionOption) ([]*pd.Region, error) { +func (p *mockPDClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int, _ ...opt.GetRegionOption) ([]*router.Region, error) { sort.Slice(p.fakeRegions, func(i, j int) bool { return bytes.Compare(p.fakeRegions[i].rng.StartKey, p.fakeRegions[j].rng.StartKey) < 0 }) - result := make([]*pd.Region, 0, len(p.fakeRegions)) + result := make([]*router.Region, 0, len(p.fakeRegions)) for _, region := range p.fakeRegions { if spans.Overlaps(kv.KeyRange{StartKey: key, EndKey: endKey}, region.rng) && len(result) < limit { regionInfo := newMockRegion(region.id, region.rng.StartKey, region.rng.EndKey) @@ -879,7 +881,7 @@ func (p *mockPDClient) GetStore(_ context.Context, storeID uint64) (*metapb.Stor }, nil } -func (p *mockPDClient) GetAllStores(ctx context.Context, opts ...pd.GetStoreOption) ([]*metapb.Store, error) { +func (p *mockPDClient) GetAllStores(ctx context.Context, opts ...opt.GetStoreOption) ([]*metapb.Store, error) { // only used for GetRegionCache once in resolve lock return []*metapb.Store{ { @@ -893,14 +895,14 @@ func (p *mockPDClient) GetClusterID(ctx context.Context) uint64 { return 1 } -func newMockRegion(regionID uint64, startKey []byte, endKey []byte) *pd.Region { +func newMockRegion(regionID uint64, startKey []byte, endKey []byte) *router.Region { leader := &metapb.Peer{ Id: regionID, StoreId: 1, Role: metapb.PeerRole_Voter, } - return &pd.Region{ + return &router.Region{ Meta: &metapb.Region{ Id: regionID, StartKey: startKey, diff --git a/br/pkg/task/config_test.go b/br/pkg/task/config_test.go index 9d225163d5f9c..8bd7d72e4f3bb 100644 --- a/br/pkg/task/config_test.go +++ b/br/pkg/task/config_test.go @@ -35,6 +35,7 @@ import ( "github.com/pingcap/tidb/pkg/tablecodec" "github.com/stretchr/testify/require" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/opt" "google.golang.org/grpc/keepalive" ) @@ -56,7 +57,7 @@ func (m mockPDClient) GetClusterID(_ context.Context) uint64 { return 1 } -func (m mockPDClient) GetAllStores(ctx context.Context, opts ...pd.GetStoreOption) ([]*metapb.Store, error) { +func (m mockPDClient) GetAllStores(ctx context.Context, opts ...opt.GetStoreOption) ([]*metapb.Store, error) { return []*metapb.Store{}, nil } diff --git a/br/pkg/utils/storewatch/watching_test.go b/br/pkg/utils/storewatch/watching_test.go index 49ab4034bcfd7..02781954b9b68 100644 --- a/br/pkg/utils/storewatch/watching_test.go +++ b/br/pkg/utils/storewatch/watching_test.go @@ -9,7 +9,7 @@ import ( "github.com/pingcap/tidb/br/pkg/conn/util" "github.com/pingcap/tidb/br/pkg/utils/storewatch" "github.com/stretchr/testify/require" - pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/opt" ) type SequentialReturningStoreMeta struct { @@ -20,7 +20,7 @@ func NewSequentialReturningStoreMeta(sequence [][]*metapb.Store) util.StoreMeta return &SequentialReturningStoreMeta{sequence: sequence} } -func (s *SequentialReturningStoreMeta) GetAllStores(ctx context.Context, opts ...pd.GetStoreOption) ([]*metapb.Store, error) { +func (s *SequentialReturningStoreMeta) GetAllStores(ctx context.Context, opts ...opt.GetStoreOption) ([]*metapb.Store, error) { if len(s.sequence) == 0 { return nil, fmt.Errorf("too many call to `GetAllStores` in test") } diff --git a/br/pkg/version/version.go b/br/pkg/version/version.go index 519797e0daafc..12064fca0d5b3 100644 --- a/br/pkg/version/version.go +++ b/br/pkg/version/version.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/tidb/pkg/util/dbutil" "github.com/pingcap/tidb/pkg/util/engine" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/opt" "go.uber.org/zap" ) @@ -86,7 +87,7 @@ type VerChecker func(store *metapb.Store, ver *semver.Version) error // CheckClusterVersion check TiKV version. func CheckClusterVersion(ctx context.Context, client pd.Client, checker VerChecker) error { - stores, err := client.GetAllStores(ctx, pd.WithExcludeTombstone()) + stores, err := client.GetAllStores(ctx, opt.WithExcludeTombstone()) if err != nil { return errors.Trace(err) } diff --git a/br/pkg/version/version_test.go b/br/pkg/version/version_test.go index 853c992113526..67944be8cbf21 100644 --- a/br/pkg/version/version_test.go +++ b/br/pkg/version/version_test.go @@ -16,6 +16,7 @@ import ( "github.com/pingcap/tidb/pkg/meta/model" "github.com/stretchr/testify/require" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/opt" ) type mockPDClient struct { @@ -27,7 +28,7 @@ func (m *mockPDClient) GetClusterID(_ context.Context) uint64 { return 1 } -func (m *mockPDClient) GetAllStores(ctx context.Context, opts ...pd.GetStoreOption) ([]*metapb.Store, error) { +func (m *mockPDClient) GetAllStores(ctx context.Context, opts ...opt.GetStoreOption) ([]*metapb.Store, error) { if m.getAllStores != nil { return m.getAllStores(), nil } diff --git a/br/tests/br_key_locked/codec.go b/br/tests/br_key_locked/codec.go index 5b7f4b93c31f0..bb727b4ca98fb 100644 --- a/br/tests/br_key_locked/codec.go +++ b/br/tests/br_key_locked/codec.go @@ -23,6 +23,8 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/pkg/util/codec" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/clients/router" + "github.com/tikv/pd/client/opt" ) type codecPDClient struct { @@ -31,13 +33,13 @@ type codecPDClient struct { // GetRegion encodes the key before send requests to pd-server and decodes the // returned StartKey && EndKey from pd-server. -func (c *codecPDClient) GetRegion(ctx context.Context, key []byte, opts ...pd.GetRegionOption) (*pd.Region, error) { +func (c *codecPDClient) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*router.Region, error) { encodedKey := codec.EncodeBytes(nil, key) region, err := c.Client.GetRegion(ctx, encodedKey) return processRegionResult(region, err) } -func (c *codecPDClient) GetPrevRegion(ctx context.Context, key []byte, opts ...pd.GetRegionOption) (*pd.Region, error) { +func (c *codecPDClient) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*router.Region, error) { encodedKey := codec.EncodeBytes(nil, key) region, err := c.Client.GetPrevRegion(ctx, encodedKey) return processRegionResult(region, err) @@ -46,7 +48,7 @@ func (c *codecPDClient) GetPrevRegion(ctx context.Context, key []byte, opts ...p // GetRegionByID encodes the key before send requests to pd-server and decodes the // returned StartKey && EndKey from pd-server. func (c *codecPDClient) GetRegionByID(ctx context.Context, - regionID uint64, _ ...pd.GetRegionOption) (*pd.Region, error) { + regionID uint64, _ ...opt.GetRegionOption) (*router.Region, error) { region, err := c.Client.GetRegionByID(ctx, regionID) return processRegionResult(region, err) } @@ -56,8 +58,8 @@ func (c *codecPDClient) ScanRegions( startKey []byte, endKey []byte, limit int, - opts ...pd.GetRegionOption, -) ([]*pd.Region, error) { + opts ...opt.GetRegionOption, +) ([]*router.Region, error) { startKey = codec.EncodeBytes(nil, startKey) if len(endKey) > 0 { endKey = codec.EncodeBytes(nil, endKey) @@ -79,7 +81,7 @@ func (c *codecPDClient) ScanRegions( return regions, nil } -func processRegionResult(region *pd.Region, err error) (*pd.Region, error) { +func processRegionResult(region *router.Region, err error) (*router.Region, error) { if err != nil { return nil, errors.Trace(err) } diff --git a/br/tests/br_key_locked/locker.go b/br/tests/br_key_locked/locker.go index edba247f8ff7c..838eb6a1b9a71 100644 --- a/br/tests/br_key_locked/locker.go +++ b/br/tests/br_key_locked/locker.go @@ -45,6 +45,7 @@ import ( "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/tikvrpc" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/pkg/caller" "go.uber.org/zap" ) @@ -85,7 +86,7 @@ func main() { log.Panic("get table id failed", zap.Error(err)) } - pdclient, err := pd.NewClient([]string{*pdAddr}, pd.SecurityOption{ + pdclient, err := pd.NewClient(caller.TestComponent, []string{*pdAddr}, pd.SecurityOption{ CAPath: *ca, CertPath: *cert, KeyPath: *key, diff --git a/br/tests/br_z_gc_safepoint/gc.go b/br/tests/br_z_gc_safepoint/gc.go index 479f38227ce27..c1180fdceb711 100644 --- a/br/tests/br_z_gc_safepoint/gc.go +++ b/br/tests/br_z_gc_safepoint/gc.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/log" "github.com/tikv/client-go/v2/oracle" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/pkg/caller" "go.uber.org/zap" ) @@ -49,7 +50,7 @@ func main() { timeout := time.Second * 10 ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() - pdclient, err := pd.NewClientWithContext(ctx, []string{*pdAddr}, pd.SecurityOption{ + pdclient, err := pd.NewClientWithContext(ctx, caller.TestComponent, []string{*pdAddr}, pd.SecurityOption{ CAPath: *ca, CertPath: *cert, KeyPath: *key, diff --git a/dumpling/export/dump.go b/dumpling/export/dump.go index 8174d32746742..8f4825ef001be 100644 --- a/dumpling/export/dump.go +++ b/dumpling/export/dump.go @@ -39,6 +39,7 @@ import ( "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/codec" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/pkg/caller" gatomic "go.uber.org/atomic" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -1462,7 +1463,7 @@ func tidbSetPDClientForGC(d *Dumper) error { if err != nil { tctx.L().Info("meet error while check whether fetched pd addr and TiDB belong to one cluster. This won't affect dump process", log.ShortError(err), zap.Strings("pdAddrs", pdAddrs)) } else if doPdGC { - pdClient, err := pd.NewClientWithContext(tctx, pdAddrs, pd.SecurityOption{}) + pdClient, err := pd.NewClientWithContext(tctx, caller.Component("dumpling-gc"), pdAddrs, pd.SecurityOption{}) if err != nil { tctx.L().Info("create pd client to control GC failed. This won't affect dump process", log.ShortError(err), zap.Strings("pdAddrs", pdAddrs)) } diff --git a/go.mod b/go.mod index ba0e41c96c7d2..0011256dcf69e 100644 --- a/go.mod +++ b/go.mod @@ -86,7 +86,7 @@ require ( github.com/pingcap/errors v0.11.5-0.20240318064555-6bd07397691f github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 github.com/pingcap/fn v1.0.0 - github.com/pingcap/kvproto v0.0.0-20241120022153-92b0414aeed8 + github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 github.com/pingcap/log v1.1.1-0.20241212030209-7e3ff8601a2a github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5 github.com/pingcap/tidb/pkg/parser v0.0.0-20211011031125-9b13dc409c5e @@ -109,8 +109,8 @@ require ( github.com/tdakkota/asciicheck v0.2.0 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 github.com/tidwall/btree v1.7.0 - github.com/tikv/client-go/v2 v2.0.8-0.20241209094930-06d7f4b9233b - github.com/tikv/pd/client v0.0.0-20241111073742-238d4d79ea31 + github.com/tikv/client-go/v2 v2.0.8-0.20241220061251-c5d92baf4928 + github.com/tikv/pd/client v0.0.0-20241220053006-461b86adc78d github.com/timakin/bodyclose v0.0.0-20240125160201-f835fa56326a github.com/twmb/murmur3 v1.1.6 github.com/uber/jaeger-client-go v2.22.1+incompatible diff --git a/go.sum b/go.sum index f888642e28e08..ea9e4baebafc8 100644 --- a/go.sum +++ b/go.sum @@ -670,8 +670,8 @@ github.com/pingcap/fn v1.0.0/go.mod h1:u9WZ1ZiOD1RpNhcI42RucFh/lBuzTu6rw88a+oF2Z github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20241120022153-92b0414aeed8 h1:aNNifhc6xCjXKejjiNYtJJLFNMXnoDiXxkJIg1JErQE= -github.com/pingcap/kvproto v0.0.0-20241120022153-92b0414aeed8/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= +github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 h1:xYNSJjYNur4Dr5bV+9BXK9n5E0T1zlcAN25XX68+mOg= +github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/log v1.1.1-0.20241212030209-7e3ff8601a2a h1:WIhmJBlNGmnCWH6TLMdZfNEDaiU8cFpZe3iaqDbQ0M8= @@ -826,10 +826,10 @@ github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM= github.com/tidwall/btree v1.7.0 h1:L1fkJH/AuEh5zBnnBbmTwQ5Lt+bRJ5A8EWecslvo9iI= github.com/tidwall/btree v1.7.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY= -github.com/tikv/client-go/v2 v2.0.8-0.20241209094930-06d7f4b9233b h1:x8E2J8UuUa2ysUkgVfNGgiXxZ9nfqBpQ43PBLwmCitU= -github.com/tikv/client-go/v2 v2.0.8-0.20241209094930-06d7f4b9233b/go.mod h1:NI2GfVlB9n7DsIGCxrKcD4psrcuFNEV8m1BgyzK1Amc= -github.com/tikv/pd/client v0.0.0-20241111073742-238d4d79ea31 h1:oAYc4m5Eu1OY9ogJ103VO47AYPHvhtzbUPD8L8B67Qk= -github.com/tikv/pd/client v0.0.0-20241111073742-238d4d79ea31/go.mod h1:W5a0sDadwUpI9k8p7M77d3jo253ZHdmua+u4Ho4Xw8U= +github.com/tikv/client-go/v2 v2.0.8-0.20241220061251-c5d92baf4928 h1:+5tDGj7EJX1go76thvv39fHIM37hWiWyImYaziDUmaw= +github.com/tikv/client-go/v2 v2.0.8-0.20241220061251-c5d92baf4928/go.mod h1:n9NoEZrKGy/KzWPqRRRoRNcg3SZev0B7rZzZgH4rEaY= +github.com/tikv/pd/client v0.0.0-20241220053006-461b86adc78d h1:V/LfOrlfS4Ct+AHvjbxVXAZZgqjcpuc53m0wxk0xYF4= +github.com/tikv/pd/client v0.0.0-20241220053006-461b86adc78d/go.mod h1:Dsiy6gxwRqsqgJPfDSxDbdG+qFvk0cNzvHax0TOPj2Q= github.com/timakin/bodyclose v0.0.0-20240125160201-f835fa56326a h1:A6uKudFIfAEpoPdaal3aSqGxBzLyU8TqyXImLwo6dIo= github.com/timakin/bodyclose v0.0.0-20240125160201-f835fa56326a/go.mod h1:mkjARE7Yr8qU23YcGMSALbIxTQ9r9QBVahQOBRfU460= github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs= diff --git a/lightning/pkg/importer/import.go b/lightning/pkg/importer/import.go index 498ee32cca634..2dfb86afadcaf 100644 --- a/lightning/pkg/importer/import.go +++ b/lightning/pkg/importer/import.go @@ -70,7 +70,8 @@ import ( kvutil "github.com/tikv/client-go/v2/util" pd "github.com/tikv/pd/client" pdhttp "github.com/tikv/pd/client/http" - "github.com/tikv/pd/client/retry" + "github.com/tikv/pd/client/pkg/caller" + "github.com/tikv/pd/client/pkg/retry" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/atomic" "go.uber.org/multierr" @@ -286,6 +287,8 @@ type ControllerParam struct { TaskType string } +var componentName = caller.Component("lightning-importer") + // NewImportController creates a new Controller instance. func NewImportController( ctx context.Context, @@ -363,7 +366,7 @@ func NewImportControllerWithPauser( } addrs := strings.Split(cfg.TiDB.PdAddr, ",") - pdCli, err = pd.NewClientWithContext(ctx, addrs, tls.ToPDSecurityOption()) + pdCli, err = pd.NewClientWithContext(ctx, componentName, addrs, tls.ToPDSecurityOption()) if err != nil { return nil, errors.Trace(err) } @@ -1178,7 +1181,7 @@ const ( func (rc *Controller) keepPauseGCForDupeRes(ctx context.Context) (<-chan struct{}, error) { tlsOpt := rc.tls.ToPDSecurityOption() addrs := strings.Split(rc.cfg.TiDB.PdAddr, ",") - pdCli, err := pd.NewClientWithContext(ctx, addrs, tlsOpt) + pdCli, err := pd.NewClientWithContext(ctx, componentName, addrs, tlsOpt) if err != nil { return nil, errors.Trace(err) } diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go index 9c0e7296879cc..875e1a459722e 100644 --- a/pkg/ddl/index.go +++ b/pkg/ddl/index.go @@ -77,8 +77,8 @@ import ( "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" kvutil "github.com/tikv/client-go/v2/util" - pd "github.com/tikv/pd/client" pdHttp "github.com/tikv/pd/client/http" + sd "github.com/tikv/pd/client/servicediscovery" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) @@ -2444,7 +2444,7 @@ func (w *worker) addTableIndex( return errors.Trace(err) } -func checkDuplicateForUniqueIndex(ctx context.Context, t table.Table, reorgInfo *reorgInfo, discovery pd.ServiceDiscovery) error { +func checkDuplicateForUniqueIndex(ctx context.Context, t table.Table, reorgInfo *reorgInfo, discovery sd.ServiceDiscovery) error { var bc ingest.BackendCtx var err error defer func() { diff --git a/pkg/ddl/ingest/backend_mgr.go b/pkg/ddl/ingest/backend_mgr.go index 048f883e4ceef..609e0a1e1a22e 100644 --- a/pkg/ddl/ingest/backend_mgr.go +++ b/pkg/ddl/ingest/backend_mgr.go @@ -30,7 +30,7 @@ import ( "github.com/pingcap/tidb/pkg/lightning/backend/local" "github.com/pingcap/tidb/pkg/lightning/common" "github.com/pingcap/tidb/pkg/util/logutil" - pd "github.com/tikv/pd/client" + sd "github.com/tikv/pd/client/servicediscovery" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/atomic" "go.uber.org/zap" @@ -48,7 +48,7 @@ type BackendCtxMgr interface { jobID int64, hasUnique bool, etcdClient *clientv3.Client, - pdSvcDiscovery pd.ServiceDiscovery, + pdSvcDiscovery sd.ServiceDiscovery, resourceGroupName string, importConc int, maxWriteSpeed int, @@ -116,7 +116,7 @@ func (m *litBackendCtxMgr) Register( jobID int64, hasUnique bool, etcdClient *clientv3.Client, - pdSvcDiscovery pd.ServiceDiscovery, + pdSvcDiscovery sd.ServiceDiscovery, resourceGroupName string, concurrency int, maxWriteSpeed int, @@ -177,7 +177,7 @@ func (m *litBackendCtxMgr) EncodeJobSortPath(jobID int64) string { func createLocalBackend( ctx context.Context, cfg *local.BackendConfig, - pdSvcDiscovery pd.ServiceDiscovery, + pdSvcDiscovery sd.ServiceDiscovery, ) (*local.Backend, error) { tidbCfg := config.GetGlobalConfig() tls, err := common.NewTLS( diff --git a/pkg/ddl/ingest/mock.go b/pkg/ddl/ingest/mock.go index 88b50ad099255..b00547812064a 100644 --- a/pkg/ddl/ingest/mock.go +++ b/pkg/ddl/ingest/mock.go @@ -29,7 +29,7 @@ import ( "github.com/pingcap/tidb/pkg/lightning/backend/local" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/table" - pd "github.com/tikv/pd/client" + sd "github.com/tikv/pd/client/servicediscovery" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" ) @@ -57,7 +57,7 @@ func (m *MockBackendCtxMgr) CheckMoreTasksAvailable() (bool, error) { // Register implements BackendCtxMgr.Register interface. func (m *MockBackendCtxMgr) Register(ctx context.Context, jobID int64, unique bool, etcdClient *clientv3.Client, - pdSvcDiscovery pd.ServiceDiscovery, resourceGroupName string, importConc int, maxWriteSpeed int, initTS uint64) (BackendCtx, error) { + pdSvcDiscovery sd.ServiceDiscovery, resourceGroupName string, importConc int, maxWriteSpeed int, initTS uint64) (BackendCtx, error) { logutil.DDLIngestLogger().Info("mock backend mgr register", zap.Int64("jobID", jobID)) if mockCtx, ok := m.runningJobs[jobID]; ok { return mockCtx, nil diff --git a/pkg/ddl/partition.go b/pkg/ddl/partition.go index 580acf932b7a4..459c021d9991c 100644 --- a/pkg/ddl/partition.go +++ b/pkg/ddl/partition.go @@ -65,7 +65,7 @@ import ( "github.com/pingcap/tidb/pkg/util/stringutil" "github.com/tikv/client-go/v2/tikv" kvutil "github.com/tikv/client-go/v2/util" - pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/clients/router" "go.uber.org/zap" ) @@ -438,7 +438,7 @@ func checkPartitionReplica(replicaCount uint64, addingDefinitions []model.Partit } for _, pDef := range addingDefinitions { startKey, endKey := tablecodec.GetTableHandleKeyRange(pDef.ID) - regions, err := pdCli.BatchScanRegions(ctx, []pd.KeyRange{{StartKey: startKey, EndKey: endKey}}, -1) + regions, err := pdCli.BatchScanRegions(ctx, []router.KeyRange{{StartKey: startKey, EndKey: endKey}}, -1) if err != nil { return needWait, errors.Trace(err) } diff --git a/pkg/ddl/tests/tiflash/ddl_tiflash_test.go b/pkg/ddl/tests/tiflash/ddl_tiflash_test.go index 2884f4efb3eab..52cd7e523c530 100644 --- a/pkg/ddl/tests/tiflash/ddl_tiflash_test.go +++ b/pkg/ddl/tests/tiflash/ddl_tiflash_test.go @@ -54,7 +54,7 @@ import ( "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/testutils" "github.com/tikv/client-go/v2/tikv" - pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/clients/router" "go.uber.org/zap" ) @@ -1437,7 +1437,7 @@ func TestTiFlashReorgPartition(t *testing.T) { stores, _ := pdCli.GetAllStores(ctx) for _, pDef := range args.PartInfo.Definitions { startKey, endKey := tablecodec.GetTableHandleKeyRange(pDef.ID) - regions, _ := pdCli.BatchScanRegions(ctx, []pd.KeyRange{{StartKey: startKey, EndKey: endKey}}, -1) + regions, _ := pdCli.BatchScanRegions(ctx, []router.KeyRange{{StartKey: startKey, EndKey: endKey}}, -1) for i := range regions { // similar as storeHasEngineTiFlashLabel for _, store := range stores { diff --git a/pkg/disttask/importinto/planner_test.go b/pkg/disttask/importinto/planner_test.go index fe33ab8e6d2bf..ac09f98530e54 100644 --- a/pkg/disttask/importinto/planner_test.go +++ b/pkg/disttask/importinto/planner_test.go @@ -34,6 +34,8 @@ import ( pmodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/stretchr/testify/require" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/opt" + "github.com/tikv/pd/client/pkg/caller" ) func TestLogicalPlan(t *testing.T) { @@ -327,7 +329,7 @@ func TestSplitForOneSubtask(t *testing.T) { t.Cleanup(func() { importer.NewClientWithContext = bak }) - importer.NewClientWithContext = func(_ context.Context, _ []string, _ pd.SecurityOption, _ ...pd.ClientOption) (pd.Client, error) { + importer.NewClientWithContext = func(_ context.Context, _ caller.Component, _ []string, _ pd.SecurityOption, _ ...opt.ClientOption) (pd.Client, error) { return nil, errors.New("mock error") } diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go index 2ac4ebda71bcc..a4283cf20c59e 100644 --- a/pkg/domain/domain.go +++ b/pkg/domain/domain.go @@ -103,6 +103,7 @@ import ( "github.com/tikv/client-go/v2/txnkv/transaction" pd "github.com/tikv/pd/client" pdhttp "github.com/tikv/pd/client/http" + "github.com/tikv/pd/client/opt" rmclient "github.com/tikv/pd/client/resource_group/controller" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/concurrency" @@ -1662,7 +1663,7 @@ func (do *Domain) checkReplicaRead(ctx context.Context, pdClient pd.Client) erro return nil } - stores, err := pdClient.GetAllStores(ctx, pd.WithExcludeTombstone()) + stores, err := pdClient.GetAllStores(ctx, opt.WithExcludeTombstone()) if err != nil { return err } diff --git a/pkg/domain/domain_sysvars.go b/pkg/domain/domain_sysvars.go index 0c540c323ca44..866e79e080ae3 100644 --- a/pkg/domain/domain_sysvars.go +++ b/pkg/domain/domain_sysvars.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb/pkg/meta" "github.com/pingcap/tidb/pkg/sessionctx/variable" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/opt" ) // initDomainSysVars() is called when a domain is initialized. @@ -62,14 +63,14 @@ func (do *Domain) setPDClientDynamicOption(name, sVal string) error { if err != nil { return err } - err = do.updatePDClient(pd.MaxTSOBatchWaitInterval, time.Duration(float64(time.Millisecond)*val)) + err = do.updatePDClient(opt.MaxTSOBatchWaitInterval, time.Duration(float64(time.Millisecond)*val)) if err != nil { return err } variable.MaxTSOBatchWaitInterval.Store(val) case variable.TiDBEnableTSOFollowerProxy: val := variable.TiDBOptOn(sVal) - err := do.updatePDClient(pd.EnableTSOFollowerProxy, val) + err := do.updatePDClient(opt.EnableTSOFollowerProxy, val) if err != nil { return err } @@ -78,7 +79,7 @@ func (do *Domain) setPDClientDynamicOption(name, sVal string) error { val := variable.TiDBOptOn(sVal) // Note: EnableFollowerHandle is only used for region API now. // If pd support more APIs in follower, the pd option may be changed. - err := do.updatePDClient(pd.EnableFollowerHandle, val) + err := do.updatePDClient(opt.EnableFollowerHandle, val) if err != nil { return err } @@ -97,7 +98,7 @@ func (do *Domain) setPDClientDynamicOption(name, sVal string) error { return variable.ErrWrongValueForVar.GenWithStackByArgs(name, sVal) } - err := do.updatePDClient(pd.TSOClientRPCConcurrency, concurrency) + err := do.updatePDClient(opt.TSOClientRPCConcurrency, concurrency) if err != nil { return err } @@ -118,7 +119,7 @@ func (do *Domain) setLowResolutionTSOUpdateInterval(interval time.Duration) erro } // updatePDClient is used to set the dynamic option into the PD client. -func (do *Domain) updatePDClient(option pd.DynamicOption, val any) error { +func (do *Domain) updatePDClient(option opt.DynamicOption, val any) error { store, ok := do.store.(interface{ GetPDClient() pd.Client }) if !ok { return nil diff --git a/pkg/domain/domain_test.go b/pkg/domain/domain_test.go index 92eff2e83dd98..e4a25577c20d4 100644 --- a/pkg/domain/domain_test.go +++ b/pkg/domain/domain_test.go @@ -42,6 +42,7 @@ import ( dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/require" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/opt" "go.etcd.io/etcd/tests/v3/integration" ) @@ -423,7 +424,7 @@ type mockInfoPdClient struct { err error } -func (c *mockInfoPdClient) GetAllStores(context.Context, ...pd.GetStoreOption) ([]*metapb.Store, error) { +func (c *mockInfoPdClient) GetAllStores(context.Context, ...opt.GetStoreOption) ([]*metapb.Store, error) { return c.stores, c.err } diff --git a/pkg/domain/infosync/resource_manager_client.go b/pkg/domain/infosync/resource_manager_client.go index 4af1f78e61299..c41651abc4a22 100644 --- a/pkg/domain/infosync/resource_manager_client.go +++ b/pkg/domain/infosync/resource_manager_client.go @@ -26,6 +26,7 @@ import ( rmpb "github.com/pingcap/kvproto/pkg/resource_manager" "github.com/pingcap/tidb/pkg/resourcegroup" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/opt" ) type mockResourceManagerClient struct { @@ -144,7 +145,7 @@ func (*mockResourceManagerClient) LoadResourceGroups(context.Context) ([]*rmpb.R return nil, 0, nil } -func (m *mockResourceManagerClient) Watch(_ context.Context, key []byte, _ ...pd.OpOption) (chan []*meta_storagepb.Event, error) { +func (m *mockResourceManagerClient) Watch(_ context.Context, key []byte, _ ...opt.MetaStorageOption) (chan []*meta_storagepb.Event, error) { if bytes.Equal(pd.GroupSettingsPathPrefixBytes, key) { return m.eventCh, nil } diff --git a/pkg/domain/infosync/tiflash_manager.go b/pkg/domain/infosync/tiflash_manager.go index cb0278a8f1d91..f108c048e7042 100644 --- a/pkg/domain/infosync/tiflash_manager.go +++ b/pkg/domain/infosync/tiflash_manager.go @@ -37,6 +37,7 @@ import ( "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/syncutil" "github.com/tikv/client-go/v2/tikv" + "github.com/tikv/pd/client/clients/router" pd "github.com/tikv/pd/client/http" "go.uber.org/zap" ) @@ -294,7 +295,7 @@ func (m *TiFlashReplicaManagerCtx) PostAccelerateScheduleBatch(ctx context.Conte if len(tableIDs) == 0 { return nil } - input := make([]*pd.KeyRange, 0, len(tableIDs)) + input := make([]*router.KeyRange, 0, len(tableIDs)) for _, tableID := range tableIDs { startKey := tablecodec.GenTableRecordPrefix(tableID) endKey := tablecodec.EncodeTablePrefix(tableID + 1) diff --git a/pkg/executor/importer/table_import.go b/pkg/executor/importer/table_import.go index eecdee38d96db..db4dd815f03c0 100644 --- a/pkg/executor/importer/table_import.go +++ b/pkg/executor/importer/table_import.go @@ -61,6 +61,7 @@ import ( "github.com/pingcap/tidb/pkg/util/syncutil" "github.com/prometheus/client_golang/prometheus" "github.com/tikv/client-go/v2/util" + "github.com/tikv/pd/client/pkg/caller" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/multierr" "go.uber.org/zap" @@ -139,7 +140,7 @@ func GetRegionSplitSizeKeys(ctx context.Context) (regionSplitSize int64, regionS } tlsOpt := tls.ToPDSecurityOption() addrs := strings.Split(tidbCfg.Path, ",") - pdCli, err := NewClientWithContext(ctx, addrs, tlsOpt) + pdCli, err := NewClientWithContext(ctx, caller.Component("tidb-table-importer"), addrs, tlsOpt) if err != nil { return 0, 0, errors.Trace(err) } diff --git a/pkg/executor/importer/table_import_test.go b/pkg/executor/importer/table_import_test.go index 9e49d3574fbb6..fa4aa24af7587 100644 --- a/pkg/executor/importer/table_import_test.go +++ b/pkg/executor/importer/table_import_test.go @@ -27,6 +27,8 @@ import ( "github.com/pingcap/tidb/pkg/lightning/config" "github.com/stretchr/testify/require" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/opt" + "github.com/tikv/pd/client/pkg/caller" "go.uber.org/zap" ) @@ -183,7 +185,7 @@ type mockPDClient struct { } // GetAllStores return fake stores. -func (c *mockPDClient) GetAllStores(context.Context, ...pd.GetStoreOption) ([]*metapb.Store, error) { +func (c *mockPDClient) GetAllStores(context.Context, ...opt.GetStoreOption) ([]*metapb.Store, error) { return nil, nil } @@ -195,13 +197,13 @@ func TestGetRegionSplitSizeKeys(t *testing.T) { t.Cleanup(func() { NewClientWithContext = bak }) - NewClientWithContext = func(_ context.Context, _ []string, _ pd.SecurityOption, _ ...pd.ClientOption) (pd.Client, error) { + NewClientWithContext = func(_ context.Context, _ caller.Component, _ []string, _ pd.SecurityOption, _ ...opt.ClientOption) (pd.Client, error) { return nil, errors.New("mock error") } _, _, err := GetRegionSplitSizeKeys(context.Background()) require.ErrorContains(t, err, "mock error") - NewClientWithContext = func(_ context.Context, _ []string, _ pd.SecurityOption, _ ...pd.ClientOption) (pd.Client, error) { + NewClientWithContext = func(_ context.Context, _ caller.Component, _ []string, _ pd.SecurityOption, _ ...opt.ClientOption) (pd.Client, error) { return &mockPDClient{}, nil } _, _, err = GetRegionSplitSizeKeys(context.Background()) diff --git a/pkg/executor/internal/calibrateresource/calibrate_resource_test.go b/pkg/executor/internal/calibrateresource/calibrate_resource_test.go index bb5d552f1e86f..9c49ddd9b746d 100644 --- a/pkg/executor/internal/calibrateresource/calibrate_resource_test.go +++ b/pkg/executor/internal/calibrateresource/calibrate_resource_test.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/tidb/pkg/types" "github.com/stretchr/testify/require" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/opt" rmclient "github.com/tikv/pd/client/resource_group/controller" ) @@ -781,7 +782,7 @@ type mockResourceGroupProvider struct { cfg rmclient.Config } -func (p *mockResourceGroupProvider) Get(ctx context.Context, key []byte, opts ...pd.OpOption) (*meta_storagepb.GetResponse, error) { +func (p *mockResourceGroupProvider) Get(ctx context.Context, key []byte, opts ...opt.MetaStorageOption) (*meta_storagepb.GetResponse, error) { if !bytes.Equal(pd.ControllerConfigPathPrefixBytes, key) { return nil, errors.New("unsupported configPath") } diff --git a/pkg/lightning/backend/local/local.go b/pkg/lightning/backend/local/local.go index f378327061be7..28b824c39bd3d 100644 --- a/pkg/lightning/backend/local/local.go +++ b/pkg/lightning/backend/local/local.go @@ -60,7 +60,10 @@ import ( tikvclient "github.com/tikv/client-go/v2/tikv" pd "github.com/tikv/pd/client" pdhttp "github.com/tikv/pd/client/http" - "github.com/tikv/pd/client/retry" + "github.com/tikv/pd/client/opt" + "github.com/tikv/pd/client/pkg/caller" + "github.com/tikv/pd/client/pkg/retry" + sd "github.com/tikv/pd/client/servicediscovery" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/backoff" @@ -544,7 +547,7 @@ func NewBackend( ctx context.Context, tls *common.TLS, config BackendConfig, - pdSvcDiscovery pd.ServiceDiscovery, + pdSvcDiscovery sd.ServiceDiscovery, ) (b *Backend, err error) { var ( pdCli pd.Client @@ -593,11 +596,11 @@ func NewBackend( pdAddrs = strings.Split(config.PDAddr, ",") } pdCli, err = pd.NewClientWithContext( - ctx, pdAddrs, tls.ToPDSecurityOption(), - pd.WithGRPCDialOptions(maxCallMsgSize...), + ctx, caller.Component("lightning-local-backend"), pdAddrs, tls.ToPDSecurityOption(), + opt.WithGRPCDialOptions(maxCallMsgSize...), // If the time too short, we may scatter a region many times, because // the interface `ScatterRegions` may time out. - pd.WithCustomTimeoutOption(60*time.Second), + opt.WithCustomTimeoutOption(60*time.Second), ) if err != nil { return nil, common.NormalizeOrWrapErr(common.ErrCreatePDClient, err) @@ -692,7 +695,7 @@ func (local *Backend) TotalMemoryConsume() int64 { } func checkMultiIngestSupport(ctx context.Context, pdCli pd.Client, factory importClientFactory) (bool, error) { - stores, err := pdCli.GetAllStores(ctx, pd.WithExcludeTombstone()) + stores, err := pdCli.GetAllStores(ctx, opt.WithExcludeTombstone()) if err != nil { return false, errors.Trace(err) } @@ -1810,7 +1813,7 @@ func getSplitConfFromStore(ctx context.Context, host string, tls *common.TLS) ( // GetRegionSplitSizeKeys return region split size, region split keys, error func GetRegionSplitSizeKeys(ctx context.Context, cli pd.Client, tls *common.TLS) ( regionSplitSize int64, regionSplitKeys int64, err error) { - stores, err := cli.GetAllStores(ctx, pd.WithExcludeTombstone()) + stores, err := cli.GetAllStores(ctx, opt.WithExcludeTombstone()) if err != nil { return 0, 0, err } diff --git a/pkg/lightning/backend/local/local_test.go b/pkg/lightning/backend/local/local_test.go index 138d71b86bbf8..7fa78cae333a8 100644 --- a/pkg/lightning/backend/local/local_test.go +++ b/pkg/lightning/backend/local/local_test.go @@ -63,7 +63,9 @@ import ( "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/clients/router" "github.com/tikv/pd/client/http" + "github.com/tikv/pd/client/opt" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/encoding" @@ -665,14 +667,14 @@ func TestMergeSSTsDuplicated(t *testing.T) { type mockPdClient struct { pd.Client stores []*metapb.Store - regions []*pd.Region + regions []*router.Region } -func (c *mockPdClient) GetAllStores(ctx context.Context, opts ...pd.GetStoreOption) ([]*metapb.Store, error) { +func (c *mockPdClient) GetAllStores(ctx context.Context, opts ...opt.GetStoreOption) ([]*metapb.Store, error) { return c.stores, nil } -func (c *mockPdClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...pd.GetRegionOption) ([]*pd.Region, error) { +func (c *mockPdClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...opt.GetRegionOption) ([]*router.Region, error) { return c.regions, nil } diff --git a/pkg/lightning/tikv/local_sst_writer_test.go b/pkg/lightning/tikv/local_sst_writer_test.go index 998d57a39fa38..8d50ac77ad988 100644 --- a/pkg/lightning/tikv/local_sst_writer_test.go +++ b/pkg/lightning/tikv/local_sst_writer_test.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/stretchr/testify/require" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/pkg/caller" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) @@ -78,7 +79,7 @@ func write2ImportService4Test( sortedKVs [][2][]byte, ts uint64, ) ([]*import_sstpb.SSTMeta, error) { - pdClient, err := pd.NewClient(pdAddrs, pd.SecurityOption{}) + pdClient, err := pd.NewClient(caller.TestComponent, pdAddrs, pd.SecurityOption{}) if err != nil { return nil, errors.Trace(err) } diff --git a/pkg/server/handler/tikvhandler/tikv_handler.go b/pkg/server/handler/tikvhandler/tikv_handler.go index da6888e660a83..e4cf046948b32 100644 --- a/pkg/server/handler/tikvhandler/tikv_handler.go +++ b/pkg/server/handler/tikvhandler/tikv_handler.go @@ -65,6 +65,7 @@ import ( "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/sqlexec" "github.com/tikv/client-go/v2/tikv" + "github.com/tikv/pd/client/clients/router" pd "github.com/tikv/pd/client/http" "go.uber.org/zap" ) @@ -1348,7 +1349,7 @@ func (h *TableHandler) getRegionsByID(tbl table.Table, id int64, name string) (* startKey, endKey := tablecodec.GetTableHandleKeyRange(id) ctx := context.Background() pdCli := h.RegionCache.PDClient() - regions, err := pdCli.BatchScanRegions(ctx, []pd.KeyRange{{StartKey: startKey, EndKey: endKey}}, -1) + regions, err := pdCli.BatchScanRegions(ctx, []router.KeyRange{{StartKey: startKey, EndKey: endKey}}, -1) if err != nil { return nil, err } @@ -1371,7 +1372,7 @@ func (h *TableHandler) getRegionsByID(tbl table.Table, id int64, name string) (* indices[i].Name = index.Meta().Name.String() indices[i].ID = indexID startKey, endKey := tablecodec.GetTableIndexKeyRange(id, indexID) - regions, err := pdCli.BatchScanRegions(ctx, []pd.KeyRange{{StartKey: startKey, EndKey: endKey}}, -1) + regions, err := pdCli.BatchScanRegions(ctx, []router.KeyRange{{StartKey: startKey, EndKey: endKey}}, -1) if err != nil { return nil, err } diff --git a/pkg/store/copr/copr_test/coprocessor_test.go b/pkg/store/copr/copr_test/coprocessor_test.go index e39648ef627d2..9b40a77d9ceef 100644 --- a/pkg/store/copr/copr_test/coprocessor_test.go +++ b/pkg/store/copr/copr_test/coprocessor_test.go @@ -34,6 +34,7 @@ import ( "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/testutils" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/opt" rmclient "github.com/tikv/pd/client/resource_group/controller" ) @@ -200,7 +201,7 @@ type mockResourceGroupProvider struct { cfg rmclient.Config } -func (p *mockResourceGroupProvider) Get(ctx context.Context, key []byte, opts ...pd.OpOption) (*meta_storagepb.GetResponse, error) { +func (p *mockResourceGroupProvider) Get(ctx context.Context, key []byte, opts ...opt.MetaStorageOption) (*meta_storagepb.GetResponse, error) { if !bytes.Equal(pd.ControllerConfigPathPrefixBytes, key) { return nil, errors.New("unsupported configPath") } diff --git a/pkg/store/copr/mpp.go b/pkg/store/copr/mpp.go index d0fdcaa0bd255..31be876c6f0af 100644 --- a/pkg/store/copr/mpp.go +++ b/pkg/store/copr/mpp.go @@ -36,6 +36,7 @@ import ( "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/tikvrpc" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/opt" "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -309,7 +310,7 @@ func (c *mppStoreCnt) getMPPStoreCount(ctx context.Context, pdClient pd.Client, // update mpp store cache cnt := 0 - stores, err := pdClient.GetAllStores(ctx, pd.WithExcludeTombstone()) + stores, err := pdClient.GetAllStores(ctx, opt.WithExcludeTombstone()) failpoint.Inject("mppStoreCountPDError", func(value failpoint.Value) { if value.(bool) { diff --git a/pkg/store/driver/tikv_driver.go b/pkg/store/driver/tikv_driver.go index da1318e06a6ed..e58c781d7ad89 100644 --- a/pkg/store/driver/tikv_driver.go +++ b/pkg/store/driver/tikv_driver.go @@ -42,6 +42,8 @@ import ( "github.com/tikv/client-go/v2/util" pd "github.com/tikv/pd/client" pdhttp "github.com/tikv/pd/client/http" + "github.com/tikv/pd/client/opt" + "github.com/tikv/pd/client/pkg/caller" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" @@ -154,12 +156,12 @@ func (d TiKVDriver) OpenWithOptions(path string, options ...Option) (resStore kv } }() - pdCli, err = pd.NewClient(etcdAddrs, pd.SecurityOption{ + pdCli, err = pd.NewClient(caller.Component("tidb-tikv-driver"), etcdAddrs, pd.SecurityOption{ CAPath: d.security.ClusterSSLCA, CertPath: d.security.ClusterSSLCert, KeyPath: d.security.ClusterSSLKey, }, - pd.WithGRPCDialOptions( + opt.WithGRPCDialOptions( // keep the same with etcd, see // https://github.com/etcd-io/etcd/blob/5704c6148d798ea444db26a966394406d8c10526/server/etcdserver/api/v3rpc/grpc.go#L34 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)), @@ -168,8 +170,8 @@ func (d TiKVDriver) OpenWithOptions(path string, options ...Option) (resStore kv Timeout: time.Duration(d.tikvConfig.GrpcKeepAliveTimeout) * time.Second, }), ), - pd.WithCustomTimeoutOption(time.Duration(d.pdConfig.PDServerTimeout)*time.Second), - pd.WithForwardingOption(config.GetGlobalConfig().EnableForwarding)) + opt.WithCustomTimeoutOption(time.Duration(d.pdConfig.PDServerTimeout)*time.Second), + opt.WithForwardingOption(config.GetGlobalConfig().EnableForwarding)) if err != nil { return nil, errors.Trace(err) } diff --git a/pkg/store/mockstore/unistore/pd.go b/pkg/store/mockstore/unistore/pd.go index d6faa9c4f326c..0e6ab78832861 100644 --- a/pkg/store/mockstore/unistore/pd.go +++ b/pkg/store/mockstore/unistore/pd.go @@ -32,6 +32,11 @@ import ( us "github.com/pingcap/tidb/pkg/store/mockstore/unistore/tikv" "github.com/tikv/client-go/v2/oracle" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/clients/router" + "github.com/tikv/pd/client/clients/tso" + "github.com/tikv/pd/client/opt" + "github.com/tikv/pd/client/pkg/caller" + sd "github.com/tikv/pd/client/servicediscovery" "google.golang.org/grpc" ) @@ -124,28 +129,28 @@ func (c *pdClient) GetLocalTS(ctx context.Context, dcLocation string) (int64, in return c.GetTS(ctx) } -func (c *pdClient) GetTSAsync(ctx context.Context) pd.TSFuture { +func (c *pdClient) GetTSAsync(ctx context.Context) tso.TSFuture { return &mockTSFuture{c, ctx, false} } -func (c *pdClient) GetLocalTSAsync(ctx context.Context, dcLocation string) pd.TSFuture { +func (c *pdClient) GetLocalTSAsync(ctx context.Context, dcLocation string) tso.TSFuture { return &mockTSFuture{c, ctx, false} } -func (c *pdClient) GetServiceDiscovery() pd.ServiceDiscovery { +func (c *pdClient) GetServiceDiscovery() sd.ServiceDiscovery { return NewMockPDServiceDiscovery(c.addrs) } var ( - _ pd.ServiceDiscovery = (*mockPDServiceDiscovery)(nil) - _ pd.ServiceClient = (*mockPDServiceClient)(nil) + _ sd.ServiceDiscovery = (*mockPDServiceDiscovery)(nil) + _ sd.ServiceClient = (*mockPDServiceClient)(nil) ) type mockPDServiceClient struct { addr string } -func newMockPDServiceClient(addr string) pd.ServiceClient { +func newMockPDServiceClient(addr string) sd.ServiceClient { if !strings.HasPrefix(addr, "http") { addr = fmt.Sprintf("%s://%s", "http", addr) } @@ -182,13 +187,13 @@ func (c *mockPDServiceClient) IsConnectedToLeader() bool { type mockPDServiceDiscovery struct { addrs []string - clis []pd.ServiceClient + clis []sd.ServiceClient } // NewMockPDServiceDiscovery returns a mock PD ServiceDiscovery -func NewMockPDServiceDiscovery(addrs []string) pd.ServiceDiscovery { +func NewMockPDServiceDiscovery(addrs []string) sd.ServiceDiscovery { addresses := make([]string, 0) - clis := make([]pd.ServiceClient, 0) + clis := make([]sd.ServiceClient, 0) for _, addr := range addrs { if check := govalidator.IsURL(addr); !check { continue @@ -209,6 +214,8 @@ func (c *mockPDServiceDiscovery) GetClusterID() uint64 { return 0 } func (c *mockPDServiceDiscovery) GetKeyspaceID() uint32 { return 0 } +func (c *mockPDServiceDiscovery) SetKeyspaceID(uint32) {} + func (c *mockPDServiceDiscovery) GetKeyspaceGroupID() uint32 { return 0 } func (c *mockPDServiceDiscovery) GetServiceURLs() []string { @@ -223,14 +230,18 @@ func (c *mockPDServiceDiscovery) GetServingURL() string { return "" } func (c *mockPDServiceDiscovery) GetBackupURLs() []string { return nil } -func (c *mockPDServiceDiscovery) GetServiceClient() pd.ServiceClient { +func (c *mockPDServiceDiscovery) GetServiceClient() sd.ServiceClient { if len(c.clis) > 0 { return c.clis[0] } return nil } -func (c *mockPDServiceDiscovery) GetAllServiceClients() []pd.ServiceClient { +func (c *mockPDServiceDiscovery) GetServiceClientByKind(sd.APIKind) sd.ServiceClient { + return c.GetServiceClient() +} + +func (c *mockPDServiceDiscovery) GetAllServiceClients() []sd.ServiceClient { return c.clis } @@ -303,23 +314,23 @@ func (c *pdClient) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) { return nil, nil } -func (c *pdClient) ScatterRegions(ctx context.Context, regionsID []uint64, opts ...pd.RegionsOption) (*pdpb.ScatterRegionResponse, error) { +func (c *pdClient) ScatterRegions(ctx context.Context, regionsID []uint64, opts ...opt.RegionsOption) (*pdpb.ScatterRegionResponse, error) { return nil, nil } -func (c *pdClient) SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...pd.RegionsOption) (*pdpb.SplitRegionsResponse, error) { +func (c *pdClient) SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...opt.RegionsOption) (*pdpb.SplitRegionsResponse, error) { return nil, nil } -func (c *pdClient) SplitAndScatterRegions(ctx context.Context, splitKeys [][]byte, opts ...pd.RegionsOption) (*pdpb.SplitAndScatterRegionsResponse, error) { +func (c *pdClient) SplitAndScatterRegions(ctx context.Context, splitKeys [][]byte, opts ...opt.RegionsOption) (*pdpb.SplitAndScatterRegionsResponse, error) { return nil, nil } -func (c *pdClient) GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, opts ...pd.GetRegionOption) (*pd.Region, error) { +func (c *pdClient) GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, opts ...opt.GetRegionOption) (*router.Region, error) { return nil, nil } -func (c *pdClient) UpdateOption(option pd.DynamicOption, value any) error { +func (c *pdClient) UpdateOption(option opt.DynamicOption, value any) error { return nil } @@ -383,7 +394,7 @@ func (c *pdClient) GetTSWithinKeyspace(ctx context.Context, keyspaceID uint32) ( return 0, 0, nil } -func (c *pdClient) GetTSWithinKeyspaceAsync(ctx context.Context, keyspaceID uint32) pd.TSFuture { +func (c *pdClient) GetTSWithinKeyspaceAsync(ctx context.Context, keyspaceID uint32) tso.TSFuture { return nil } @@ -391,15 +402,15 @@ func (c *pdClient) GetLocalTSWithinKeyspace(ctx context.Context, dcLocation stri return 0, 0, nil } -func (c *pdClient) GetLocalTSWithinKeyspaceAsync(ctx context.Context, dcLocation string, keyspaceID uint32) pd.TSFuture { +func (c *pdClient) GetLocalTSWithinKeyspaceAsync(ctx context.Context, dcLocation string, keyspaceID uint32) tso.TSFuture { return nil } -func (c *pdClient) Get(ctx context.Context, key []byte, opts ...pd.OpOption) (*meta_storagepb.GetResponse, error) { +func (c *pdClient) Get(ctx context.Context, key []byte, opts ...opt.MetaStorageOption) (*meta_storagepb.GetResponse, error) { return nil, nil } -func (c *pdClient) Put(ctx context.Context, key []byte, value []byte, opts ...pd.OpOption) (*meta_storagepb.PutResponse, error) { +func (c *pdClient) Put(ctx context.Context, key []byte, value []byte, opts ...opt.MetaStorageOption) (*meta_storagepb.PutResponse, error) { return nil, nil } @@ -422,3 +433,7 @@ func (c *pdClient) UpdateServiceSafePointV2(ctx context.Context, keyspaceID uint func (c *pdClient) WatchGCSafePointV2(ctx context.Context, revision int64) (chan []*pdpb.SafePointEvent, error) { panic("unimplemented") } + +func (c *pdClient) WithCallerComponent(component caller.Component) pd.Client { + return c +} diff --git a/pkg/store/mockstore/unistore/pd/client.go b/pkg/store/mockstore/unistore/pd/client.go index fafc434058efb..4b5415e1d431f 100644 --- a/pkg/store/mockstore/unistore/pd/client.go +++ b/pkg/store/mockstore/unistore/pd/client.go @@ -26,7 +26,8 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" - pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/clients/router" + "github.com/tikv/pd/client/opt" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -41,8 +42,8 @@ type Client interface { IsBootstrapped(ctx context.Context) (bool, error) PutStore(ctx context.Context, store *metapb.Store) error GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error) - GetRegion(ctx context.Context, key []byte, opts ...pd.GetRegionOption) (*pd.Region, error) - GetRegionByID(ctx context.Context, regionID uint64, opts ...pd.GetRegionOption) (*pd.Region, error) + GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*router.Region, error) + GetRegionByID(ctx context.Context, regionID uint64, opts ...opt.GetRegionOption) (*router.Region, error) ReportRegion(*pdpb.RegionHeartbeatRequest) AskSplit(ctx context.Context, region *metapb.Region) (*pdpb.AskSplitResponse, error) AskBatchSplit(ctx context.Context, region *metapb.Region, count int) (*pdpb.AskBatchSplitResponse, error) @@ -467,7 +468,7 @@ func (c *client) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, e return resp.Store, nil } -func (c *client) GetAllStores(ctx context.Context, _ ...pd.GetStoreOption) ([]*metapb.Store, error) { +func (c *client) GetAllStores(ctx context.Context, _ ...opt.GetStoreOption) ([]*metapb.Store, error) { var resp *pdpb.GetAllStoresResponse err := c.doRequest(ctx, func(ctx context.Context, client pdpb.PDClient) error { var err1 error @@ -504,7 +505,7 @@ func (c *client) GetClusterConfig(ctx context.Context) (*metapb.Cluster, error) return resp.Cluster, nil } -func (c *client) GetRegion(ctx context.Context, key []byte, _ ...pd.GetRegionOption) (*pd.Region, error) { +func (c *client) GetRegion(ctx context.Context, key []byte, _ ...opt.GetRegionOption) (*router.Region, error) { var resp *pdpb.GetRegionResponse err := c.doRequest(ctx, func(ctx context.Context, client pdpb.PDClient) error { var err1 error @@ -520,7 +521,7 @@ func (c *client) GetRegion(ctx context.Context, key []byte, _ ...pd.GetRegionOpt if herr := resp.Header.GetError(); herr != nil { return nil, errors.New(herr.String()) } - r := &pd.Region{ + r := &router.Region{ Meta: resp.Region, Leader: resp.Leader, PendingPeers: resp.PendingPeers, @@ -531,7 +532,7 @@ func (c *client) GetRegion(ctx context.Context, key []byte, _ ...pd.GetRegionOpt return r, nil } -func (c *client) GetRegionByID(ctx context.Context, regionID uint64, _ ...pd.GetRegionOption) (*pd.Region, error) { +func (c *client) GetRegionByID(ctx context.Context, regionID uint64, _ ...opt.GetRegionOption) (*router.Region, error) { var resp *pdpb.GetRegionResponse err := c.doRequest(ctx, func(ctx context.Context, client pdpb.PDClient) error { var err1 error @@ -547,7 +548,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, _ ...pd.Get if herr := resp.Header.GetError(); herr != nil { return nil, errors.New(herr.String()) } - r := &pd.Region{ + r := &router.Region{ Meta: resp.Region, Leader: resp.Leader, PendingPeers: resp.PendingPeers, diff --git a/pkg/store/mockstore/unistore/tikv/mock_region.go b/pkg/store/mockstore/unistore/tikv/mock_region.go index cdbee6f6550ea..6932fae0a32c1 100644 --- a/pkg/store/mockstore/unistore/tikv/mock_region.go +++ b/pkg/store/mockstore/unistore/tikv/mock_region.go @@ -38,7 +38,8 @@ import ( "github.com/pingcap/tidb/pkg/tablecodec" "github.com/pingcap/tidb/pkg/util/codec" "github.com/tikv/client-go/v2/oracle" - pdclient "github.com/tikv/pd/client" + "github.com/tikv/pd/client/clients/router" + "github.com/tikv/pd/client/opt" ) // MPPTaskHandlerMap is a map of *cophandler.MPPTaskHandler. @@ -642,11 +643,11 @@ func (rm *MockRegionManager) saveRegions(regions []*regionCtx) error { // Limit limits the maximum number of regions returned. // If a region has no leader, corresponding leader will be placed by a peer // with empty value (PeerID is 0). -func (rm *MockRegionManager) ScanRegions(startKey, endKey []byte, limit int, _ ...pdclient.GetRegionOption) []*pdclient.Region { +func (rm *MockRegionManager) ScanRegions(startKey, endKey []byte, limit int, _ ...opt.GetRegionOption) []*router.Region { rm.mu.RLock() defer rm.mu.RUnlock() - regions := make([]*pdclient.Region, 0, len(rm.regions)) + regions := make([]*router.Region, 0, len(rm.regions)) rm.sortedRegions.AscendGreaterOrEqual(newBtreeSearchItem(startKey), func(i btree.Item) bool { r := i.(*btreeItem).region if len(endKey) > 0 && bytes.Compare(r.Meta().StartKey, endKey) >= 0 { @@ -657,7 +658,7 @@ func (rm *MockRegionManager) ScanRegions(startKey, endKey []byte, limit int, _ . return true } - regions = append(regions, &pdclient.Region{ + regions = append(regions, &router.Region{ Meta: proto.Clone(r.Meta()).(*metapb.Region), Leader: proto.Clone(r.Meta().Peers[0]).(*metapb.Peer), }) @@ -773,13 +774,13 @@ func (pd *MockPD) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, } // GetRegion implements gRPC PDServer. -func (pd *MockPD) GetRegion(ctx context.Context, key []byte, opts ...pdclient.GetRegionOption) (*pdclient.Region, error) { +func (pd *MockPD) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*router.Region, error) { r, p, b, d := pd.rm.GetRegionByKey(key) - return &pdclient.Region{Meta: r, Leader: p, Buckets: b, DownPeers: d}, nil + return &router.Region{Meta: r, Leader: p, Buckets: b, DownPeers: d}, nil } // GetRegionByID implements gRPC PDServer. -func (pd *MockPD) GetRegionByID(ctx context.Context, regionID uint64, opts ...pdclient.GetRegionOption) (*pdclient.Region, error) { +func (pd *MockPD) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt.GetRegionOption) (*router.Region, error) { pd.rm.mu.RLock() defer pd.rm.mu.RUnlock() @@ -787,7 +788,7 @@ func (pd *MockPD) GetRegionByID(ctx context.Context, regionID uint64, opts ...pd if r == nil { return nil, nil } - return &pdclient.Region{Meta: proto.Clone(r.meta).(*metapb.Region), Leader: proto.Clone(r.meta.Peers[0]).(*metapb.Peer)}, nil + return &router.Region{Meta: proto.Clone(r.meta).(*metapb.Region), Leader: proto.Clone(r.meta.Peers[0]).(*metapb.Peer)}, nil } // ReportRegion implements gRPC PDServer. @@ -890,15 +891,15 @@ func GetTS() (int64, int64) { } // GetPrevRegion gets the previous region and its leader Peer of the region where the key is located. -func (pd *MockPD) GetPrevRegion(ctx context.Context, key []byte, opts ...pdclient.GetRegionOption) (*pdclient.Region, error) { +func (pd *MockPD) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*router.Region, error) { r, p := pd.rm.GetRegionByEndKey(key) - return &pdclient.Region{Meta: r, Leader: p}, nil + return &router.Region{Meta: r, Leader: p}, nil } // GetAllStores gets all stores from pd. // The store may expire later. Caller is responsible for caching and taking care // of store change. -func (pd *MockPD) GetAllStores(ctx context.Context, opts ...pdclient.GetStoreOption) ([]*metapb.Store, error) { +func (pd *MockPD) GetAllStores(ctx context.Context, opts ...opt.GetStoreOption) ([]*metapb.Store, error) { return pd.rm.GetAllStores(), nil } @@ -906,16 +907,16 @@ func (pd *MockPD) GetAllStores(ctx context.Context, opts ...pdclient.GetStoreOpt // Limit limits the maximum number of regions returned. // If a region has no leader, corresponding leader will be placed by a peer // with empty value (PeerID is 0). -func (pd *MockPD) ScanRegions(ctx context.Context, startKey []byte, endKey []byte, limit int, opts ...pdclient.GetRegionOption) ([]*pdclient.Region, error) { +func (pd *MockPD) ScanRegions(ctx context.Context, startKey []byte, endKey []byte, limit int, opts ...opt.GetRegionOption) ([]*router.Region, error) { regions := pd.rm.ScanRegions(startKey, endKey, limit, opts...) return regions, nil } // BatchScanRegions scans regions in batch, return flattened regions. // limit limits the maximum number of regions returned. -func (pd *MockPD) BatchScanRegions(ctx context.Context, keyRanges []pdclient.KeyRange, limit int, opts ...pdclient.GetRegionOption) ([]*pdclient.Region, error) { - regions := make([]*pdclient.Region, 0, len(keyRanges)) - var lastRegion *pdclient.Region +func (pd *MockPD) BatchScanRegions(ctx context.Context, keyRanges []router.KeyRange, limit int, opts ...opt.GetRegionOption) ([]*router.Region, error) { + regions := make([]*router.Region, 0, len(keyRanges)) + var lastRegion *router.Region for _, keyRange := range keyRanges { if lastRegion != nil && lastRegion.Meta != nil { endKey := lastRegion.Meta.EndKey diff --git a/pkg/ttl/cache/split_test.go b/pkg/ttl/cache/split_test.go index f62a2a19dec27..83bbc55a49594 100644 --- a/pkg/ttl/cache/split_test.go +++ b/pkg/ttl/cache/split_test.go @@ -36,16 +36,18 @@ import ( "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/tikv" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/clients/router" + "github.com/tikv/pd/client/opt" ) -func newMockRegion(regionID uint64, startKey []byte, endKey []byte) *pd.Region { +func newMockRegion(regionID uint64, startKey []byte, endKey []byte) *router.Region { leader := &metapb.Peer{ Id: regionID, StoreId: 1, Role: metapb.PeerRole_Voter, } - return &pd.Region{ + return &router.Region{ Meta: &metapb.Region{ Id: regionID, StartKey: startKey, @@ -59,13 +61,13 @@ func newMockRegion(regionID uint64, startKey []byte, endKey []byte) *pd.Region { type mockPDClient struct { t *testing.T pd.Client - regions []*pd.Region + regions []*router.Region regionsSorted bool } -func (c *mockPDClient) ScanRegions(_ context.Context, key, endKey []byte, limit int, _ ...pd.GetRegionOption) ([]*pd.Region, error) { +func (c *mockPDClient) ScanRegions(_ context.Context, key, endKey []byte, limit int, _ ...opt.GetRegionOption) ([]*router.Region, error) { if len(c.regions) == 0 { - return []*pd.Region{newMockRegion(1, []byte{}, []byte{0xFF, 0xFF})}, nil + return []*router.Region{newMockRegion(1, []byte{}, []byte{0xFF, 0xFF})}, nil } if !c.regionsSorted { @@ -75,11 +77,11 @@ func (c *mockPDClient) ScanRegions(_ context.Context, key, endKey []byte, limit c.regionsSorted = true } - regions := []*pd.Region{newMockRegion(1, []byte{}, c.regions[0].Meta.StartKey)} + regions := []*router.Region{newMockRegion(1, []byte{}, c.regions[0].Meta.StartKey)} regions = append(regions, c.regions...) regions = append(regions, newMockRegion(2, c.regions[len(c.regions)-1].Meta.EndKey, []byte{0xFF, 0xFF, 0xFF})) - result := make([]*pd.Region, 0) + result := make([]*router.Region, 0) for _, r := range regions { if kv.Key(r.Meta.StartKey).Cmp(endKey) >= 0 { continue @@ -165,7 +167,7 @@ func (s *mockTiKVStore) addRegion(key, endKey []byte) *mockTiKVStore { Role: metapb.PeerRole_Voter, } - s.pdClient.regions = append(s.pdClient.regions, &pd.Region{ + s.pdClient.regions = append(s.pdClient.regions, &router.Region{ Meta: &metapb.Region{ Id: regionID, StartKey: key, diff --git a/pkg/util/metricsutil/common.go b/pkg/util/metricsutil/common.go index 0964c7a36ee92..024d5d403372b 100644 --- a/pkg/util/metricsutil/common.go +++ b/pkg/util/metricsutil/common.go @@ -40,8 +40,12 @@ import ( topsqlreporter_metrics "github.com/pingcap/tidb/pkg/util/topsql/reporter/metrics" tikvconfig "github.com/tikv/client-go/v2/config" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/opt" + "github.com/tikv/pd/client/pkg/caller" ) +var componentName = caller.Component("tidb-metrics-util") + // RegisterMetrics register metrics with const label 'keyspace_id' if keyspaceName set. func RegisterMetrics() error { cfg := config.GetGlobalConfig() @@ -55,11 +59,11 @@ func RegisterMetrics() error { } timeoutSec := time.Duration(cfg.PDClient.PDServerTimeout) * time.Second - pdCli, err := pd.NewClient(pdAddrs, pd.SecurityOption{ + pdCli, err := pd.NewClient(componentName, pdAddrs, pd.SecurityOption{ CAPath: cfg.Security.ClusterSSLCA, CertPath: cfg.Security.ClusterSSLCert, KeyPath: cfg.Security.ClusterSSLKey, - }, pd.WithCustomTimeoutOption(timeoutSec)) + }, opt.WithCustomTimeoutOption(timeoutSec)) if err != nil { return err } @@ -80,8 +84,8 @@ func RegisterMetricsForBR(pdAddrs []string, keyspaceName string) error { } timeoutSec := 10 * time.Second - pdCli, err := pd.NewClient(pdAddrs, pd.SecurityOption{}, - pd.WithCustomTimeoutOption(timeoutSec)) + pdCli, err := pd.NewClient(componentName, pdAddrs, pd.SecurityOption{}, + opt.WithCustomTimeoutOption(timeoutSec)) if err != nil { return err } diff --git a/tests/realtikvtest/brietest/operator_test.go b/tests/realtikvtest/brietest/operator_test.go index 37eae760a96d8..d8746c0038e43 100644 --- a/tests/realtikvtest/brietest/operator_test.go +++ b/tests/realtikvtest/brietest/operator_test.go @@ -31,6 +31,7 @@ import ( "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/oracle" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/pkg/caller" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) @@ -83,7 +84,7 @@ func verifyGCNotStopped(t *require.Assertions, cfg operator.PauseGcConfig) { func verifyLightningStopped(t *require.Assertions, cfg operator.PauseGcConfig) { cx := context.Background() - pdc, err := pd.NewClient(cfg.Config.PD, pd.SecurityOption{}) + pdc, err := pd.NewClient(caller.TestComponent, cfg.Config.PD, pd.SecurityOption{}) t.NoError(err) defer pdc.Close() t.NoError(err)