Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat](binlog) Add Support recover binlog #284

Merged
merged 2 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions pkg/ccr/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -2567,6 +2567,55 @@ func (j *Job) handleDropRollupRecord(commitSeq int64, dropRollup *record.DropRol
return j.IDest.DropRollup(destTableName, dropRollup.IndexName)
}

func (j *Job) handleRecoverInfo(binlog *festruct.TBinlog) error {
log.Infof("handle recoverInfo binlog, prevCommitSeq: %d, commitSeq: %d",
j.progress.PrevCommitSeq, j.progress.CommitSeq)

data := binlog.GetData()
recoverInfo, err := record.NewRecoverInfoFromJson(data)
if err != nil {
return err
}

return j.handleRecoverInfoRecord(binlog.GetCommitSeq(), recoverInfo)
}

func isRecoverTable(recoverInfo *record.RecoverInfo) bool {
if recoverInfo.PartitionName == "" || recoverInfo.PartitionId == -1 {
return true
}
return false
}

func (j *Job) handleRecoverInfoRecord(commitSeq int64, recoverInfo *record.RecoverInfo) error {
if j.isBinlogCommitted(recoverInfo.TableId, commitSeq) {
return nil
}

if isRecoverTable(recoverInfo) {
var tableName string
if recoverInfo.NewTableName != "" {
tableName = recoverInfo.NewTableName
} else {
tableName = recoverInfo.TableName
}
log.Infof("recover info with for table %s, will trigger partial sync", tableName)
return j.newPartialSnapshot(recoverInfo.TableId, tableName, nil, true)
}

var partitions []string
if recoverInfo.NewPartitionName != "" {
partitions = append(partitions, recoverInfo.NewPartitionName)
} else {
partitions = append(partitions, recoverInfo.PartitionName)
}
log.Infof("recover info with for partition(%s) for table %s, will trigger partial sync",
partitions, recoverInfo.TableName)
// if source does multiple recover of partition, then there is a race
// condition and some recover might miss due to commitseq change after snapshot.
return j.newPartialSnapshot(recoverInfo.TableId, recoverInfo.TableName, nil, true)
}

