From 974c3955f20c635db307ebecd5fd5d50ba2de097 Mon Sep 17 00:00:00 2001 From: Vallish Date: Sat, 30 Nov 2024 13:28:55 +0000 Subject: [PATCH 1/2] [feat](binlog) Add Support recover binlog --- pkg/ccr/job.go | 50 ++++++ pkg/ccr/record/recover_info.go | 42 +++++ .../frontendservice/FrontendService.go | 12 +- pkg/rpc/thrift/FrontendService.thrift | 4 +- .../recover/test_ds_part_recover.out | 17 ++ .../recover1/test_ds_part_recover_new.out | 17 ++ .../recover/test_ds_tbl_drop_recover.out | 39 +++++ .../recover1/test_ds_tbl_drop_recover_new.out | 39 +++++ .../recover/test_tbl_part_recover.out | 17 ++ .../recover1/test_tbl_part_recover_new.out | 17 ++ .../recover/test_ds_part_recover.groovy | 147 +++++++++++++++++ .../recover1/test_ds_part_recover_new.groovy | 147 +++++++++++++++++ .../recover/test_ds_tbl_drop_recover.groovy | 150 ++++++++++++++++++ .../test_ds_tbl_drop_recover_new.groovy | 143 +++++++++++++++++ .../recover/test_tbl_part_recover.groovy | 145 +++++++++++++++++ .../recover1/test_tbl_part_recover_new.groovy | 144 +++++++++++++++++ 16 files changed, 1122 insertions(+), 8 deletions(-) create mode 100644 pkg/ccr/record/recover_info.go create mode 100644 regression-test/data/db_sync/partition/recover/test_ds_part_recover.out create mode 100644 regression-test/data/db_sync/partition/recover1/test_ds_part_recover_new.out create mode 100644 regression-test/data/db_sync/table/recover/test_ds_tbl_drop_recover.out create mode 100644 regression-test/data/db_sync/table/recover1/test_ds_tbl_drop_recover_new.out create mode 100644 regression-test/data/table_sync/partition/recover/test_tbl_part_recover.out create mode 100644 regression-test/data/table_sync/partition/recover1/test_tbl_part_recover_new.out create mode 100644 regression-test/suites/db_sync/partition/recover/test_ds_part_recover.groovy create mode 100644 regression-test/suites/db_sync/partition/recover1/test_ds_part_recover_new.groovy create mode 100644 regression-test/suites/db_sync/table/recover/test_ds_tbl_drop_recover.groovy create mode 100644 regression-test/suites/db_sync/table/recover1/test_ds_tbl_drop_recover_new.groovy create mode 100644 regression-test/suites/table_sync/partition/recover/test_tbl_part_recover.groovy create mode 100644 regression-test/suites/table_sync/partition/recover1/test_tbl_part_recover_new.groovy diff --git a/pkg/ccr/job.go b/pkg/ccr/job.go index 4f715922..bcc28b36 100644 --- a/pkg/ccr/job.go +++ b/pkg/ccr/job.go @@ -2567,6 +2567,48 @@ func (j *Job) handleDropRollupRecord(commitSeq int64, dropRollup *record.DropRol return j.IDest.DropRollup(destTableName, dropRollup.IndexName) } +func (j *Job) handleRecoverInfo(binlog *festruct.TBinlog) error { + log.Infof("handle recoverInfo binlog, prevCommitSeq: %d, commitSeq: %d", + j.progress.PrevCommitSeq, j.progress.CommitSeq) + + data := binlog.GetData() + recoverInfo, err := record.NewRecoverInfoFromJson(data) + if err != nil { + return err + } + + return j.handleRecoverInfoRecord(binlog.GetCommitSeq(), recoverInfo) +} + +func (j *Job) handleRecoverInfoRecord(commitSeq int64, recoverInfo *record.RecoverInfo) error { + if j.isBinlogCommitted(recoverInfo.TableId, commitSeq) { + return nil + } + + if recoverInfo.PartitionName == "" { + var tableName string + if recoverInfo.NewTableName != "" { + tableName = recoverInfo.NewTableName + } else { + tableName = recoverInfo.TableName + } + log.Infof("recover info with for table %s, will trigger partial sync", tableName) + return j.newPartialSnapshot(recoverInfo.TableId, tableName, nil, true) + } + + var partitions []string + if recoverInfo.NewPartitionName != "" { + partitions = append(partitions, recoverInfo.NewPartitionName) + } else { + partitions = append(partitions, recoverInfo.PartitionName) + } + log.Infof("recover info with for partition(%s) for table %s, will trigger partial sync", + partitions, recoverInfo.TableName) + // if source does multiple recover of partition, then there is a race + // condition and some recover might miss due to commitseq change after snapshot. + return j.newPartialSnapshot(recoverInfo.TableId, recoverInfo.TableName, nil, true) +} + func (j *Job) handleBarrier(binlog *festruct.TBinlog) error { data := binlog.GetData() barrierLog, err := record.NewBarrierLogFromJson(data) @@ -2645,6 +2687,12 @@ func (j *Job) handleBarrier(binlog *festruct.TBinlog) error { return err } return j.handleModifyCommentRecord(commitSeq, modifyComment) + case festruct.TBinlogType_RECOVER_INFO: + recoverInfo, err := record.NewRecoverInfoFromJson(barrierLog.Binlog) + if err != nil { + return err + } + return j.handleRecoverInfoRecord(commitSeq, recoverInfo) case festruct.TBinlogType_BARRIER: log.Info("handle barrier binlog, ignore it") default: @@ -2757,6 +2805,8 @@ func (j *Job) handleBinlog(binlog *festruct.TBinlog) error { return j.handleRenameRollup(binlog) case festruct.TBinlogType_DROP_ROLLUP: return j.handleDropRollup(binlog) + case festruct.TBinlogType_RECOVER_INFO: + return j.handleRecoverInfo(binlog) default: return xerror.Errorf(xerror.Normal, "unknown binlog type: %v", binlog.GetType()) } diff --git a/pkg/ccr/record/recover_info.go b/pkg/ccr/record/recover_info.go new file mode 100644 index 00000000..aecd71c7 --- /dev/null +++ b/pkg/ccr/record/recover_info.go @@ -0,0 +1,42 @@ +package record + +import ( + "encoding/json" + "fmt" + + "github.com/selectdb/ccr_syncer/pkg/xerror" +) + +type RecoverInfo struct { + DbId int64 `json:"dbId"` + NewDbName string `json:"newDbName"` + TableId int64 `json:"tableId"` + TableName string `json:"tableName"` + NewTableName string `json:"newTableName"` + PartitionId int64 `json:"partitionId"` + PartitionName string `json:"partitionName"` + NewPartitionName string `json:"newPartitionName"` +} + +func NewRecoverInfoFromJson(data string) (*RecoverInfo, error) { + var recoverInfo RecoverInfo + err := json.Unmarshal([]byte(data), &recoverInfo) + if err != nil { + return nil, xerror.Wrap(err, xerror.Normal, "unmarshal create table error") + } + + if recoverInfo.TableId == 0 { + return nil, xerror.Errorf(xerror.Normal, "table id not found") + } + + /* need check for db level not supported. + */ + + return &recoverInfo, nil +} + +// String +func (c *RecoverInfo) String() string { + return fmt.Sprintf("RecoverInfo: DbId: %d, NewDbName: %s, TableId: %d, TableName: %s, NewTableName: %s, PartitionId: %d, PartitionName: %s, NewPartitionName: %s", + c.DbId, c.NewDbName, c.TableId, c.TableName, c.NewTableName, c.PartitionId, c.PartitionName, c.NewPartitionName) +} diff --git a/pkg/rpc/kitex_gen/frontendservice/FrontendService.go b/pkg/rpc/kitex_gen/frontendservice/FrontendService.go index ec4200b4..dd8cca03 100644 --- a/pkg/rpc/kitex_gen/frontendservice/FrontendService.go +++ b/pkg/rpc/kitex_gen/frontendservice/FrontendService.go @@ -637,8 +637,8 @@ const ( TBinlogType_RENAME_ROLLUP TBinlogType = 21 TBinlogType_RENAME_PARTITION TBinlogType = 22 TBinlogType_DROP_ROLLUP TBinlogType = 23 - TBinlogType_MIN_UNKNOWN TBinlogType = 24 - TBinlogType_UNKNOWN_9 TBinlogType = 25 + TBinlogType_RECOVER_INFO TBinlogType = 24 + TBinlogType_MIN_UNKNOWN TBinlogType = 25 TBinlogType_UNKNOWN_10 TBinlogType = 26 TBinlogType_UNKNOWN_11 TBinlogType = 27 TBinlogType_UNKNOWN_12 TBinlogType = 28 @@ -782,10 +782,10 @@ func (p TBinlogType) String() string { return "RENAME_PARTITION" case TBinlogType_DROP_ROLLUP: return "DROP_ROLLUP" + case TBinlogType_RECOVER_INFO: + return "RECOVER_INFO" case TBinlogType_MIN_UNKNOWN: return "MIN_UNKNOWN" - case TBinlogType_UNKNOWN_9: - return "UNKNOWN_9" case TBinlogType_UNKNOWN_10: return "UNKNOWN_10" case TBinlogType_UNKNOWN_11: @@ -1022,10 +1022,10 @@ func TBinlogTypeFromString(s string) (TBinlogType, error) { return TBinlogType_RENAME_PARTITION, nil case "DROP_ROLLUP": return TBinlogType_DROP_ROLLUP, nil + case "RECOVER_INFO": + return TBinlogType_RECOVER_INFO, nil case "MIN_UNKNOWN": return TBinlogType_MIN_UNKNOWN, nil - case "UNKNOWN_9": - return TBinlogType_UNKNOWN_9, nil case "UNKNOWN_10": return TBinlogType_UNKNOWN_10, nil case "UNKNOWN_11": diff --git a/pkg/rpc/thrift/FrontendService.thrift b/pkg/rpc/thrift/FrontendService.thrift index e2af8937..b0af81db 100644 --- a/pkg/rpc/thrift/FrontendService.thrift +++ b/pkg/rpc/thrift/FrontendService.thrift @@ -1197,7 +1197,7 @@ enum TBinlogType { RENAME_ROLLUP = 21, RENAME_PARTITION = 22, DROP_ROLLUP = 23, - + RECOVER_INFO = 24, // Keep some IDs for allocation so that when new binlog types are added in the // future, the changes can be picked back to the old versions without breaking // compatibility. @@ -1213,7 +1213,7 @@ enum TBinlogType { // MODIFY_XXX = 17, // MIN_UNKNOWN = 18, // UNKNOWN_3 = 19, - MIN_UNKNOWN = 24, + MIN_UNKNOWN = 25, UNKNOWN_9 = 25, UNKNOWN_10 = 26, UNKNOWN_11 = 27, diff --git a/regression-test/data/db_sync/partition/recover/test_ds_part_recover.out b/regression-test/data/db_sync/partition/recover/test_ds_part_recover.out new file mode 100644 index 00000000..44d17364 --- /dev/null +++ b/regression-test/data/db_sync/partition/recover/test_ds_part_recover.out @@ -0,0 +1,17 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !target_sql_content -- +3 0 +3 1 +3 2 +5 0 +5 1 +5 2 + +-- !sql_source_content -- +3 0 +3 1 +3 2 +5 0 +5 1 +5 2 + diff --git a/regression-test/data/db_sync/partition/recover1/test_ds_part_recover_new.out b/regression-test/data/db_sync/partition/recover1/test_ds_part_recover_new.out new file mode 100644 index 00000000..44d17364 --- /dev/null +++ b/regression-test/data/db_sync/partition/recover1/test_ds_part_recover_new.out @@ -0,0 +1,17 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !target_sql_content -- +3 0 +3 1 +3 2 +5 0 +5 1 +5 2 + +-- !sql_source_content -- +3 0 +3 1 +3 2 +5 0 +5 1 +5 2 + diff --git a/regression-test/data/db_sync/table/recover/test_ds_tbl_drop_recover.out b/regression-test/data/db_sync/table/recover/test_ds_tbl_drop_recover.out new file mode 100644 index 00000000..28abc0ba --- /dev/null +++ b/regression-test/data/db_sync/table/recover/test_ds_tbl_drop_recover.out @@ -0,0 +1,39 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !target_sql_content -- +0 0 +0 1 +0 2 +2 0 +2 1 +2 2 + +-- !sql_source_content -- +0 0 +0 1 +0 2 +2 0 +2 1 +2 2 + +-- !target_sql_content_2 -- +0 0 +0 1 +0 2 +2 0 +2 1 +2 2 +3 0 +3 1 +3 2 + +-- !sql_source_content_2 -- +0 0 +0 1 +0 2 +2 0 +2 1 +2 2 +3 0 +3 1 +3 2 + diff --git a/regression-test/data/db_sync/table/recover1/test_ds_tbl_drop_recover_new.out b/regression-test/data/db_sync/table/recover1/test_ds_tbl_drop_recover_new.out new file mode 100644 index 00000000..28abc0ba --- /dev/null +++ b/regression-test/data/db_sync/table/recover1/test_ds_tbl_drop_recover_new.out @@ -0,0 +1,39 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !target_sql_content -- +0 0 +0 1 +0 2 +2 0 +2 1 +2 2 + +-- !sql_source_content -- +0 0 +0 1 +0 2 +2 0 +2 1 +2 2 + +-- !target_sql_content_2 -- +0 0 +0 1 +0 2 +2 0 +2 1 +2 2 +3 0 +3 1 +3 2 + +-- !sql_source_content_2 -- +0 0 +0 1 +0 2 +2 0 +2 1 +2 2 +3 0 +3 1 +3 2 + diff --git a/regression-test/data/table_sync/partition/recover/test_tbl_part_recover.out b/regression-test/data/table_sync/partition/recover/test_tbl_part_recover.out new file mode 100644 index 00000000..44d17364 --- /dev/null +++ b/regression-test/data/table_sync/partition/recover/test_tbl_part_recover.out @@ -0,0 +1,17 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !target_sql_content -- +3 0 +3 1 +3 2 +5 0 +5 1 +5 2 + +-- !sql_source_content -- +3 0 +3 1 +3 2 +5 0 +5 1 +5 2 + diff --git a/regression-test/data/table_sync/partition/recover1/test_tbl_part_recover_new.out b/regression-test/data/table_sync/partition/recover1/test_tbl_part_recover_new.out new file mode 100644 index 00000000..44d17364 --- /dev/null +++ b/regression-test/data/table_sync/partition/recover1/test_tbl_part_recover_new.out @@ -0,0 +1,17 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !target_sql_content -- +3 0 +3 1 +3 2 +5 0 +5 1 +5 2 + +-- !sql_source_content -- +3 0 +3 1 +3 2 +5 0 +5 1 +5 2 + diff --git a/regression-test/suites/db_sync/partition/recover/test_ds_part_recover.groovy b/regression-test/suites/db_sync/partition/recover/test_ds_part_recover.groovy new file mode 100644 index 00000000..44350930 --- /dev/null +++ b/regression-test/suites/db_sync/partition/recover/test_ds_part_recover.groovy @@ -0,0 +1,147 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_ds_part_recover") { + def helper = new GroovyShell(new Binding(['suite': delegate])) + .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + + def tableName = "tbl_" + helper.randomSuffix() + def test_num = 0 + def insert_num = 3 + def opPartitonName = "part" + + def exist = { res -> Boolean + return res.size() != 0 + } + def notExist = { res -> Boolean + return res.size() == 0 + } + + helper.enableDbBinlog() + + sql """ + CREATE TABLE if NOT EXISTS ${tableName} + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + PARTITION BY RANGE(`id`) + ( + PARTITION `${opPartitonName}_1` VALUES LESS THAN ("10"), + PARTITION `${opPartitonName}_2` VALUES LESS THAN ("100") + ) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "binlog.enable" = "true" + ) + """ + + helper.ccrJobDelete() + helper.ccrJobCreate() + + assertTrue(helper.checkRestoreFinishTimesOf("${tableName}", 30)) + + + logger.info("=== Test 1: Check partitions in src before sync case ===") + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_1\" + """, + exist, 30, "target")) + + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_2\" + """, + exist, 30, "target")) + + + + logger.info("=== Test 3: Insert data in valid partitions case ===") + test_num = 3 + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${tableName} VALUES (${test_num}, ${index}) + """ + } + sql "sync" + assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName} WHERE test=${test_num}", + insert_num, 30)) + + + + logger.info("=== Test 4: Drop partitions case ===") + sql """ + ALTER TABLE ${tableName} + DROP PARTITION IF EXISTS ${opPartitonName}_1 + """ + sql """ + ALTER TABLE ${tableName} + DROP PARTITION IF EXISTS ${opPartitonName}_2 + """ + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_1\" + """, + notExist, 30, "target")) + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_2\" + """, + notExist, 30, "target")) + + logger.info("=== Test 4: recover partitions case ===") + sql """ + RECOVER PARTITION ${opPartitonName}_1 from ${tableName} + """ + + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_1\" + """, + exist, 30, "target")) + sql """ + RECOVER PARTITION ${opPartitonName}_2 from ${tableName} + """ + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_2\" + """, + exist, 30, "target")) + + test_num = 5 + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${tableName} VALUES (${test_num}, ${index}) + """ + } + sql "sync" + assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName} WHERE test=${test_num}", + insert_num, 30)) + + order_qt_target_sql_content("SELECT * FROM ${tableName}") + order_qt_sql_source_content("SELECT * FROM ${tableName}") +} diff --git a/regression-test/suites/db_sync/partition/recover1/test_ds_part_recover_new.groovy b/regression-test/suites/db_sync/partition/recover1/test_ds_part_recover_new.groovy new file mode 100644 index 00000000..dca2b8ca --- /dev/null +++ b/regression-test/suites/db_sync/partition/recover1/test_ds_part_recover_new.groovy @@ -0,0 +1,147 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_ds_part_recover_new") { + def helper = new GroovyShell(new Binding(['suite': delegate])) + .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + + def tableName = "tbl_" + helper.randomSuffix() + def test_num = 0 + def insert_num = 3 + def opPartitonName = "part" + + def exist = { res -> Boolean + return res.size() != 0 + } + def notExist = { res -> Boolean + return res.size() == 0 + } + + helper.enableDbBinlog() + + sql """ + CREATE TABLE if NOT EXISTS ${tableName} + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + PARTITION BY RANGE(`id`) + ( + PARTITION `${opPartitonName}_1` VALUES LESS THAN ("10"), + PARTITION `${opPartitonName}_2` VALUES LESS THAN ("100") + ) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "binlog.enable" = "true" + ) + """ + + helper.ccrJobDelete() + helper.ccrJobCreate() + + assertTrue(helper.checkRestoreFinishTimesOf("${tableName}", 30)) + + + logger.info("=== Test 1: Check partitions in src before sync case ===") + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_1\" + """, + exist, 30, "target")) + + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_2\" + """, + exist, 30, "target")) + + + + logger.info("=== Test 3: Insert data in valid partitions case ===") + test_num = 3 + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${tableName} VALUES (${test_num}, ${index}) + """ + } + sql "sync" + assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName} WHERE test=${test_num}", + insert_num, 30)) + + + + logger.info("=== Test 4: Drop partitions case ===") + sql """ + ALTER TABLE ${tableName} + DROP PARTITION IF EXISTS ${opPartitonName}_1 + """ + sql """ + ALTER TABLE ${tableName} + DROP PARTITION IF EXISTS ${opPartitonName}_2 + """ + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_1\" + """, + notExist, 30, "target")) + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_2\" + """, + notExist, 30, "target")) + + logger.info("=== Test 4: recover partitions case ===") + sql """ + RECOVER PARTITION ${opPartitonName}_1 as ${opPartitonName}_11 from ${tableName} + """ + + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_11\" + """, + exist, 30, "target")) + sql """ + RECOVER PARTITION ${opPartitonName}_2 as ${opPartitonName}_21 from ${tableName} + """ + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_21\" + """, + exist, 30, "target")) + + test_num = 5 + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${tableName} VALUES (${test_num}, ${index}) + """ + } + sql "sync" + assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName} WHERE test=${test_num}", + insert_num, 30)) + + order_qt_target_sql_content("SELECT * FROM ${tableName}") + order_qt_sql_source_content("SELECT * FROM ${tableName}") +} diff --git a/regression-test/suites/db_sync/table/recover/test_ds_tbl_drop_recover.groovy b/regression-test/suites/db_sync/table/recover/test_ds_tbl_drop_recover.groovy new file mode 100644 index 00000000..e91da172 --- /dev/null +++ b/regression-test/suites/db_sync/table/recover/test_ds_tbl_drop_recover.groovy @@ -0,0 +1,150 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +suite("test_ds_tbl_drop_recover") { + def helper = new GroovyShell(new Binding(['suite': delegate])) + .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + + def tableName = "tbl_recover" + helper.randomSuffix() + def test_num = 0 + def insert_num = 3 + def opPartitonName = "less" + + def exist = { res -> Boolean + return res.size() != 0 + } + def notExist = { res -> Boolean + return res.size() == 0 + } + + helper.enableDbBinlog() + + sql """ + CREATE TABLE if NOT EXISTS ${tableName}_1 + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + PARTITION BY RANGE(`id`) + ( + PARTITION `${opPartitonName}_0` VALUES LESS THAN ("0"), + PARTITION `${opPartitonName}_1` VALUES LESS THAN ("1000") + ) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "binlog.enable" = "true" + ) + """ + + helper.ccrJobDelete() + helper.ccrJobCreate() + + assertTrue(helper.checkRestoreFinishTimesOf("${tableName}_1", 60)) + + logger.info("=== Test 1: Check table and backup size ===") + sql "sync" + assertTrue(helper.checkShowTimesOf(""" SHOW TABLES LIKE "${tableName}_1" """, exist, 60, "target")) + + helper.ccrJobPause() + + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${tableName}_1 VALUES (${test_num}, ${index}) + """ + } + + sql """ + DROP TABLE ${tableName}_1 + """ + + assertTrue(helper.checkShowTimesOf(""" + SHOW TABLES LIKE "${tableName}_1" + """, + notExist, 60, "sql")) + + logger.info("=== Test 5: Resume and verify ===") + helper.ccrJobResume() + + assertTrue(helper.checkShowTimesOf(""" + SHOW TABLES LIKE "${tableName}_1" + """, + notExist, 60, "target")) + + // not both source and target dont have this table. it should be in recycle bin. + // lets try recover. + helper.ccrJobPause() + sql """ + RECOVER TABLE ${tableName}_1 + """ + assertTrue(helper.checkShowTimesOf(""" SHOW TABLES LIKE "${tableName}_1" """, exist, 60, "sql")) // check recovered in local + helper.ccrJobResume() + assertTrue(helper.checkShowTimesOf(""" SHOW TABLES LIKE "${tableName}_1" """, exist, 60, "target")) // check recovered in target + + assertTrue(helper.checkRestoreFinishTimesOf("${tableName}_1", 60)) + + def resSql = target_sql "SELECT * FROM ${tableName}_1 WHERE test=0" + def resSrcSql = sql "SELECT * FROM ${context.dbName}.${tableName}_1 WHERE test=0" + logger.info("=== {} vs {} ===", resSql.size(), resSrcSql.size()) + assertTrue(resSql.size() == resSrcSql.size()) + + test_num = 2 + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${tableName}_1 VALUES (${test_num}, ${index}) + """ + } + assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName}_1 WHERE test=${test_num}", + insert_num, 30)) + + qt_target_sql_content("SELECT * FROM ${tableName}_1") + qt_sql_source_content("SELECT * FROM ${tableName}_1") + + logger.info("=== Test 6: Drop again and try recover and insert ===") + sql """ + DROP TABLE ${tableName}_1 + """ + + assertTrue(helper.checkShowTimesOf(""" + SHOW TABLES LIKE "${tableName}_1" + """, + notExist, 60, "sql")) + + assertTrue(helper.checkShowTimesOf(""" + SHOW TABLES LIKE "${tableName}_1" + """, + notExist, 60, "target")) + + // not both source and target dont have this table. it should be in recycle bin. + sql """ + RECOVER TABLE ${tableName}_1 + """ + assertTrue(helper.checkShowTimesOf(""" SHOW TABLES LIKE "${tableName}_1" """, exist, 60, "sql")) // check recovered in local + assertTrue(helper.checkShowTimesOf(""" SHOW TABLES LIKE "${tableName}_1" """, exist, 60, "target")) // check recovered in target + + test_num = 3 + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${tableName}_1 VALUES (${test_num}, ${index}) + """ + } + assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName}_1 WHERE test=${test_num}", + insert_num, 30)) + order_qt_target_sql_content_2("SELECT * FROM ${tableName}_1") + order_qt_sql_source_content_2("SELECT * FROM ${tableName}_1") +} diff --git a/regression-test/suites/db_sync/table/recover1/test_ds_tbl_drop_recover_new.groovy b/regression-test/suites/db_sync/table/recover1/test_ds_tbl_drop_recover_new.groovy new file mode 100644 index 00000000..11f9965b --- /dev/null +++ b/regression-test/suites/db_sync/table/recover1/test_ds_tbl_drop_recover_new.groovy @@ -0,0 +1,143 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +suite("test_ds_tbl_drop_recover_new") { + def helper = new GroovyShell(new Binding(['suite': delegate])) + .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + + def tableName = "tbl_recover" + helper.randomSuffix() + def test_num = 0 + def insert_num = 3 + def opPartitonName = "part" + + def exist = { res -> Boolean + return res.size() != 0 + } + def notExist = { res -> Boolean + return res.size() == 0 + } + + helper.enableDbBinlog() + + sql """ + CREATE TABLE if NOT EXISTS ${tableName}_1 + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + PARTITION BY RANGE(`id`) + ( + PARTITION `${opPartitonName}_0` VALUES LESS THAN ("0"), + PARTITION `${opPartitonName}_1` VALUES LESS THAN ("1000") + ) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "binlog.enable" = "true" + ) + """ + + helper.ccrJobDelete() + helper.ccrJobCreate() + + assertTrue(helper.checkRestoreFinishTimesOf("${tableName}_1", 60)) + + logger.info("=== Test 1: Check table and backup size ===") + sql "sync" + assertTrue(helper.checkShowTimesOf(""" SHOW TABLES LIKE "${tableName}_1" """, exist, 60, "target")) + + helper.ccrJobPause() + + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${tableName}_1 VALUES (${test_num}, ${index}) + """ + } + + sql """ + DROP TABLE ${tableName}_1 + """ + + assertTrue(helper.checkShowTimesOf(""" + SHOW TABLES LIKE "${tableName}_1" + """, + notExist, 60, "sql")) + + logger.info("=== Test 5: Resume and verify ===") + helper.ccrJobResume() + + assertTrue(helper.checkShowTimesOf(""" + SHOW TABLES LIKE "${tableName}_1" + """, + notExist, 60, "target")) + + // not both source and target dont have this table. it should be in recycle bin. + // lets try recover. + helper.ccrJobPause() + sql """ + RECOVER TABLE ${tableName}_1 as ${tableName}_10 + """ + assertTrue(helper.checkShowTimesOf(""" SHOW TABLES LIKE "${tableName}_10" """, exist, 60, "sql")) // check recovered in local + helper.ccrJobResume() + assertTrue(helper.checkShowTimesOf(""" SHOW TABLES LIKE "${tableName}_10" """, exist, 60, "target")) // check recovered in target + + test_num = 2 + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${tableName}_10 VALUES (${test_num}, ${index}) + """ + } + assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName}_10 WHERE test=${test_num}", + insert_num, 30)) + + qt_target_sql_content("SELECT * FROM ${tableName}_10") + qt_sql_source_content("SELECT * FROM ${tableName}_10") + + logger.info("=== Test 6: Drop again and try recover and insert ===") + sql """ + DROP TABLE ${tableName}_10 + """ + + assertTrue(helper.checkShowTimesOf(""" + SHOW TABLES LIKE "${tableName}_10" + """, + notExist, 60, "sql")) + + assertTrue(helper.checkShowTimesOf(""" + SHOW TABLES LIKE "${tableName}_10" + """, + notExist, 60, "target")) + + // not both source and target dont have this table. it should be in recycle bin. + sql """ + RECOVER TABLE ${tableName}_10 as ${tableName}_100 + """ + assertTrue(helper.checkShowTimesOf(""" SHOW TABLES LIKE "${tableName}_100" """, exist, 60, "sql")) // check recovered in local + assertTrue(helper.checkShowTimesOf(""" SHOW TABLES LIKE "${tableName}_100" """, exist, 60, "target")) // check recovered in target + + test_num = 3 + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${tableName}_100 VALUES (${test_num}, ${index}) + """ + } + assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName}_100 WHERE test=${test_num}", + insert_num, 30)) + order_qt_target_sql_content_2("SELECT * FROM ${tableName}_100") + order_qt_sql_source_content_2("SELECT * FROM ${tableName}_100") +} diff --git a/regression-test/suites/table_sync/partition/recover/test_tbl_part_recover.groovy b/regression-test/suites/table_sync/partition/recover/test_tbl_part_recover.groovy new file mode 100644 index 00000000..dda58aaa --- /dev/null +++ b/regression-test/suites/table_sync/partition/recover/test_tbl_part_recover.groovy @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_tbl_part_recover") { + def helper = new GroovyShell(new Binding(['suite': delegate])) + .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + + def tableName = "tbl_" + helper.randomSuffix() + def test_num = 0 + def insert_num = 3 + def opPartitonName = "part" + + def exist = { res -> Boolean + return res.size() != 0 + } + def notExist = { res -> Boolean + return res.size() == 0 + } + + sql """ + CREATE TABLE if NOT EXISTS ${tableName} + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + PARTITION BY RANGE(`id`) + ( + PARTITION `${opPartitonName}_1` VALUES LESS THAN ("10"), + PARTITION `${opPartitonName}_2` VALUES LESS THAN ("100") + ) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "binlog.enable" = "true" + ) + """ + + helper.ccrJobCreate(tableName) + + assertTrue(helper.checkRestoreFinishTimesOf("${tableName}", 30)) + + + logger.info("=== Test 1: Check partitions in src before sync case ===") + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_1\" + """, + exist, 30, "target")) + + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_2\" + """, + exist, 30, "target")) + + + + logger.info("=== Test 3: Insert data in valid partitions case ===") + test_num = 3 + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${tableName} VALUES (${test_num}, ${index}) + """ + } + sql "sync" + assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName} WHERE test=${test_num}", + insert_num, 30)) + + + + logger.info("=== Test 4: Drop partitions case ===") + sql """ + ALTER TABLE ${tableName} + DROP PARTITION IF EXISTS ${opPartitonName}_1 + """ + sql """ + ALTER TABLE ${tableName} + DROP PARTITION IF EXISTS ${opPartitonName}_2 + """ + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_1\" + """, + notExist, 30, "target")) + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_2\" + """, + notExist, 30, "target")) + + logger.info("=== Test 4: recover partitions case ===") + sql """ + RECOVER PARTITION ${opPartitonName}_1 from ${tableName} + """ + sql """ + RECOVER PARTITION ${opPartitonName}_2 from ${tableName} + """ + + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_1\" + """, + exist, 30, "target")) + + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_2\" + """, + exist, 30, "target")) + + test_num = 5 + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${tableName} VALUES (${test_num}, ${index}) + """ + } + sql "sync" + assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName} WHERE test=${test_num}", + insert_num, 30)) + + order_qt_target_sql_content("SELECT * FROM ${tableName}") + order_qt_sql_source_content("SELECT * FROM ${tableName}") +} diff --git a/regression-test/suites/table_sync/partition/recover1/test_tbl_part_recover_new.groovy b/regression-test/suites/table_sync/partition/recover1/test_tbl_part_recover_new.groovy new file mode 100644 index 00000000..1cf9a5ab --- /dev/null +++ b/regression-test/suites/table_sync/partition/recover1/test_tbl_part_recover_new.groovy @@ -0,0 +1,144 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_tbl_part_recover_new") { + def helper = new GroovyShell(new Binding(['suite': delegate])) + .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + + def tableName = "tbl_" + helper.randomSuffix() + def test_num = 0 + def insert_num = 3 + def opPartitonName = "part" + + def exist = { res -> Boolean + return res.size() != 0 + } + def notExist = { res -> Boolean + return res.size() == 0 + } + + sql """ + CREATE TABLE if NOT EXISTS ${tableName} + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + PARTITION BY RANGE(`id`) + ( + PARTITION `${opPartitonName}_1` VALUES LESS THAN ("10"), + PARTITION `${opPartitonName}_2` VALUES LESS THAN ("100") + ) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "binlog.enable" = "true" + ) + """ + + helper.ccrJobCreate(tableName) + + assertTrue(helper.checkRestoreFinishTimesOf("${tableName}", 30)) + + + logger.info("=== Test 1: Check partitions in src before sync case ===") + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_1\" + """, + exist, 30, "target")) + + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_2\" + """, + exist, 30, "target")) + + + + logger.info("=== Test 3: Insert data in valid partitions case ===") + test_num = 3 + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${tableName} VALUES (${test_num}, ${index}) + """ + } + sql "sync" + assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName} WHERE test=${test_num}", + insert_num, 30)) + + + + logger.info("=== Test 4: Drop partitions case ===") + sql """ + ALTER TABLE ${tableName} + DROP PARTITION IF EXISTS ${opPartitonName}_1 + """ + sql """ + ALTER TABLE ${tableName} + DROP PARTITION IF EXISTS ${opPartitonName}_2 + """ + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_1\" + """, + notExist, 30, "target")) + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_2\" + """, + notExist, 30, "target")) + + logger.info("=== Test 4: recover partitions case ===") + sql """ + RECOVER PARTITION ${opPartitonName}_1 as ${opPartitonName}_11 from ${tableName} + """ + + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_11\" + """, + exist, 30, "target")) + sql """ + RECOVER PARTITION ${opPartitonName}_2 as ${opPartitonName}_22 from ${tableName} + """ + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_22\" + """, + exist, 30, "target")) + + test_num = 5 + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${tableName} VALUES (${test_num}, ${index}) + """ + } + sql "sync" + assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName} WHERE test=${test_num}", + insert_num, 30)) + + order_qt_target_sql_content("SELECT * FROM ${tableName}") + order_qt_sql_source_content("SELECT * FROM ${tableName}") +} From 131983c5f27c8fd61fd2fe3a6881508e8ab6ef31 Mon Sep 17 00:00:00 2001 From: Vallish Date: Tue, 3 Dec 2024 16:44:07 +0000 Subject: [PATCH 2/2] add more testcases for recover table --- pkg/ccr/job.go | 9 +- pkg/ccr/record/recover_info.go | 7 +- pkg/rpc/thrift/FrontendService.thrift | 1 - regression-test/common/helper.groovy | 6 ++ .../recover2/test_ds_tbl_drop_recover2.out | 77 +++++++++++++ .../recover3/test_ds_tbl_drop_recover3.out | 77 +++++++++++++ .../recover2/test_ds_tbl_drop_recover2.groovy | 101 ++++++++++++++++++ .../recover3/test_ds_tbl_drop_recover3.groovy | 91 ++++++++++++++++ 8 files changed, 364 insertions(+), 5 deletions(-) create mode 100644 regression-test/data/db_sync/table/recover2/test_ds_tbl_drop_recover2.out create mode 100644 regression-test/data/db_sync/table/recover3/test_ds_tbl_drop_recover3.out create mode 100644 regression-test/suites/db_sync/table/recover2/test_ds_tbl_drop_recover2.groovy create mode 100644 regression-test/suites/db_sync/table/recover3/test_ds_tbl_drop_recover3.groovy diff --git a/pkg/ccr/job.go b/pkg/ccr/job.go index bcc28b36..ae52cbb5 100644 --- a/pkg/ccr/job.go +++ b/pkg/ccr/job.go @@ -2580,12 +2580,19 @@ func (j *Job) handleRecoverInfo(binlog *festruct.TBinlog) error { return j.handleRecoverInfoRecord(binlog.GetCommitSeq(), recoverInfo) } +func isRecoverTable(recoverInfo *record.RecoverInfo) bool { + if recoverInfo.PartitionName == "" || recoverInfo.PartitionId == -1 { + return true + } + return false +} + func (j *Job) handleRecoverInfoRecord(commitSeq int64, recoverInfo *record.RecoverInfo) error { if j.isBinlogCommitted(recoverInfo.TableId, commitSeq) { return nil } - if recoverInfo.PartitionName == "" { + if isRecoverTable(recoverInfo) { var tableName string if recoverInfo.NewTableName != "" { tableName = recoverInfo.NewTableName diff --git a/pkg/ccr/record/recover_info.go b/pkg/ccr/record/recover_info.go index aecd71c7..8e97c81e 100644 --- a/pkg/ccr/record/recover_info.go +++ b/pkg/ccr/record/recover_info.go @@ -29,9 +29,10 @@ func NewRecoverInfoFromJson(data string) (*RecoverInfo, error) { return nil, xerror.Errorf(xerror.Normal, "table id not found") } - /* need check for db level not supported. - */ - + // table name must exist. partition name not checked since optional. + if recoverInfo.TableName == "" { + return nil, xerror.Errorf(xerror.Normal, "Table Name can not be null") + } return &recoverInfo, nil } diff --git a/pkg/rpc/thrift/FrontendService.thrift b/pkg/rpc/thrift/FrontendService.thrift index b0af81db..c1a4d106 100644 --- a/pkg/rpc/thrift/FrontendService.thrift +++ b/pkg/rpc/thrift/FrontendService.thrift @@ -1214,7 +1214,6 @@ enum TBinlogType { // MIN_UNKNOWN = 18, // UNKNOWN_3 = 19, MIN_UNKNOWN = 25, - UNKNOWN_9 = 25, UNKNOWN_10 = 26, UNKNOWN_11 = 27, UNKNOWN_12 = 28, diff --git a/regression-test/common/helper.groovy b/regression-test/common/helper.groovy index 9fc243d6..c8deb392 100644 --- a/regression-test/common/helper.groovy +++ b/regression-test/common/helper.groovy @@ -189,6 +189,12 @@ class Helper { """ } + void disableDbBinlog() { + suite.sql """ + ALTER DATABASE ${context.dbName} SET properties ("binlog.enable" = "false") + """ + } + Boolean checkShowTimesOf(sqlString, myClosure, times, func = "sql") { Boolean ret = false List> res diff --git a/regression-test/data/db_sync/table/recover2/test_ds_tbl_drop_recover2.out b/regression-test/data/db_sync/table/recover2/test_ds_tbl_drop_recover2.out new file mode 100644 index 00000000..75dd0ac9 --- /dev/null +++ b/regression-test/data/db_sync/table/recover2/test_ds_tbl_drop_recover2.out @@ -0,0 +1,77 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !target_sql_content_2 -- +0 0 +0 1 +0 2 +10 0 +10 1 +10 2 +11 0 +11 1 +11 2 +12 0 +12 1 +12 2 +13 0 +13 1 +13 2 +14 0 +14 1 +14 2 +15 0 +15 1 +15 2 +16 0 +16 1 +16 2 +17 0 +17 1 +17 2 +18 0 +18 1 +18 2 +19 0 +19 1 +19 2 +20 0 +20 1 +20 2 + +-- !sql_source_content_2 -- +0 0 +0 1 +0 2 +10 0 +10 1 +10 2 +11 0 +11 1 +11 2 +12 0 +12 1 +12 2 +13 0 +13 1 +13 2 +14 0 +14 1 +14 2 +15 0 +15 1 +15 2 +16 0 +16 1 +16 2 +17 0 +17 1 +17 2 +18 0 +18 1 +18 2 +19 0 +19 1 +19 2 +20 0 +20 1 +20 2 + diff --git a/regression-test/data/db_sync/table/recover3/test_ds_tbl_drop_recover3.out b/regression-test/data/db_sync/table/recover3/test_ds_tbl_drop_recover3.out new file mode 100644 index 00000000..75dd0ac9 --- /dev/null +++ b/regression-test/data/db_sync/table/recover3/test_ds_tbl_drop_recover3.out @@ -0,0 +1,77 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !target_sql_content_2 -- +0 0 +0 1 +0 2 +10 0 +10 1 +10 2 +11 0 +11 1 +11 2 +12 0 +12 1 +12 2 +13 0 +13 1 +13 2 +14 0 +14 1 +14 2 +15 0 +15 1 +15 2 +16 0 +16 1 +16 2 +17 0 +17 1 +17 2 +18 0 +18 1 +18 2 +19 0 +19 1 +19 2 +20 0 +20 1 +20 2 + +-- !sql_source_content_2 -- +0 0 +0 1 +0 2 +10 0 +10 1 +10 2 +11 0 +11 1 +11 2 +12 0 +12 1 +12 2 +13 0 +13 1 +13 2 +14 0 +14 1 +14 2 +15 0 +15 1 +15 2 +16 0 +16 1 +16 2 +17 0 +17 1 +17 2 +18 0 +18 1 +18 2 +19 0 +19 1 +19 2 +20 0 +20 1 +20 2 + diff --git a/regression-test/suites/db_sync/table/recover2/test_ds_tbl_drop_recover2.groovy b/regression-test/suites/db_sync/table/recover2/test_ds_tbl_drop_recover2.groovy new file mode 100644 index 00000000..1d677d57 --- /dev/null +++ b/regression-test/suites/db_sync/table/recover2/test_ds_tbl_drop_recover2.groovy @@ -0,0 +1,101 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +suite("test_ds_tbl_drop_recover2") { + def helper = new GroovyShell(new Binding(['suite': delegate])) + .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + + def tableName = "tbl_recover" + helper.randomSuffix() + def test_num = 0 + def insert_num = 3 + def opPartitonName = "less" + + def exist = { res -> Boolean + return res.size() != 0 + } + def notExist = { res -> Boolean + return res.size() == 0 + } + + helper.ccrJobDelete() + + sql """ + CREATE TABLE if NOT EXISTS ${tableName}_1 + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + PARTITION BY RANGE(`id`) + ( + PARTITION `${opPartitonName}_0` VALUES LESS THAN ("0"), + PARTITION `${opPartitonName}_1` VALUES LESS THAN ("1000") + ) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "binlog.enable" = "true" + ) + """ + + + + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${tableName}_1 VALUES (${test_num}, ${index}) + """ + } + + sql """ + DROP TABLE ${tableName}_1 + """ + helper.enableDbBinlog() + helper.ccrJobCreate() + int interations = 10; + for(int t = 0; t <= interations; t += 1){ + /* first iteration already deleted */ + sql """ + DROP TABLE if exists ${tableName}_1 + """ + + assertTrue(helper.checkShowTimesOf(""" SHOW TABLES LIKE "${tableName}_1" """, notExist, 60, "sql")) // check recovered in local + assertTrue(helper.checkShowTimesOf(""" SHOW TABLES LIKE "${tableName}_1" """, notExist, 60, "target")) + + sql """ + RECOVER TABLE ${tableName}_1 + """ + assertTrue(helper.checkShowTimesOf(""" SHOW TABLES LIKE "${tableName}_1" """, exist, 60, "sql")) // check recovered in local + assertTrue(helper.checkShowTimesOf(""" SHOW TABLES LIKE "${tableName}_1" """, exist, 60, "target")) // check recovered in target + + assertTrue(helper.checkRestoreFinishTimesOf("${tableName}_1", 60)) + + test_num = t + 10; + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${tableName}_1 VALUES (${test_num}, ${index}) + """ + } + // need check restore, + assertTrue(helper.checkRestoreFinishTimesOf("${tableName}_1", 60)) + // check in remote available. + assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName}_1 WHERE test=${test_num}", + insert_num, 30)) + + } + order_qt_target_sql_content_2("SELECT * FROM ${tableName}_1") + qt_sql_source_content_2("SELECT * FROM ${tableName}_1") +} diff --git a/regression-test/suites/db_sync/table/recover3/test_ds_tbl_drop_recover3.groovy b/regression-test/suites/db_sync/table/recover3/test_ds_tbl_drop_recover3.groovy new file mode 100644 index 00000000..ad042fec --- /dev/null +++ b/regression-test/suites/db_sync/table/recover3/test_ds_tbl_drop_recover3.groovy @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +suite("test_ds_tbl_drop_recover3") { + def helper = new GroovyShell(new Binding(['suite': delegate])) + .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + + def tableName = "tbl_recover" + helper.randomSuffix() + def test_num = 0 + def insert_num = 3 + def opPartitonName = "less" + + def exist = { res -> Boolean + return res.size() != 0 + } + def notExist = { res -> Boolean + return res.size() == 0 + } + + helper.disableDbBinlog(); + helper.ccrJobDelete() + + sql """ + CREATE TABLE if NOT EXISTS ${tableName}_1 + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + PARTITION BY RANGE(`id`) + ( + PARTITION `${opPartitonName}_0` VALUES LESS THAN ("0"), + PARTITION `${opPartitonName}_1` VALUES LESS THAN ("1000") + ) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "binlog.enable" = "false" + ) + """ + + + + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${tableName}_1 VALUES (${test_num}, ${index}) + """ + } + + sql """ + DROP TABLE ${tableName}_1 + """ + helper.enableDbBinlog() + helper.ccrJobCreate() + int interations = 10; + for(int t = 0; t <= interations; t += 1){ + /* first iteration already deleted */ + sql """ + DROP TABLE if exists ${tableName}_1 + """ + sql """ + RECOVER TABLE ${tableName}_1 + """ + test_num = t + 10; + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${tableName}_1 VALUES (${test_num}, ${index}) + """ + } + } + // before validate, lets see restore is ok or not in target. + assertTrue(helper.checkRestoreFinishTimesOf("${tableName}_1", 60)) + assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName}_1",36, 30)) + + order_qt_target_sql_content_2("SELECT * FROM ${tableName}_1") + qt_sql_source_content_2("SELECT * FROM ${tableName}_1") +}