
Commit

address comments
Signed-off-by: Wenqi Mou <[email protected]>
Tristan1900 committed Jan 8, 2025
1 parent 257c287 commit 9bf682b
Showing 6 changed files with 249 additions and 35 deletions.
6 changes: 2 additions & 4 deletions br/pkg/restore/log_client/batch_meta_processor.go
@@ -192,10 +192,8 @@ func (mp *MetaKVInfoProcessor) ProcessBatch(
 		} else if !meta.IsDBkey(rawKey.Key) {
 			// also see RewriteMetaKvEntry
 			continue
-		}
-
-		// collect table history indexed by table id, same id may have different table names in history
-		if meta.IsTableKey(rawKey.Field) {
+		} else if meta.IsTableKey(rawKey.Field) {
+			// collect table history indexed by table id, same id may have different table names in history
 			var tableInfo model.TableInfo
 			if err := json.Unmarshal(value, &tableInfo); err != nil {
 				return nil, errors.Trace(err)
4 changes: 1 addition & 3 deletions br/pkg/stream/table_mapping.go
@@ -118,10 +118,9 @@ func (tm *TableMappingManager) ProcessTableValueAndUpdateIdMapping(dbID int64, t
 	}
 
 	// update table ID and partition ID.
-	tableInfo.ID = tableReplace.TableID
 	partitions := tableInfo.GetPartitionInfo()
 	if partitions != nil {
-		for i, partition := range partitions.Definitions {
+		for _, partition := range partitions.Definitions {
 			newID, exist := tableReplace.PartitionMap[partition.ID]
 			if !exist {
 				newID, exist = tm.globalIdMap[partition.ID]
@@ -131,7 +130,6 @@
 				}
 				tableReplace.PartitionMap[partition.ID] = newID
 			}
-			partitions.Definitions[i].ID = newID
 		}
 	}
 	return nil
2 changes: 1 addition & 1 deletion br/pkg/task/BUILD.bazel
@@ -115,7 +115,7 @@ go_test(
     ],
     embed = [":task"],
     flaky = True,
-    shard_count = 39,
+    shard_count = 40,
     deps = [
         "//br/pkg/backup",
         "//br/pkg/config",
20 changes: 12 additions & 8 deletions br/pkg/task/restore.go
@@ -862,7 +862,8 @@ func runSnapshotRestore(c context.Context, mgr *conn.Mgr, g glue.Glue, cmdName s
 	if cfg.logTableHistoryManager != nil {
 		// adjust tables to restore in the snapshot restore phase since it will later be renamed during
 		// log restore and will fall into or out of the filter range.
-		err := adjustTablesToRestoreAndCreateTableTracker(cfg.logTableHistoryManager, cfg.RestoreConfig, client, fileMap, tableMap)
+		err := AdjustTablesToRestoreAndCreateTableTracker(cfg.logTableHistoryManager, cfg.RestoreConfig,
+			client.GetDatabaseMap(), fileMap, tableMap)
 		if err != nil {
 			return errors.Trace(err)
 		}
@@ -1452,15 +1453,13 @@ func filterRestoreFiles(
 	return
 }
 
-func adjustTablesToRestoreAndCreateTableTracker(
+func AdjustTablesToRestoreAndCreateTableTracker(
 	logBackupTableHistory *stream.LogBackupTableHistoryManager,
 	cfg *RestoreConfig,
-	client *snapclient.SnapClient,
+	snapshotDBMap map[int64]*metautil.Database,
 	fileMap map[string]*backuppb.File,
 	tableMap map[int64]*metautil.Table,
 ) (err error) {
-	snapshotDBMap := client.GetDatabaseMap()
-
 	// build tracker for pitr restore to use later
 	piTRTableTracker := utils.NewPiTRTableTracker()
 
@@ -1476,6 +1475,11 @@
 	tableHistory := logBackupTableHistory.GetTableHistory()
 
 	for tableID, dbIDAndTableName := range tableHistory {
+		if _, exists := tableMap[tableID]; exists {
+			// going to restore anyway, skip
+			continue
+		}
+
 		start := dbIDAndTableName[0]
 		end := dbIDAndTableName[1]
 
@@ -1504,15 +1508,15 @@
 			// we need to restore original table
 			if utils.MatchTable(cfg.TableFilter, dbName, end.TableName) {
 				// put this db/table id into pitr tracker as it matches with user's filter
-				// have to update filter here since table might be empty or not in snapshot so nothing will be returned .
+				// have to update tracker here since table might be empty or not in snapshot so nothing will be returned .
 				// but we still need to capture this table id to restore during log restore.
 				piTRTableTracker.AddTable(end.DbID, tableID)
 
 				// check if snapshot contains the original db/table
 				originalDB, exists := snapshotDBMap[start.DbID]
 				if !exists {
-					// original db created during log backup, snapshot doesn't have information about this db so doesn't
-					// need to restore at snapshot
+					// original db created during log backup, or full backup has a filter that filters out this db,
+					// either way snapshot doesn't have information about this db so doesn't need to restore at snapshot
 					continue
 				}
 
141 changes: 140 additions & 1 deletion br/pkg/task/restore_test.go
@@ -21,14 +21,16 @@ import (
 	"github.com/pingcap/tidb/br/pkg/mock"
 	"github.com/pingcap/tidb/br/pkg/restore/split"
 	"github.com/pingcap/tidb/br/pkg/restore/tiflashrec"
+	"github.com/pingcap/tidb/br/pkg/stream"
 	"github.com/pingcap/tidb/br/pkg/task"
-	utiltest "github.com/pingcap/tidb/br/pkg/utiltest"
+	"github.com/pingcap/tidb/br/pkg/utiltest"
 	"github.com/pingcap/tidb/pkg/ddl"
 	"github.com/pingcap/tidb/pkg/meta/model"
 	"github.com/pingcap/tidb/pkg/parser/ast"
 	"github.com/pingcap/tidb/pkg/parser/mysql"
 	"github.com/pingcap/tidb/pkg/testkit"
 	"github.com/pingcap/tidb/pkg/types"
+	filter "github.com/pingcap/tidb/pkg/util/table-filter"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"github.com/tikv/client-go/v2/oracle"
@@ -523,3 +525,140 @@ func TestCheckTikvSpace(t *testing.T) {
 	store := pdhttp.StoreInfo{Store: pdhttp.MetaStore{ID: 1}, Status: pdhttp.StoreStatus{Available: "500PB"}}
 	require.NoError(t, task.CheckStoreSpace(400*pb, &store))
 }
+
+func TestAdjustTablesToRestoreAndCreateTableTracker(t *testing.T) {
+	// test setup
+	// create test database and table maps
+	dbInfo1 := model.DBInfo{
+		ID:   1,
+		Name: ast.NewCIStr("test_db_1"),
+	}
+	dbInfo2 := model.DBInfo{
+		ID:   2,
+		Name: ast.NewCIStr("test_db_2"),
+	}
+	snapshotDBMap := map[int64]*metautil.Database{
+		1: {
+			Info: &dbInfo1,
+		},
+		2: {
+			Info: &dbInfo2,
+		},
+	}
+	fileMap := map[string]*backuppb.File{
+		"test_file": {
+			Name: "test_file",
+		},
+	}
+	tableMap := map[int64]*metautil.Table{
+		11: {
+			DB: &dbInfo1,
+			Info: &model.TableInfo{
+				ID:   11,
+				Name: ast.NewCIStr("test_table_11"),
+			},
+		},
+		12: {
+			DB: &dbInfo1,
+			Info: &model.TableInfo{
+				ID:   12,
+				Name: ast.NewCIStr("test_table_12"),
+			},
+		},
+		21: {
+			DB: &dbInfo2,
+			Info: &model.TableInfo{
+				ID:   21,
+				Name: ast.NewCIStr("test_table_21"),
+			},
+		},
+	}
+
+	// Test case 1: Basic table tracking
+	logBackupTableHistory := stream.NewTableHistoryManager()
+	logBackupTableHistory.AddTableHistory(11, "test_table_11", 1)
+	logBackupTableHistory.AddTableHistory(12, "test_table_12", 1)
+	logBackupTableHistory.AddTableHistory(21, "test_table_21", 2)
+	testFilter, err := filter.Parse([]string{"test_db*.*"})
+	require.NoError(t, err)
+	cfg := &task.RestoreConfig{
+		Config: task.Config{
+			TableFilter: testFilter,
+		},
+	}
+	err = task.AdjustTablesToRestoreAndCreateTableTracker(logBackupTableHistory, cfg, snapshotDBMap, fileMap, tableMap)
+	require.NoError(t, err)
+	require.True(t, cfg.PiTRTableTracker.ContainsTable(1, 11))
+	require.True(t, cfg.PiTRTableTracker.ContainsTable(1, 12))
+	require.True(t, cfg.PiTRTableTracker.ContainsTable(2, 21))
+
+	// Test case 2: Table not in filter
+	tableFilter, err := filter.Parse([]string{"other_db.other_table"})
+	cfg.TableFilter = tableFilter
+	logBackupTableHistory = stream.NewTableHistoryManager()
+	logBackupTableHistory.AddTableHistory(11, "test_table_11", 1)
+	logBackupTableHistory.AddTableHistory(12, "test_table_12", 1)
+	logBackupTableHistory.AddTableHistory(21, "test_table_21", 2)
+	err = task.AdjustTablesToRestoreAndCreateTableTracker(logBackupTableHistory, cfg, snapshotDBMap, fileMap, tableMap)
+	require.NoError(t, err)
+	require.False(t, cfg.PiTRTableTracker.ContainsTable(1, 11))
+	require.False(t, cfg.PiTRTableTracker.ContainsTable(1, 12))
+	require.False(t, cfg.PiTRTableTracker.ContainsTable(2, 21))
+
+	// Test case 3: New table created during log backup
+	logBackupTableHistory = stream.NewTableHistoryManager()
+	testFilter, err = filter.Parse([]string{"test_db*.*"})
+	cfg.TableFilter = testFilter
+	logBackupTableHistory.AddTableHistory(11, "test_table_11", 1)
+	logBackupTableHistory.AddTableHistory(12, "test_table_12", 1)
+	logBackupTableHistory.AddTableHistory(21, "test_table_21", 2)
+	logBackupTableHistory.AddTableHistory(13, "new_table", 1)
+	err = task.AdjustTablesToRestoreAndCreateTableTracker(logBackupTableHistory, cfg, snapshotDBMap, fileMap, tableMap)
+	require.NoError(t, err)
+	require.True(t, cfg.PiTRTableTracker.ContainsTable(1, 11))
+	require.True(t, cfg.PiTRTableTracker.ContainsTable(1, 12))
+	require.True(t, cfg.PiTRTableTracker.ContainsTable(1, 13))
+	require.True(t, cfg.PiTRTableTracker.ContainsTable(2, 21))
+
+	// Test case 4: Table renamed into filter during log backup
+	logBackupTableHistory = stream.NewTableHistoryManager()
+	logBackupTableHistory.AddTableHistory(11, "test_table_11", 1) // drop
+	logBackupTableHistory.AddTableHistory(11, "renamed_table", 2) // create
+	logBackupTableHistory.AddTableHistory(12, "test_table_12", 1)
+	logBackupTableHistory.AddTableHistory(21, "test_table_21", 2)
+	tableFilter, err = filter.Parse([]string{"test_db_2.*"})
+	cfg.TableFilter = tableFilter
+	err = task.AdjustTablesToRestoreAndCreateTableTracker(logBackupTableHistory, cfg, snapshotDBMap, fileMap, tableMap)
+	require.NoError(t, err)
+	require.False(t, cfg.PiTRTableTracker.ContainsTable(1, 11))
+	require.False(t, cfg.PiTRTableTracker.ContainsTable(1, 12))
+	require.True(t, cfg.PiTRTableTracker.ContainsTable(2, 11))
+	require.True(t, cfg.PiTRTableTracker.ContainsTable(2, 21))
+
+	// Test case 5: Table renamed out of filter during log backup
+	tableFilter, err = filter.Parse([]string{"test_db_1.*"})
+	cfg.TableFilter = tableFilter
+	err = task.AdjustTablesToRestoreAndCreateTableTracker(logBackupTableHistory, cfg, snapshotDBMap, fileMap, tableMap)
+	require.NoError(t, err)
+	require.False(t, cfg.PiTRTableTracker.ContainsTable(1, 11))
+	require.True(t, cfg.PiTRTableTracker.ContainsTable(1, 12))
+	require.False(t, cfg.PiTRTableTracker.ContainsTable(2, 11))
+	require.False(t, cfg.PiTRTableTracker.ContainsTable(2, 21))
+
+	// Test case 6: Log backup table not in snapshot (due to full backup with a different filter)
+	snapshotDBMap = map[int64]*metautil.Database{}
+	tableMap = map[int64]*metautil.Table{}
+	logBackupTableHistory.AddTableHistory(11, "test_table", 1)
+	err = task.AdjustTablesToRestoreAndCreateTableTracker(logBackupTableHistory, cfg, snapshotDBMap, fileMap, tableMap)
+	require.NoError(t, err)
+	require.False(t, cfg.PiTRTableTracker.ContainsTable(1, 11))
+
+	// Test case 7: DB created during log backup
+	snapshotDBMap = map[int64]*metautil.Database{}
+	tableMap = map[int64]*metautil.Table{}
+	logBackupTableHistory.RecordDBIdToName(1, "test_db_1")
+	logBackupTableHistory.AddTableHistory(11, "test_table", 1)
+	err = task.AdjustTablesToRestoreAndCreateTableTracker(logBackupTableHistory, cfg, snapshotDBMap, fileMap, tableMap)
+	require.NoError(t, err)
+	require.True(t, cfg.PiTRTableTracker.ContainsTable(1, 11))
+}