Skip to content

Commit

Permalink
server: store the updated labels into etcd (#51451) (#51564)
Browse files Browse the repository at this point in the history
close #51427
  • Loading branch information
ti-chi-bot authored Mar 7, 2024
1 parent 1dc6edf commit a8b650f
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 0 deletions.
33 changes: 33 additions & 0 deletions domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,39 @@ func GetAllServerInfo(ctx context.Context) (map[string]*ServerInfo, error) {
return is.getAllServerInfo(ctx)
}

// UpdateServerLabel updates the server label for global info syncer.
func UpdateServerLabel(ctx context.Context, labels map[string]string) error {
is, err := getGlobalInfoSyncer()
if err != nil {
return err
}
// when etcdCli is nil, the server infos are generated from the latest config, no need to update.
if is.etcdCli == nil {
return nil
}
selfInfo, err := is.getServerInfoByID(ctx, is.info.ID)
if err != nil {
return err
}
changed := false
for k, v := range labels {
if selfInfo.Labels[k] != v {
changed = true
selfInfo.Labels[k] = v
}
}
if !changed {
return nil
}
infoBuf, err := selfInfo.Marshal()
if err != nil {
return errors.Trace(err)
}
str := string(hack.String(infoBuf))
err = util.PutKVToEtcd(ctx, is.etcdCli, keyOpDefaultRetryCnt, is.serverInfoPath, str, clientv3.WithLease(is.session.Lease()))
return err
}

// DeleteTiFlashTableSyncProgress is used to delete the tiflash table replica sync progress.
func DeleteTiFlashTableSyncProgress(tableInfo *model.TableInfo) error {
is, err := getGlobalInfoSyncer()
Expand Down
1 change: 1 addition & 0 deletions server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ go_test(
"@com_github_tikv_client_go_v2//testutils",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//tikvrpc",
"@io_etcd_go_etcd_tests_v3//integration",
"@io_opencensus_go//stats/view",
"@org_golang_x_exp//slices",
"@org_uber_go_goleak//:goleak",
Expand Down
5 changes: 5 additions & 0 deletions server/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2228,6 +2228,11 @@ func (h labelHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
}
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
if err := infosync.UpdateServerLabel(ctx, labels); err != nil {
logutil.BgLogger().Error("update etcd labels failed", zap.Any("labels", cfg.Labels), zap.Error(err))
}
cancel()
cfg.Labels = labels
config.StoreGlobalConfig(&cfg)
logutil.BgLogger().Info("update server labels", zap.Any("labels", cfg.Labels))
Expand Down
61 changes: 61 additions & 0 deletions server/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package server

import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
Expand All @@ -38,6 +39,7 @@ import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
Expand All @@ -59,6 +61,7 @@ import (
"github.com/pingcap/tidb/util/rowcodec"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/tikv"
"go.etcd.io/etcd/tests/v3/integration"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -1202,6 +1205,64 @@ func TestSetLabels(t *testing.T) {
})
}

func TestSetLabelsWithEtcd(t *testing.T) {
ts := createBasicHTTPHandlerTestSuite()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ts.startServer(t)
defer ts.stopServer(t)

integration.BeforeTestExternal(t)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
client := cluster.RandClient()
infosync.SetEtcdClient(client)
ts.domain.InfoSyncer().Restart(ctx)

testUpdateLabels := func(labels, expected map[string]string) {
buffer := bytes.NewBuffer([]byte{})
require.Nil(t, json.NewEncoder(buffer).Encode(labels))
resp, err := ts.postStatus("/labels", "application/json", buffer)
require.NoError(t, err)
require.NotNil(t, resp)
defer func() {
require.NoError(t, resp.Body.Close())
}()
require.Equal(t, http.StatusOK, resp.StatusCode)
newLabels := config.GetGlobalConfig().Labels
require.Equal(t, newLabels, expected)
servers, err := infosync.GetAllServerInfo(ctx)
require.NoError(t, err)
for _, server := range servers {
for k, expectV := range expected {
v, ok := server.Labels[k]
require.True(t, ok)
require.Equal(t, expectV, v)
}
return
}
require.Fail(t, "no server found")
}

labels := map[string]string{
"zone": "us-west-1",
"test": "123",
}
testUpdateLabels(labels, labels)

updated := map[string]string{
"zone": "bj-1",
}
labels["zone"] = "bj-1"
testUpdateLabels(updated, labels)

// reset the global variable
config.UpdateGlobal(func(conf *config.Config) {
conf.Labels = map[string]string{}
})
}

func TestSetLabelsConcurrentWithGetLabel(t *testing.T) {
ts := createBasicHTTPHandlerTestSuite()

Expand Down

0 comments on commit a8b650f

Please sign in to comment.