func (j *Job) handleBarrier(binlog *festruct.TBinlog) error {
data := binlog.GetData()
barrierLog, err := record.NewBarrierLogFromJson(data)
Expand Down Expand Up @@ -2645,6 +2694,12 @@ func (j *Job) handleBarrier(binlog *festruct.TBinlog) error {
return err
}
return j.handleModifyCommentRecord(commitSeq, modifyComment)
case festruct.TBinlogType_RECOVER_INFO:
recoverInfo, err := record.NewRecoverInfoFromJson(barrierLog.Binlog)
if err != nil {
return err
}
return j.handleRecoverInfoRecord(commitSeq, recoverInfo)
case festruct.TBinlogType_BARRIER:
log.Info("handle barrier binlog, ignore it")
default:
Expand Down Expand Up @@ -2757,6 +2812,8 @@ func (j *Job) handleBinlog(binlog *festruct.TBinlog) error {
return j.handleRenameRollup(binlog)
case festruct.TBinlogType_DROP_ROLLUP:
return j.handleDropRollup(binlog)
case festruct.TBinlogType_RECOVER_INFO:
return j.handleRecoverInfo(binlog)
default:
return xerror.Errorf(xerror.Normal, "unknown binlog type: %v", binlog.GetType())
}
Expand Down
43 changes: 43 additions & 0 deletions pkg/ccr/record/recover_info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package record

import (
"encoding/json"
"fmt"

"github.com/selectdb/ccr_syncer/pkg/xerror"
)

type RecoverInfo struct {
DbId int64 `json:"dbId"`
NewDbName string `json:"newDbName"`
TableId int64 `json:"tableId"`
TableName string `json:"tableName"`
NewTableName string `json:"newTableName"`
PartitionId int64 `json:"partitionId"`
PartitionName string `json:"partitionName"`
NewPartitionName string `json:"newPartitionName"`
}

func NewRecoverInfoFromJson(data string) (*RecoverInfo, error) {
var recoverInfo RecoverInfo
err := json.Unmarshal([]byte(data), &recoverInfo)
if err != nil {
return nil, xerror.Wrap(err, xerror.Normal, "unmarshal create table error")
}

if recoverInfo.TableId == 0 {
return nil, xerror.Errorf(xerror.Normal, "table id not found")
}

// table name must exist. partition name not checked since optional.
if recoverInfo.TableName == "" {
return nil, xerror.Errorf(xerror.Normal, "Table Name can not be null")
}
return &recoverInfo, nil
}

// String
func (c *RecoverInfo) String() string {
return fmt.Sprintf("RecoverInfo: DbId: %d, NewDbName: %s, TableId: %d, TableName: %s, NewTableName: %s, PartitionId: %d, PartitionName: %s, NewPartitionName: %s",
c.DbId, c.NewDbName, c.TableId, c.TableName, c.NewTableName, c.PartitionId, c.PartitionName, c.NewPartitionName)
}
12 changes: 6 additions & 6 deletions pkg/rpc/kitex_gen/frontendservice/FrontendService.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions pkg/rpc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -1197,7 +1197,7 @@ enum TBinlogType {
RENAME_ROLLUP = 21,
RENAME_PARTITION = 22,
DROP_ROLLUP = 23,

RECOVER_INFO = 24,
// Keep some IDs for allocation so that when new binlog types are added in the
// future, the changes can be picked back to the old versions without breaking
// compatibility.
Expand All @@ -1213,8 +1213,7 @@ enum TBinlogType {
// MODIFY_XXX = 17,
// MIN_UNKNOWN = 18,
// UNKNOWN_3 = 19,
MIN_UNKNOWN = 24,
UNKNOWN_9 = 25,
MIN_UNKNOWN = 25,
UNKNOWN_10 = 26,
UNKNOWN_11 = 27,
UNKNOWN_12 = 28,
Expand Down
6 changes: 6 additions & 0 deletions regression-test/common/helper.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,12 @@ class Helper {
"""
}

void disableDbBinlog() {
suite.sql """
ALTER DATABASE ${context.dbName} SET properties ("binlog.enable" = "false")
"""
}

Boolean checkShowTimesOf(sqlString, myClosure, times, func = "sql") {
Boolean ret = false
List<List<Object>> res
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !target_sql_content --
3 0
3 1
3 2
5 0
5 1
5 2

-- !sql_source_content --
3 0
3 1
3 2
5 0
5 1
5 2

Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !target_sql_content --
3 0
3 1
3 2
5 0
5 1
5 2

-- !sql_source_content --
3 0
3 1
3 2
5 0
5 1
5 2

Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !target_sql_content --
0 0
0 1
0 2
2 0
2 1
2 2

-- !sql_source_content --
0 0
0 1
0 2
2 0
2 1
2 2

-- !target_sql_content_2 --
0 0
0 1
0 2
2 0
2 1
2 2
3 0
3 1
3 2

-- !sql_source_content_2 --
0 0
0 1
0 2
2 0
2 1
2 2
3 0
3 1
3 2

Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !target_sql_content --
0 0
0 1
0 2
2 0
2 1
2 2

-- !sql_source_content --
0 0
0 1
0 2
2 0
2 1
2 2

-- !target_sql_content_2 --
0 0
0 1
0 2
2 0
2 1
2 2
3 0
3 1
3 2

-- !sql_source_content_2 --
0 0
0 1
0 2
2 0
2 1
2 2
3 0
3 1
3 2

Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !target_sql_content_2 --
0 0
0 1
0 2
10 0
10 1
10 2
11 0
11 1
11 2
12 0
12 1
12 2
13 0
13 1
13 2
14 0
14 1
14 2
15 0
15 1
15 2
16 0
16 1
16 2
17 0
17 1
17 2
18 0
18 1
18 2
19 0
19 1
19 2
20 0
20 1
20 2

-- !sql_source_content_2 --
0 0
0 1
0 2
10 0
10 1
10 2
11 0
11 1
11 2
12 0
12 1
12 2
13 0
13 1
13 2
14 0
14 1
14 2
15 0
15 1
15 2
16 0
16 1
16 2
17 0
17 1
17 2
18 0
18 1
18 2
19 0
19 1
19 2
20 0
20 1
20 2

Loading
Loading