Skip to content

Commit

Permalink
[feat](binlog) Add Support recover binlog (#284)
Browse files Browse the repository at this point in the history
  • Loading branch information
Vallishp authored Dec 11, 2024
1 parent 60d48d9 commit 44229b7
Show file tree
Hide file tree
Showing 21 changed files with 1,482 additions and 9 deletions.
57 changes: 57 additions & 0 deletions pkg/ccr/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -2567,6 +2567,55 @@ func (j *Job) handleDropRollupRecord(commitSeq int64, dropRollup *record.DropRol
return j.IDest.DropRollup(destTableName, dropRollup.IndexName)
}

func (j *Job) handleRecoverInfo(binlog *festruct.TBinlog) error {
log.Infof("handle recoverInfo binlog, prevCommitSeq: %d, commitSeq: %d",
j.progress.PrevCommitSeq, j.progress.CommitSeq)

data := binlog.GetData()
recoverInfo, err := record.NewRecoverInfoFromJson(data)
if err != nil {
return err
}

return j.handleRecoverInfoRecord(binlog.GetCommitSeq(), recoverInfo)
}

func isRecoverTable(recoverInfo *record.RecoverInfo) bool {
if recoverInfo.PartitionName == "" || recoverInfo.PartitionId == -1 {
return true
}
return false
}

func (j *Job) handleRecoverInfoRecord(commitSeq int64, recoverInfo *record.RecoverInfo) error {
if j.isBinlogCommitted(recoverInfo.TableId, commitSeq) {
return nil
}

if isRecoverTable(recoverInfo) {
var tableName string
if recoverInfo.NewTableName != "" {
tableName = recoverInfo.NewTableName
} else {
tableName = recoverInfo.TableName
}
log.Infof("recover info with for table %s, will trigger partial sync", tableName)
return j.newPartialSnapshot(recoverInfo.TableId, tableName, nil, true)
}

var partitions []string
if recoverInfo.NewPartitionName != "" {
partitions = append(partitions, recoverInfo.NewPartitionName)
} else {
partitions = append(partitions, recoverInfo.PartitionName)
}
log.Infof("recover info with for partition(%s) for table %s, will trigger partial sync",
partitions, recoverInfo.TableName)
// if source does multiple recover of partition, then there is a race
// condition and some recover might miss due to commitseq change after snapshot.
return j.newPartialSnapshot(recoverInfo.TableId, recoverInfo.TableName, nil, true)
}

func (j *Job) handleBarrier(binlog *festruct.TBinlog) error {
data := binlog.GetData()
barrierLog, err := record.NewBarrierLogFromJson(data)
Expand Down Expand Up @@ -2645,6 +2694,12 @@ func (j *Job) handleBarrier(binlog *festruct.TBinlog) error {
return err
}
return j.handleModifyCommentRecord(commitSeq, modifyComment)
case festruct.TBinlogType_RECOVER_INFO:
recoverInfo, err := record.NewRecoverInfoFromJson(barrierLog.Binlog)
if err != nil {
return err
}
return j.handleRecoverInfoRecord(commitSeq, recoverInfo)
case festruct.TBinlogType_BARRIER:
log.Info("handle barrier binlog, ignore it")
default:
Expand Down Expand Up @@ -2757,6 +2812,8 @@ func (j *Job) handleBinlog(binlog *festruct.TBinlog) error {
return j.handleRenameRollup(binlog)
case festruct.TBinlogType_DROP_ROLLUP:
return j.handleDropRollup(binlog)
case festruct.TBinlogType_RECOVER_INFO:
return j.handleRecoverInfo(binlog)
default:
return xerror.Errorf(xerror.Normal, "unknown binlog type: %v", binlog.GetType())
}
Expand Down
43 changes: 43 additions & 0 deletions pkg/ccr/record/recover_info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package record

import (
"encoding/json"
"fmt"

"github.com/selectdb/ccr_syncer/pkg/xerror"
)

type RecoverInfo struct {
DbId int64 `json:"dbId"`
NewDbName string `json:"newDbName"`
TableId int64 `json:"tableId"`
TableName string `json:"tableName"`
NewTableName string `json:"newTableName"`
PartitionId int64 `json:"partitionId"`
PartitionName string `json:"partitionName"`
NewPartitionName string `json:"newPartitionName"`
}

func NewRecoverInfoFromJson(data string) (*RecoverInfo, error) {
var recoverInfo RecoverInfo
err := json.Unmarshal([]byte(data), &recoverInfo)
if err != nil {
return nil, xerror.Wrap(err, xerror.Normal, "unmarshal create table error")
}

if recoverInfo.TableId == 0 {
return nil, xerror.Errorf(xerror.Normal, "table id not found")
}

// table name must exist. partition name not checked since optional.
if recoverInfo.TableName == "" {
return nil, xerror.Errorf(xerror.Normal, "Table Name can not be null")
}
return &recoverInfo, nil
}

// String
func (c *RecoverInfo) String() string {
return fmt.Sprintf("RecoverInfo: DbId: %d, NewDbName: %s, TableId: %d, TableName: %s, NewTableName: %s, PartitionId: %d, PartitionName: %s, NewPartitionName: %s",
c.DbId, c.NewDbName, c.TableId, c.TableName, c.NewTableName, c.PartitionId, c.PartitionName, c.NewPartitionName)
}
12 changes: 6 additions & 6 deletions pkg/rpc/kitex_gen/frontendservice/FrontendService.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions pkg/rpc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -1197,7 +1197,7 @@ enum TBinlogType {
RENAME_ROLLUP = 21,
RENAME_PARTITION = 22,
DROP_ROLLUP = 23,

RECOVER_INFO = 24,
// Keep some IDs for allocation so that when new binlog types are added in the
// future, the changes can be picked back to the old versions without breaking
// compatibility.
Expand All @@ -1213,8 +1213,7 @@ enum TBinlogType {
// MODIFY_XXX = 17,
// MIN_UNKNOWN = 18,
// UNKNOWN_3 = 19,
MIN_UNKNOWN = 24,
UNKNOWN_9 = 25,
MIN_UNKNOWN = 25,
UNKNOWN_10 = 26,
UNKNOWN_11 = 27,
UNKNOWN_12 = 28,
Expand Down
6 changes: 6 additions & 0 deletions regression-test/common/helper.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,12 @@ class Helper {
"""
}

void disableDbBinlog() {
suite.sql """
ALTER DATABASE ${context.dbName} SET properties ("binlog.enable" = "false")
"""
}

Boolean checkShowTimesOf(sqlString, myClosure, times, func = "sql") {
Boolean ret = false
List<List<Object>> res
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !target_sql_content --
3 0
3 1
3 2
5 0
5 1
5 2

-- !sql_source_content --
3 0
3 1
3 2
5 0
5 1
5 2

Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !target_sql_content --
3 0
3 1
3 2
5 0
5 1
5 2

-- !sql_source_content --
3 0
3 1
3 2
5 0
5 1
5 2

Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !target_sql_content --
0 0
0 1
0 2
2 0
2 1
2 2

-- !sql_source_content --
0 0
0 1
0 2
2 0
2 1
2 2

-- !target_sql_content_2 --
0 0
0 1
0 2
2 0
2 1
2 2
3 0
3 1
3 2

-- !sql_source_content_2 --
0 0
0 1
0 2
2 0
2 1
2 2
3 0
3 1
3 2

Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !target_sql_content --
0 0
0 1
0 2
2 0
2 1
2 2

-- !sql_source_content --
0 0
0 1
0 2
2 0
2 1
2 2

-- !target_sql_content_2 --
0 0
0 1
0 2
2 0
2 1
2 2
3 0
3 1
3 2

-- !sql_source_content_2 --
0 0
0 1
0 2
2 0
2 1
2 2
3 0
3 1
3 2

Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !target_sql_content_2 --
0 0
0 1
0 2
10 0
10 1
10 2
11 0
11 1
11 2
12 0
12 1
12 2
13 0
13 1
13 2
14 0
14 1
14 2
15 0
15 1
15 2
16 0
16 1
16 2
17 0
17 1
17 2
18 0
18 1
18 2
19 0
19 1
19 2
20 0
20 1
20 2

-- !sql_source_content_2 --
0 0
0 1
0 2
10 0
10 1
10 2
11 0
11 1
11 2
12 0
12 1
12 2
13 0
13 1
13 2
14 0
14 1
14 2
15 0
15 1
15 2
16 0
16 1
16 2
17 0
17 1
17 2
18 0
18 1
18 2
19 0
19 1
19 2
20 0
20 1
20 2

Loading

0 comments on commit 44229b7

Please sign in to comment.