Commit 45bcdca: address comments

Signed-off-by: Wenqi Mou <[email protected]>
Tristan1900 committed Jan 3, 2025
1 parent d296f2f commit 45bcdca
Showing 11 changed files with 42 additions and 172 deletions.
2 changes: 1 addition & 1 deletion br/cmd/br/cmd.go
@@ -37,7 +37,7 @@ var (
tidbGlue = gluetidb.New()
envLogToTermKey = "BR_LOG_TO_TERM"

filterOutSysAndMemKeepPrivilege = []string{
filterOutSysAndMemKeepAuthAndBind = []string{
"*.*",
fmt.Sprintf("!%s.*", utils.TemporaryDBName("*")),
"!mysql.*",
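
For context, the renamed slice holds the restore-time table-filter rules: keep everything, drop BR's temporary databases and mysql.*, then re-include the auth and bind tables (those later rules are outside this hunk). A minimal sketch of how such rule lists behave, assuming the table-filter package BR uses for --filter; the import path and the trailing "mysql.user" rule are assumptions, not shown in this diff:

package main

import (
	"fmt"

	filter "github.com/pingcap/tidb/pkg/util/table-filter"
)

func main() {
	// "__TiDB_BR_Temporary_*" stands in for utils.TemporaryDBName("*") from the hunk above.
	rules := []string{"*.*", "!__TiDB_BR_Temporary_*.*", "!mysql.*", "mysql.user"}
	f, err := filter.Parse(rules)
	if err != nil {
		panic(err)
	}
	// The last matching rule wins, so a later positive rule can re-include a table.
	fmt.Println(f.MatchTable("test", "t1"))     // true:  kept by *.*
	fmt.Println(f.MatchTable("mysql", "stats")) // false: dropped by !mysql.*
	fmt.Println(f.MatchTable("mysql", "user"))  // true:  re-included by the later rule
}
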
4 changes: 2 additions & 2 deletions br/cmd/br/restore.go
@@ -186,7 +186,7 @@ func newFullRestoreCommand() *cobra.Command {
return runRestoreCommand(cmd, task.FullRestoreCmd)
},
}
task.DefineFilterFlags(command, filterOutSysAndMemKeepPrivilege, false)
task.DefineFilterFlags(command, filterOutSysAndMemKeepAuthAndBind, false)
task.DefineRestoreSnapshotFlags(command)
return command
}
@@ -254,7 +254,7 @@ func newStreamRestoreCommand() *cobra.Command {
return runRestoreCommand(command, task.PointRestoreCmd)
},
}
task.DefineFilterFlags(command, filterOutSysAndMemKeepPrivilege, true)
task.DefineFilterFlags(command, filterOutSysAndMemKeepAuthAndBind, true)
task.DefineStreamRestoreFlags(command)
return command
}
4 changes: 2 additions & 2 deletions br/pkg/restore/log_client/batch_meta_processor.go
@@ -186,7 +186,7 @@ func (mp *MetaKVInfoProcessor) ProcessBatch(
mp.tableHistoryManager.RecordDBIdToName(dbInfo.ID, dbInfo.Name.O)

// update the id map
if err = mp.tableMappingManager.ProcessDBValueAndUpdateIdMapping(dbInfo); err != nil {
if err = mp.tableMappingManager.ProcessDBValueAndUpdateIdMapping(&dbInfo); err != nil {
return nil, errors.Trace(err)
}
} else if !meta.IsDBkey(rawKey.Key) {
@@ -210,7 +210,7 @@ func (mp *MetaKVInfoProcessor) ProcessBatch(
mp.tableHistoryManager.AddTableHistory(tableInfo.ID, tableInfo.Name.String(), dbID)

// update the id map
if err = mp.tableMappingManager.ProcessTableValueAndUpdateIdMapping(dbID, tableInfo); err != nil {
if err = mp.tableMappingManager.ProcessTableValueAndUpdateIdMapping(dbID, &tableInfo); err != nil {
return nil, errors.Trace(err)
}
}
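
The two call sites above now pass &dbInfo and &tableInfo, matching the pointer signatures introduced in table_mapping.go below; presumably this avoids copying the fairly large info structs. A dependency-free sketch with hypothetical types (not the real model package) of the value-versus-pointer difference:

package main

import "fmt"

type tableInfo struct {
	ID   int64
	Name string
	// the real model.TableInfo carries many more fields, making copies costly
}

func processByValue(ti tableInfo) {
	ti.Name = "renamed" // mutates only the local copy
}

func processByPointer(ti *tableInfo) {
	ti.Name = "renamed" // mutates the caller's struct
}

func main() {
	ti := tableInfo{ID: 100, Name: "t1"}
	processByValue(ti)
	fmt.Println(ti.Name) // t1
	processByPointer(&ti)
	fmt.Println(ti.Name) // renamed
}
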
12 changes: 3 additions & 9 deletions br/pkg/restore/log_client/client.go
@@ -1025,15 +1025,9 @@ func (rc *LogClient) GetBaseIDMap(
if err != nil {
return nil, errors.Trace(err)
}
existTiFlashTable := false
rc.dom.InfoSchema().ListTablesWithSpecialAttribute(func(tableInfo *model.TableInfo) bool {
if tableInfo.TiFlashReplica != nil && tableInfo.TiFlashReplica.Count > 0 {
existTiFlashTable = true
}
return false
})
if existTiFlashTable {
return nil, errors.Errorf("exist table(s) have tiflash replica, please remove it before restore")
err := rc.validateNoTiFlashReplica()
if err != nil {
return nil, errors.Trace(err)
}
}

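
The inline TiFlash check is replaced with a call to rc.validateNoTiFlashReplica(), whose definition is not part of this diff. A plausible sketch of the helper, assuming it simply wraps the deleted inline logic and reuses the file's existing imports (model, errors):

// Hypothetical sketch only: the real validateNoTiFlashReplica lives outside this hunk.
func (rc *LogClient) validateNoTiFlashReplica() error {
	existTiFlashTable := false
	rc.dom.InfoSchema().ListTablesWithSpecialAttribute(func(tableInfo *model.TableInfo) bool {
		if tableInfo.TiFlashReplica != nil && tableInfo.TiFlashReplica.Count > 0 {
			existTiFlashTable = true
		}
		return false
	})
	if existTiFlashTable {
		return errors.Errorf("exist table(s) have tiflash replica, please remove it before restore")
	}
	return nil
}
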
1 change: 0 additions & 1 deletion br/pkg/stream/rewrite_meta_rawkv.go
@@ -33,7 +33,6 @@ import (
"go.uber.org/zap"
)

type RewriteStatus int
type UpstreamID = int64
type DownstreamID = int64

4 changes: 2 additions & 2 deletions br/pkg/stream/rewrite_meta_rawkv_test.go
@@ -412,7 +412,7 @@ func TestRewriteTableInfoForExchangePartition(t *testing.T) {
value, err := json.Marshal(&t1Copy)
require.Nil(t, err)

err = tc.ProcessTableValueAndUpdateIdMapping(dbID1, *t1Copy)
err = tc.ProcessTableValueAndUpdateIdMapping(dbID1, t1Copy)
require.Nil(t, err)

sr := NewSchemasReplace(
@@ -434,7 +434,7 @@ func TestRewriteTableInfoForExchangePartition(t *testing.T) {
// rewrite no partition table
value, err = json.Marshal(&t2Copy)
require.Nil(t, err)
err = tc.ProcessTableValueAndUpdateIdMapping(dbID2, *t2Copy)
err = tc.ProcessTableValueAndUpdateIdMapping(dbID2, t2Copy)
require.Nil(t, err)
value, err = sr.rewriteTableInfo(value, dbID2)
require.Nil(t, err)
95 changes: 29 additions & 66 deletions br/pkg/stream/table_mapping.go
@@ -43,7 +43,10 @@ const InitialTempId int64 = 0
// the dummy ids, it builds the final state of the db replace map
type TableMappingManager struct {
DBReplaceMap map[UpstreamID]*DBReplace
globalIdMap map[UpstreamID]DownstreamID

// used while scanning the log to identify id mappings that have already been seen. For example, after an
// exchange partition, the exchanged-in table already has an id mapping (found via the partition), so no new id is allocated.
globalIdMap map[UpstreamID]DownstreamID

// a counter for temporary IDs, need to get real global id
// once full restore completes
@@ -67,22 +70,13 @@ func (tm *TableMappingManager) FromDBReplaceMap(dbReplaceMap map[UpstreamID]*DBR
if dbReplaceMap == nil {
dbReplaceMap = make(map[UpstreamID]*DBReplace)
}
globalTableIdMap := make(map[UpstreamID]DownstreamID)
for _, dr := range dbReplaceMap {
for tblID, tr := range dr.TableMap {
globalTableIdMap[tblID] = tr.TableID
for oldpID, newpID := range tr.PartitionMap {
globalTableIdMap[oldpID] = newpID
}
}
}
tm.globalIdMap = globalTableIdMap
tm.DBReplaceMap = dbReplaceMap

// doesn't even need to build globalIdMap since loading DBReplaceMap from saved checkpoint
tm.DBReplaceMap = dbReplaceMap
return nil
}

func (tm *TableMappingManager) ProcessDBValueAndUpdateIdMapping(dbInfo model.DBInfo) error {
func (tm *TableMappingManager) ProcessDBValueAndUpdateIdMapping(dbInfo *model.DBInfo) error {
if dr, exist := tm.DBReplaceMap[dbInfo.ID]; !exist {
newID := tm.generateTempID()
tm.DBReplaceMap[dbInfo.ID] = NewDBReplace(dbInfo.Name.O, newID)
@@ -93,7 +87,7 @@ func (tm *TableMappingManager) ProcessDBValueAndUpdateIdMapping(dbInfo model.DBI
return nil
}

func (tm *TableMappingManager) ProcessTableValueAndUpdateIdMapping(dbID int64, tableInfo model.TableInfo) error {
func (tm *TableMappingManager) ProcessTableValueAndUpdateIdMapping(dbID int64, tableInfo *model.TableInfo) error {
var (
exist bool
dbReplace *DBReplace
@@ -144,18 +138,6 @@ func (tm *TableMappingManager) ProcessTableValueAndUpdateIdMapping(dbID int64, t
}

func (tm *TableMappingManager) MergeBaseDBReplace(baseMap map[UpstreamID]*DBReplace) {
// update globalIdMap
for upstreamID, dbReplace := range baseMap {
tm.globalIdMap[upstreamID] = dbReplace.DbID

for tableUpID, tableReplace := range dbReplace.TableMap {
tm.globalIdMap[tableUpID] = tableReplace.TableID
for partUpID, partDownID := range tableReplace.PartitionMap {
tm.globalIdMap[partUpID] = partDownID
}
}
}

// merge baseMap to DBReplaceMap
for upstreamID, baseDBReplace := range baseMap {
if existingDBReplace, exists := tm.DBReplaceMap[upstreamID]; exists {
@@ -179,7 +161,7 @@ func (tm *TableMappingManager) MergeBaseDBReplace(baseMap map[UpstreamID]*DBRepl
}

func (tm *TableMappingManager) IsEmpty() bool {
return len(tm.DBReplaceMap) == 0 && len(tm.globalIdMap) == 0
return len(tm.DBReplaceMap) == 0
}

func (tm *TableMappingManager) ReplaceTemporaryIDs(
@@ -192,38 +174,43 @@ func (tm *TableMappingManager) ReplaceTemporaryIDs(
// find actually used temporary IDs
usedTempIDs := make(map[DownstreamID]struct{})

// Helper function to check and add temporary ID
addTempIDIfNeeded := func(id DownstreamID) error {
if id < 0 {
if _, exists := usedTempIDs[id]; exists {
return errors.Annotate(berrors.ErrRestoreInvalidRewrite,
fmt.Sprintf("found duplicate temporary ID: %d", id))
}
usedTempIDs[id] = struct{}{}
}
return nil
}

// check DBReplaceMap for used temporary IDs
// any value less than 0 is temporary ID
for _, dr := range tm.DBReplaceMap {
if dr.DbID < 0 {
usedTempIDs[dr.DbID] = struct{}{}
if err := addTempIDIfNeeded(dr.DbID); err != nil {
return err
}
for _, tr := range dr.TableMap {
if tr.TableID < 0 {
usedTempIDs[tr.TableID] = struct{}{}
if err := addTempIDIfNeeded(tr.TableID); err != nil {
return err
}
for _, partID := range tr.PartitionMap {
if partID < 0 {
usedTempIDs[partID] = struct{}{}
if err := addTempIDIfNeeded(partID); err != nil {
return err
}
}
}
}

// check in globalIdMap as well just be safe
for _, downID := range tm.globalIdMap {
if downID < 0 {
usedTempIDs[downID] = struct{}{}
}
}

tempIDs := make([]DownstreamID, 0, len(usedTempIDs))
// convert to sorted slice
for id := range usedTempIDs {
tempIDs = append(tempIDs, id)
}

// sort to -1, -2, -4 ... etc
// sort to -1, -2, -4, -8 ... etc
sort.Slice(tempIDs, func(i, j int) bool {
return tempIDs[i] > tempIDs[j]
})
@@ -246,13 +233,6 @@ func (tm *TableMappingManager) ReplaceTemporaryIDs(
idMapping[tempID] = newIDs[i]
}

// replace temp id in globalIdMap
for upID, downID := range tm.globalIdMap {
if newID, exists := idMapping[downID]; exists {
tm.globalIdMap[upID] = newID
}
}

// replace temp id in DBReplaceMap
for _, dr := range tm.DBReplaceMap {
if newID, exists := idMapping[dr.DbID]; exists {
@@ -277,9 +257,6 @@ func (tm *TableMappingManager) ReplaceTemporaryIDs(
}

func (tm *TableMappingManager) FilterDBReplaceMap(filter *utils.PiTRTableTracker) {
// collect all IDs that should be kept
keepIDs := make(map[UpstreamID]struct{})

// iterate through existing DBReplaceMap
for dbID, dbReplace := range tm.DBReplaceMap {
// remove entire database if not in filter
@@ -288,27 +265,13 @@ func (tm *TableMappingManager) FilterDBReplaceMap(filter *utils.PiTRTableTracker
continue
}

keepIDs[dbID] = struct{}{}

// filter tables in this database
for tableID, tableReplace := range dbReplace.TableMap {
for tableID := range dbReplace.TableMap {
if !filter.ContainsTable(dbID, tableID) {
delete(dbReplace.TableMap, tableID)
} else {
keepIDs[tableID] = struct{}{}
for partitionID := range tableReplace.PartitionMap {
keepIDs[partitionID] = struct{}{}
}
}
}
}

// remove any ID from globalIdMap that isn't in keepIDs
for id := range tm.globalIdMap {
if _, ok := keepIDs[id]; !ok {
delete(tm.globalIdMap, id)
}
}
}

// ToProto produces schemas id maps from up-stream to down-stream.
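
Taken together, the table_mapping.go changes drop the separate globalIdMap bookkeeping and make ReplaceTemporaryIDs reject duplicate temporary IDs via the new addTempIDIfNeeded helper. A dependency-free sketch of the replacement scheme, simplified to a flat slice of IDs instead of the DBReplaceMap tree (names here are illustrative, not the real API):

package main

import (
	"fmt"
	"sort"
)

// replaceTempIDs mirrors the scheme above in simplified form: negative values are
// temporary IDs, duplicates are rejected, the temporaries are sorted as -1, -2, -4, ...
// and then swapped for freshly allocated global IDs.
func replaceTempIDs(ids []int64, genGlobalIDs func(n int) []int64) ([]int64, error) {
	usedTempIDs := make(map[int64]struct{})
	for _, id := range ids {
		if id >= 0 {
			continue
		}
		if _, exists := usedTempIDs[id]; exists {
			return nil, fmt.Errorf("found duplicate temporary ID: %d", id)
		}
		usedTempIDs[id] = struct{}{}
	}

	tempIDs := make([]int64, 0, len(usedTempIDs))
	for id := range usedTempIDs {
		tempIDs = append(tempIDs, id)
	}
	sort.Slice(tempIDs, func(i, j int) bool { return tempIDs[i] > tempIDs[j] })

	newIDs := genGlobalIDs(len(tempIDs))
	idMapping := make(map[int64]int64, len(tempIDs))
	for i, tempID := range tempIDs {
		idMapping[tempID] = newIDs[i]
	}

	out := make([]int64, len(ids))
	for i, id := range ids {
		if newID, ok := idMapping[id]; ok {
			out[i] = newID
		} else {
			out[i] = id
		}
	}
	return out, nil
}

func main() {
	next := int64(1000)
	gen := func(n int) []int64 {
		out := make([]int64, n)
		for i := range out {
			next++
			out[i] = next
		}
		return out
	}
	fmt.Println(replaceTempIDs([]int64{-1, 42, -2}, gen)) // [1001 42 1002] <nil>
}
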
79 changes: 0 additions & 79 deletions br/pkg/stream/table_mapping_test.go
@@ -557,61 +557,10 @@ func TestFilterDBReplaceMap(t *testing.T) {
tm := NewTableMappingManager()
tm.DBReplaceMap = tt.initial

// create a copy of globalIdMap before filtering
globalIdMap := make(map[UpstreamID]DownstreamID)
for dbID, dbReplace := range tt.initial {
globalIdMap[dbID] = dbReplace.DbID
for tblID, tblReplace := range dbReplace.TableMap {
globalIdMap[tblID] = tblReplace.TableID
for partID, partDownID := range tblReplace.PartitionMap {
globalIdMap[partID] = partDownID
}
}
}
tm.globalIdMap = globalIdMap

tm.FilterDBReplaceMap(tt.filter)

// verify DBReplaceMap is as expected
require.Equal(t, tt.expected, tm.DBReplaceMap)

// verify globalIdMap is properly filtered as well
for dbID, dbReplace := range tt.expected {
require.Equal(t, dbReplace.DbID, tm.globalIdMap[dbID])
for tblID, tblReplace := range dbReplace.TableMap {
require.Equal(t, tblReplace.TableID, tm.globalIdMap[tblID])
for partID, partDownID := range tblReplace.PartitionMap {
require.Equal(t, partDownID, tm.globalIdMap[partID])
}
}
}

// verify that filtered IDs are removed from globalIdMap
for upID := range globalIdMap {
found := false
for dbID, dbReplace := range tt.expected {
if upID == dbID {
found = true
break
}
for tblID, tblReplace := range dbReplace.TableMap {
if upID == tblID {
found = true
break
}
for partID := range tblReplace.PartitionMap {
if upID == partID {
found = true
break
}
}
}
}
if !found {
_, exists := tm.globalIdMap[upID]
require.False(t, exists, "ID %d should have been removed from globalIdMap", upID)
}
}
})
}
}
@@ -913,18 +862,6 @@ func TestReplaceTemporaryIDs(t *testing.T) {
tm.DBReplaceMap = tt.initial
tm.tempIDCounter = tt.tempCounter

globalIdMap := make(map[UpstreamID]DownstreamID)
for dbID, dbReplace := range tt.initial {
globalIdMap[dbID] = dbReplace.DbID
for tblID, tblReplace := range dbReplace.TableMap {
globalIdMap[tblID] = tblReplace.TableID
for partID, partDownID := range tblReplace.PartitionMap {
globalIdMap[partID] = partDownID
}
}
}
tm.globalIdMap = globalIdMap

err := tm.ReplaceTemporaryIDs(context.Background(), tt.genGlobalIDs)

if tt.expectedErr != nil {
Expand All @@ -936,22 +873,6 @@ func TestReplaceTemporaryIDs(t *testing.T) {
require.NoError(t, err)
require.Equal(t, tt.expected, tm.DBReplaceMap)
require.Equal(t, InitialTempId, tm.tempIDCounter)

// verify globalIdMap is properly updated as well
for dbID, dbReplace := range tt.expected {
require.Equal(t, dbReplace.DbID, tm.globalIdMap[dbID])
for tblID, tblReplace := range dbReplace.TableMap {
require.Equal(t, tblReplace.TableID, tm.globalIdMap[tblID])
for partID, partDownID := range tblReplace.PartitionMap {
require.Equal(t, partDownID, tm.globalIdMap[partID])
}
}
}

// verify no temporary IDs remain
for _, id := range tm.globalIdMap {
require.False(t, id < 0, "temporary ID %d still exists in globalIdMap", id)
}
})
}
}
6 changes: 1 addition & 5 deletions br/pkg/task/stream.go
@@ -2022,11 +2022,7 @@ func waitUntilSchemaReload(ctx context.Context, client *logclient.LogClient) err
}

func isCurrentIdMapSaved(checkpointTaskInfo *checkpoint.CheckpointTaskInfoForLogRestore) bool {
newTask := false
if checkpointTaskInfo != nil && checkpointTaskInfo.Progress == checkpoint.InLogRestoreAndIdMapPersisted {
newTask = true
}
return newTask
return checkpointTaskInfo != nil && checkpointTaskInfo.Progress == checkpoint.InLogRestoreAndIdMapPersisted
}

func buildSchemaReplace(