Skip to content

Commit

Permalink
[feat](binlog) Add Support recover binlog
Browse files Browse the repository at this point in the history
  • Loading branch information
Vallishp committed Dec 1, 2024
1 parent d89f04e commit 2a6113a
Show file tree
Hide file tree
Showing 9 changed files with 662 additions and 41 deletions.
34 changes: 0 additions & 34 deletions pkg/ccr/base/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,40 +534,6 @@ func (s *Spec) RenameTable(destTableName string, renameTable *record.RenameTable
return s.DbExec(sql)
}

func (s *Spec) RecoverInfo(destTableName string, recoverInfo *record.RecoverInfo) error {
destTableName = utils.FormatKeywordName(destTableName)
// recover info may be 'recover table', 'recover partition'
var sql string
// RECOVER PARTITION p1 from table_name
// RECOVER PARTITION p1 as p2 from table_name
if recoverInfo.PartitionName != "" {
partitionName := utils.FormatKeywordName(recoverInfo.PartitionName)
if recoverInfo.NewPartitionName != "" {
newPartitionName := utils.FormatKeywordName(recoverInfo.NewPartitionName)
sql = fmt.Sprintf("RECOVER PARTITION %s as %s from %s ", partitionName, newPartitionName, destTableName)
} else {
sql = fmt.Sprintf("RECOVER PARTITION %s from %s ", partitionName, destTableName)
}
log.Infof("recover info sql: %s", sql)
return s.DbExec(sql)
}
// RECOVER TABLE table_name
// RECOVER table table_name as new_table
if recoverInfo.TableName != "" {
if recoverInfo.NewTableName != "" {
newTableName := utils.FormatKeywordName(recoverInfo.NewTableName)
sql = fmt.Sprintf("RECOVER TABLE %s as %s", destTableName, newTableName)
} else {
sql = fmt.Sprintf("RECOVER TABLE %s", destTableName)
}
log.Infof("recover info sql: %s", sql)
return s.DbExec(sql)
}

// this can come for recover db case. we dont support it currently
return xerror.Errorf(xerror.Normal, "Recover info not currently supported")
}

