Skip to content

Commit

Permalink
[feat](binlog) Add Support recover binlog
Browse files Browse the repository at this point in the history
  • Loading branch information
Vallishp committed Dec 9, 2024
1 parent 50933d9 commit 974c395
Show file tree
Hide file tree
Showing 16 changed files with 1,122 additions and 8 deletions.
50 changes: 50 additions & 0 deletions pkg/ccr/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -2567,6 +2567,48 @@ func (j *Job) handleDropRollupRecord(commitSeq int64, dropRollup *record.DropRol
return j.IDest.DropRollup(destTableName, dropRollup.IndexName)
}

func (j *Job) handleRecoverInfo(binlog *festruct.TBinlog) error {
log.Infof("handle recoverInfo binlog, prevCommitSeq: %d, commitSeq: %d",
j.progress.PrevCommitSeq, j.progress.CommitSeq)

data := binlog.GetData()
recoverInfo, err := record.NewRecoverInfoFromJson(data)
if err != nil {
return err
}

return j.handleRecoverInfoRecord(binlog.GetCommitSeq(), recoverInfo)
}

func (j *Job) handleRecoverInfoRecord(commitSeq int64, recoverInfo *record.RecoverInfo) error {
if j.isBinlogCommitted(recoverInfo.TableId, commitSeq) {
return nil
}

if recoverInfo.PartitionName == "" {
var tableName string
if recoverInfo.NewTableName != "" {
tableName = recoverInfo.NewTableName
} else {
tableName = recoverInfo.TableName
}
log.Infof("recover info with for table %s, will trigger partial sync", tableName)
return j.newPartialSnapshot(recoverInfo.TableId, tableName, nil, true)
}

var partitions []string
if recoverInfo.NewPartitionName != "" {
partitions = append(partitions, recoverInfo.NewPartitionName)
} else {
partitions = append(partitions, recoverInfo.PartitionName)
}
log.Infof("recover info with for partition(%s) for table %s, will trigger partial sync",
partitions, recoverInfo.TableName)
// if source does multiple recover of partition, then there is a race
// condition and some recover might miss due to commitseq change after snapshot.
return j.newPartialSnapshot(recoverInfo.TableId, recoverInfo.TableName, nil, true)
}

func (j *Job) handleBarrier(binlog *festruct.TBinlog) error {
data := binlog.GetData()
barrierLog, err := record.NewBarrierLogFromJson(data)
Expand Down Expand Up @@ -2645,6 +2687,12 @@ func (j *Job) handleBarrier(binlog *festruct.TBinlog) error {
return err
}
return j.handleModifyCommentRecord(commitSeq, modifyComment)
case festruct.TBinlogType_RECOVER_INFO:
recoverInfo, err := record.NewRecoverInfoFromJson(barrierLog.Binlog)
if err != nil {
return err
}
return j.handleRecoverInfoRecord(commitSeq, recoverInfo)
case festruct.TBinlogType_BARRIER:
log.Info("handle barrier binlog, ignore it")
default:
Expand Down Expand Up @@ -2757,6 +2805,8 @@ func (j *Job) handleBinlog(binlog *festruct.TBinlog) error {
return j.handleRenameRollup(binlog)
case festruct.TBinlogType_DROP_ROLLUP:
return j.handleDropRollup(binlog)
case festruct.TBinlogType_RECOVER_INFO:
return j.handleRecoverInfo(binlog)
default:
return xerror.Errorf(xerror.Normal, "unknown binlog type: %v", binlog.GetType())
}
Expand Down
42 changes: 42 additions & 0 deletions pkg/ccr/record/recover_info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package record

import (
"encoding/json"
"fmt"

"github.com/selectdb/ccr_syncer/pkg/xerror"
)

type RecoverInfo struct {
DbId int64 `json:"dbId"`
NewDbName string `json:"newDbName"`
TableId int64 `json:"tableId"`
TableName string `json:"tableName"`
NewTableName string `json:"newTableName"`
PartitionId int64 `json:"partitionId"`
PartitionName string `json:"partitionName"`
NewPartitionName string `json:"newPartitionName"`
}

func NewRecoverInfoFromJson(data string) (*RecoverInfo, error) {
var recoverInfo RecoverInfo
err := json.Unmarshal([]byte(data), &recoverInfo)
if err != nil {
return nil, xerror.Wrap(err, xerror.Normal, "unmarshal create table error")
}

if recoverInfo.TableId == 0 {
return nil, xerror.Errorf(xerror.Normal, "table id not found")
}

/* need check for db level not supported.
*/

return &recoverInfo, nil
}

// String
func (c *RecoverInfo) String() string {
return fmt.Sprintf("RecoverInfo: DbId: %d, NewDbName: %s, TableId: %d, TableName: %s, NewTableName: %s, PartitionId: %d, PartitionName: %s, NewPartitionName: %s",
c.DbId, c.NewDbName, c.TableId, c.TableName, c.NewTableName, c.PartitionId, c.PartitionName, c.NewPartitionName)
}
12 changes: 6 additions & 6 deletions pkg/rpc/kitex_gen/frontendservice/FrontendService.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pkg/rpc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -1197,7 +1197,7 @@ enum TBinlogType {
RENAME_ROLLUP = 21,
RENAME_PARTITION = 22,
DROP_ROLLUP = 23,

RECOVER_INFO = 24,
// Keep some IDs for allocation so that when new binlog types are added in the
// future, the changes can be picked back to the old versions without breaking
// compatibility.
Expand All @@ -1213,7 +1213,7 @@ enum TBinlogType {
// MODIFY_XXX = 17,
// MIN_UNKNOWN = 18,
// UNKNOWN_3 = 19,
MIN_UNKNOWN = 24,
MIN_UNKNOWN = 25,
UNKNOWN_9 = 25,
UNKNOWN_10 = 26,
UNKNOWN_11 = 27,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !target_sql_content --
3 0
3 1
3 2
5 0
5 1
5 2

-- !sql_source_content --
3 0
3 1
3 2
5 0
5 1
5 2

Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !target_sql_content --
3 0
3 1
3 2
5 0
5 1
5 2

-- !sql_source_content --
3 0
3 1
3 2
5 0
5 1
5 2

Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !target_sql_content --
0 0
0 1
0 2
2 0
2 1
2 2

-- !sql_source_content --
0 0
0 1
0 2
2 0
2 1
2 2

-- !target_sql_content_2 --
0 0
0 1
0 2
2 0
2 1
2 2
3 0
3 1
3 2

-- !sql_source_content_2 --
0 0
0 1
0 2
2 0
2 1
2 2
3 0
3 1
3 2

Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !target_sql_content --
0 0
0 1
0 2
2 0
2 1
2 2

-- !sql_source_content --
0 0
0 1
0 2
2 0
2 1
2 2

-- !target_sql_content_2 --
0 0
0 1
0 2
2 0
2 1
2 2
3 0
3 1
3 2

-- !sql_source_content_2 --
0 0
0 1
0 2
2 0
2 1
2 2
3 0
3 1
3 2

Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !target_sql_content --
3 0
3 1
3 2
5 0
5 1
5 2

-- !sql_source_content --
3 0
3 1
3 2
5 0
5 1
5 2

Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !target_sql_content --
3 0
3 1
3 2
5 0
5 1
5 2

-- !sql_source_content --
3 0
3 1
3 2
5 0
5 1
5 2

Loading

0 comments on commit 974c395

Please sign in to comment.