Skip to content

Commit

Permalink
br: fix debug decode backupmeta (#56627) (#57891)
Browse files Browse the repository at this point in the history
close #56296
  • Loading branch information
ti-chi-bot authored Dec 12, 2024
1 parent 6a147d5 commit 4df9aa8
Show file tree
Hide file tree
Showing 8 changed files with 643 additions and 12 deletions.
15 changes: 14 additions & 1 deletion br/cmd/br/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,16 @@ func decodeBackupMetaCommand() *cobra.Command {

fieldName, _ := cmd.Flags().GetString("field")
if fieldName == "" {
if err := utils.DecodeMetaFile(ctx, s, &cfg.CipherInfo, backupMeta.FileIndex); err != nil {
return errors.Trace(err)
}
if err := utils.DecodeMetaFile(ctx, s, &cfg.CipherInfo, backupMeta.RawRangeIndex); err != nil {
return errors.Trace(err)
}
if err := utils.DecodeMetaFile(ctx, s, &cfg.CipherInfo, backupMeta.SchemaIndex); err != nil {
return errors.Trace(err)
}

// No field flag, write backupmeta to external storage in JSON format.
backupMetaJSON, err := utils.MarshalBackupMeta(backupMeta)
if err != nil {
Expand All @@ -292,7 +302,7 @@ func decodeBackupMetaCommand() *cobra.Command {
if err != nil {
return errors.Trace(err)
}
cmd.Printf("backupmeta decoded at %s\n", path.Join(cfg.Storage, metautil.MetaJSONFile))
cmd.Printf("backupmeta decoded at %s\n", path.Join(s.URI(), metautil.MetaJSONFile))
return nil
}

Expand Down Expand Up @@ -351,6 +361,9 @@ func encodeBackupMetaCommand() *cobra.Command {
if err != nil {
return errors.Trace(err)
}
if backupMetaJSON.Version == metautil.MetaV2 {
return errors.Errorf("encoding backupmeta v2 is unimplemented")
}
backupMeta, err := proto.Marshal(backupMetaJSON)
if err != nil {
return errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/metautil/metafile.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const (
// MetaFile represents file name
MetaFile = "backupmeta"
// MetaJSONFile represents backup meta json file name
MetaJSONFile = "backupmeta.json"
MetaJSONFile = "jsons/backupmeta.json"
// MaxBatchSize represents the internal channel buffer size of MetaWriter and MetaReader.
MaxBatchSize = 1024

Expand Down
7 changes: 6 additions & 1 deletion br/pkg/utils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
"backoff.go",
"cdc.go",
"db.go",
"debug.go",
"dyn_pprof_other.go",
"dyn_pprof_unix.go",
"env.go",
Expand All @@ -31,6 +32,7 @@ go_library(
"//br/pkg/errors",
"//br/pkg/logutil",
"//br/pkg/metautil",
"//br/pkg/storage",
"//pkg/errno",
"//pkg/kv",
"//pkg/parser/model",
Expand All @@ -43,6 +45,7 @@ go_library(
"//pkg/util/sqlexec",
"@com_github_cheggaaa_pb_v3//:pb",
"@com_github_docker_go_units//:go-units",
"@com_github_gogo_protobuf//proto",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
Expand Down Expand Up @@ -76,6 +79,7 @@ go_test(
"backoff_test.go",
"cdc_test.go",
"db_test.go",
"debug_test.go",
"env_test.go",
"json_test.go",
"key_test.go",
Expand All @@ -91,7 +95,7 @@ go_test(
],
embed = [":utils"],
flaky = True,
shard_count = 37,
shard_count = 39,
deps = [
"//br/pkg/errors",
"//br/pkg/metautil",
Expand All @@ -107,6 +111,7 @@ go_test(
"//pkg/types",
"//pkg/util/chunk",
"//pkg/util/sqlexec",
"@com_github_gogo_protobuf//proto",
"@com_github_golang_protobuf//proto",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
Expand Down
91 changes: 91 additions & 0 deletions br/pkg/utils/debug.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package utils

import (
"bytes"
"context"
"crypto/sha256"
"fmt"

"github.com/gogo/protobuf/proto"
"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/storage"
"golang.org/x/sync/errgroup"
)

const (
// JSONFileFormat represents json file name format
JSONFileFormat = "jsons/%s.json"
)

// DecodeMetaFile decodes the meta file to json format, it is called by br debug
func DecodeMetaFile(
ctx context.Context,
s storage.ExternalStorage,
cipher *backuppb.CipherInfo,
metaIndex *backuppb.MetaFile,
) error {
if metaIndex == nil {
return nil
}
eg, ectx := errgroup.WithContext(ctx)
workers := NewWorkerPool(8, "download files workers")
for _, node := range metaIndex.MetaFiles {
workers.ApplyOnErrorGroup(eg, func() error {
content, err := s.ReadFile(ectx, node.Name)
if err != nil {
return errors.Trace(err)
}

decryptContent, err := metautil.Decrypt(content, cipher, node.CipherIv)
if err != nil {
return errors.Trace(err)
}

checksum := sha256.Sum256(decryptContent)
if !bytes.Equal(node.Sha256, checksum[:]) {
return berrors.ErrInvalidMetaFile.GenWithStackByArgs(fmt.Sprintf(
"checksum mismatch expect %x, got %x", node.Sha256, checksum[:]))
}

child := &backuppb.MetaFile{}
if err = proto.Unmarshal(decryptContent, child); err != nil {
return errors.Trace(err)
}

// the max depth of the root metafile is only 1.
// ASSERT: len(child.MetaFiles) == 0
if len(child.MetaFiles) > 0 {
return errors.Errorf("the metafile has unexpected level: %v", child)
}

jsonContent, err := MarshalMetaFile(child)
if err != nil {
return errors.Trace(err)
}

if err := s.WriteFile(ctx, fmt.Sprintf(JSONFileFormat, node.Name), jsonContent); err != nil {
return errors.Trace(err)
}

return errors.Trace(err)
})
}
return eg.Wait()
}
183 changes: 183 additions & 0 deletions br/pkg/utils/debug_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package utils_test

import (
"context"
"crypto/sha256"
"fmt"
"math/rand"
"testing"

"github.com/gogo/protobuf/proto"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/stretchr/testify/require"
)

func flushMetaFile(
ctx context.Context,
t *testing.T,
fname string,
metaFile *backuppb.MetaFile,
storage storage.ExternalStorage,
cipher *backuppb.CipherInfo,
) *backuppb.File {
content, err := metaFile.Marshal()
require.NoError(t, err)

encyptedContent, iv, err := metautil.Encrypt(content, cipher)
require.NoError(t, err)

err = storage.WriteFile(ctx, fname, encyptedContent)
require.NoError(t, err)

checksum := sha256.Sum256(content)
file := &backuppb.File{
Name: fname,
Sha256: checksum[:],
Size_: uint64(len(content)),
CipherIv: iv,
}

return file
}

func flushStatsFile(
ctx context.Context,
t *testing.T,
fname string,
statsFile *backuppb.StatsFile,
storage storage.ExternalStorage,
cipher *backuppb.CipherInfo,
) *backuppb.StatsFileIndex {
content, err := proto.Marshal(statsFile)
require.NoError(t, err)

checksum := sha256.Sum256(content)
sizeOri := uint64(len(content))
encryptedContent, iv, err := metautil.Encrypt(content, cipher)
require.NoError(t, err)

err = storage.WriteFile(ctx, fname, encryptedContent)
require.NoError(t, err)

return &backuppb.StatsFileIndex{
Name: fname,
Sha256: checksum[:],
SizeEnc: uint64(len(encryptedContent)),
SizeOri: sizeOri,
CipherIv: iv,
InlineData: []byte(fmt.Sprintf("%d", rand.Int())),
}
}

func TestDecodeMetaFile(t *testing.T) {
ctx := context.Background()
base := t.TempDir()
s, err := storage.NewLocalStorage(base)
require.NoError(t, err)
cipher := &backuppb.CipherInfo{CipherType: 1}
file1 := flushMetaFile(ctx, t, "data", &backuppb.MetaFile{
DataFiles: []*backuppb.File{
{
Name: "1.sst",
Sha256: []byte("1.sst"),
StartKey: []byte("start"),
EndKey: []byte("end"),
EndVersion: 1,
Crc64Xor: 1,
TotalKvs: 2,
TotalBytes: 3,
Cf: "write",
CipherIv: []byte("1.sst"),
},
},
}, s, cipher)
stats := flushStatsFile(ctx, t, "stats", &backuppb.StatsFile{Blocks: []*backuppb.StatsBlock{
{
PhysicalId: 1,
JsonTable: []byte("1"),
},
{
PhysicalId: 2,
JsonTable: []byte("2"),
},
}}, s, cipher)
metaFile2 := &backuppb.MetaFile{
Schemas: []*backuppb.Schema{
{
Db: []byte(`{"db_name":{"L":"test","O":"test"},"id":1,"state":5}`),
Table: []byte(`{"id":2,"state":5}`),
Crc64Xor: 1,
TotalKvs: 2,
TotalBytes: 3,
TiflashReplicas: 4,
Stats: []byte(`{"a":1}`),
StatsIndex: []*backuppb.StatsFileIndex{stats},
},
},
}
file2 := flushMetaFile(ctx, t, "schema", metaFile2, s, cipher)

{
err = utils.DecodeMetaFile(ctx, s, cipher, &backuppb.MetaFile{MetaFiles: []*backuppb.File{file1}})
require.NoError(t, err)
content, err := s.ReadFile(ctx, "jsons/data.json")
require.NoError(t, err)
metaFile, err := utils.UnmarshalMetaFile(content)
require.NoError(t, err)
require.Equal(t, 1, len(metaFile.DataFiles))
require.Equal(t, "1.sst", metaFile.DataFiles[0].Name)
require.Equal(t, []byte("1.sst"), metaFile.DataFiles[0].Sha256)
require.Equal(t, []byte("start"), metaFile.DataFiles[0].StartKey)
require.Equal(t, []byte("end"), metaFile.DataFiles[0].EndKey)
require.Equal(t, uint64(1), metaFile.DataFiles[0].EndVersion)
require.Equal(t, uint64(1), metaFile.DataFiles[0].Crc64Xor)
require.Equal(t, uint64(2), metaFile.DataFiles[0].TotalKvs)
require.Equal(t, uint64(3), metaFile.DataFiles[0].TotalBytes)
require.Equal(t, "write", metaFile.DataFiles[0].Cf)
require.Equal(t, []byte("1.sst"), metaFile.DataFiles[0].CipherIv)
}

{
err = utils.DecodeMetaFile(ctx, s, cipher, &backuppb.MetaFile{MetaFiles: []*backuppb.File{file2}})
require.NoError(t, err)
{
content, err := s.ReadFile(ctx, "jsons/schema.json")
require.NoError(t, err)
metaFile, err := utils.UnmarshalMetaFile(content)
require.NoError(t, err)
require.Equal(t, 1, len(metaFile.Schemas))
require.Equal(t, metaFile2.Schemas[0].Db, metaFile.Schemas[0].Db)
require.Equal(t, metaFile2.Schemas[0].Table, metaFile.Schemas[0].Table)
require.Equal(t, uint64(1), metaFile.Schemas[0].Crc64Xor)
require.Equal(t, uint64(2), metaFile.Schemas[0].TotalKvs)
require.Equal(t, uint64(3), metaFile.Schemas[0].TotalBytes)
require.Equal(t, uint32(4), metaFile.Schemas[0].TiflashReplicas)
require.Equal(t, metaFile2.Schemas[0].Stats, metaFile.Schemas[0].Stats)
statsIndex := metaFile.Schemas[0].StatsIndex
require.Equal(t, 1, len(statsIndex))
require.Equal(t, stats.Name, statsIndex[0].Name)
require.Equal(t, stats.Sha256, statsIndex[0].Sha256)
require.Equal(t, stats.SizeEnc, statsIndex[0].SizeEnc)
require.Equal(t, stats.SizeOri, statsIndex[0].SizeOri)
require.Equal(t, stats.CipherIv, statsIndex[0].CipherIv)
require.Equal(t, stats.InlineData, statsIndex[0].InlineData)
}
}
}
Loading

0 comments on commit 4df9aa8

Please sign in to comment.