func (s *Spec) RenameTableWithName(oldName, newName string) error {
oldName = utils.FormatKeywordName(oldName)
newName = utils.FormatKeywordName(newName)
Expand Down
2 changes: 0 additions & 2 deletions pkg/ccr/base/specer.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ type Specer interface {
RenameRollup(destTableName, oldRollup, newRollup string) error
DropRollup(destTableName, rollupName string) error

RecoverInfo(destTableName string, recoverInfo *record.RecoverInfo) error

DesyncTables(tables ...string) error

utils.Subject[SpecEvent]
Expand Down
23 changes: 18 additions & 5 deletions pkg/ccr/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -2592,13 +2592,26 @@ func (j *Job) handleRecoverInfoRecord(commitSeq int64, recoverInfo *record.Recov
return nil
}

var destTableName string
if j.SyncType == TableSync {
destTableName = j.Dest.Table
if recoverInfo.PartitionName == "" {
log.Infof("recover info with for table level, will trigger partial sync")
var tableName string
if recoverInfo.NewTableName != "" {
tableName = recoverInfo.NewTableName
} else {
tableName = recoverInfo.TableName
}
return j.newPartialSnapshot(recoverInfo.TableId, tableName, nil, true)
}

log.Infof("recover info with for partition level, will trigger partial sync")
var partitions []string
if recoverInfo.NewPartitionName != "" {
partitions = append(partitions, recoverInfo.NewPartitionName)
} else {
destTableName = recoverInfo.TableName
partitions = append(partitions, recoverInfo.PartitionName)
}
return j.IDest.RecoverInfo(destTableName, recoverInfo)
// here we are intrested only in sync on partition level. so replace parameter passed as false.
return j.newPartialSnapshot(recoverInfo.TableId, recoverInfo.TableName, partitions, false)
}

func (j *Job) handleBarrier(binlog *festruct.TBinlog) error {
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_ds_part_recover") {
def helper = new GroovyShell(new Binding(['suite': delegate]))
.evaluate(new File("${context.config.suitePath}/../common", "helper.groovy"))

def tableName = "tbl_" + helper.randomSuffix()
def test_num = 0
def insert_num = 90 // insert into last partition
def opPartitonName = "less"

def exist = { res -> Boolean
return res.size() != 0
}
def notExist = { res -> Boolean
return res.size() == 0
}

helper.enableDbBinlog()

sql """
CREATE TABLE if NOT EXISTS ${tableName}
(
`test` INT,
`id` INT
)
ENGINE=OLAP
UNIQUE KEY(`test`, `id`)
PARTITION BY RANGE(`id`)
(
PARTITION `${opPartitonName}_0` VALUES LESS THAN ("0"),
PARTITION `${opPartitonName}_1` VALUES LESS THAN ("10"),
PARTITION `${opPartitonName}_2` VALUES LESS THAN ("20"),
PARTITION `${opPartitonName}_3` VALUES LESS THAN ("30"),
PARTITION `${opPartitonName}_4` VALUES LESS THAN ("40"),
PARTITION `${opPartitonName}_5` VALUES LESS THAN ("50"),
PARTITION `${opPartitonName}_6` VALUES LESS THAN ("60"),
PARTITION `${opPartitonName}_7` VALUES LESS THAN ("70"),
PARTITION `${opPartitonName}_8` VALUES LESS THAN ("80"),
PARTITION `${opPartitonName}_9` VALUES LESS THAN ("90")
)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"binlog.enable" = "true"
)
"""

helper.ccrJobDelete()
helper.ccrJobCreate()

assertTrue(helper.checkRestoreFinishTimesOf("${tableName}", 30))


logger.info("=== Test 1: Check partitions in src before sync case ===")
assertTrue(helper.checkShowTimesOf("""
SHOW PARTITIONS
FROM TEST_${context.dbName}.${tableName}
WHERE PartitionName = \"${opPartitonName}_9\"
""",
exist, 30, "target"))
assertTrue(helper.checkShowTimesOf("""
SHOW PARTITIONS
FROM TEST_${context.dbName}.${tableName}
WHERE PartitionName = \"${opPartitonName}_8\"
""",
exist, 30, "target"))

// save the backup num of source cluster
def show_backup_result = sql "SHOW BACKUP"
def backup_num = show_backup_result.size()
logger.info("backups before drop partition: ${show_backup_result}")

logger.info("=== Test 2: Insert data in valid partitions case ===")
test_num = 3
for (int index = 0; index < insert_num; index++) {
sql """
INSERT INTO ${tableName} VALUES (${test_num}, ${index})
"""
}
sql "sync"

logger.info("=== Test 3: pause ===")

helper.ccrJobPause()

test_num = 4
for (int index = 0; index < insert_num; index++) {
sql """
INSERT INTO ${tableName} VALUES (${test_num}, ${index})
"""
}

sql "sync"

logger.info("=== Test 4: Drop partitions case ===")
sql """
ALTER TABLE ${tableName}
DROP PARTITION IF EXISTS ${opPartitonName}_9
"""

sql """
ALTER TABLE ${tableName}
DROP PARTITION IF EXISTS ${opPartitonName}_8
"""
sql "sync"

logger.info("=== Test 5: pause and verify ===")

helper.ccrJobResume()

assertTrue(helper.checkShowTimesOf("""
SHOW PARTITIONS
FROM TEST_${context.dbName}.${tableName}
WHERE PartitionName = \"${opPartitonName}_9\"
""",
notExist, 30, "target"))
assertTrue(helper.checkShowTimesOf("""
SHOW PARTITIONS
FROM TEST_${context.dbName}.${tableName}
WHERE PartitionName = \"${opPartitonName}_8\"
""",
notExist, 30, "target"))

logger.info("=== Test 5: recover a partitions ===")

sql """
RECOVER PARTITION ${opPartitonName}_9 from ${tableName}
"""

logger.info("=== Test 56: check partition recovered ===")
assertTrue(helper.checkShowTimesOf("""
SHOW PARTITIONS
FROM TEST_${context.dbName}.${tableName}
WHERE PartitionName = \"${opPartitonName}_9\"
""",
exist, 30, "target"))


sql """
RECOVER PARTITION ${opPartitonName}_8 from ${tableName}
"""
assertTrue(helper.checkShowTimesOf("""
SHOW PARTITIONS
FROM TEST_${context.dbName}.${tableName}
WHERE PartitionName = \"${opPartitonName}_8\"
""",
exist, 30, "target"))
}


Loading

0 comments on commit 2a6113a

Please sign in to comment.