From 974c3955f20c635db307ebecd5fd5d50ba2de097 Mon Sep 17 00:00:00 2001 From: Vallish Date: Sat, 30 Nov 2024 13:28:55 +0000 Subject: [PATCH] [feat](binlog) Add Support recover binlog --- pkg/ccr/job.go | 50 ++++++ pkg/ccr/record/recover_info.go | 42 +++++ .../frontendservice/FrontendService.go | 12 +- pkg/rpc/thrift/FrontendService.thrift | 4 +- .../recover/test_ds_part_recover.out | 17 ++ .../recover1/test_ds_part_recover_new.out | 17 ++ .../recover/test_ds_tbl_drop_recover.out | 39 +++++ .../recover1/test_ds_tbl_drop_recover_new.out | 39 +++++ .../recover/test_tbl_part_recover.out | 17 ++ .../recover1/test_tbl_part_recover_new.out | 17 ++ .../recover/test_ds_part_recover.groovy | 147 +++++++++++++++++ .../recover1/test_ds_part_recover_new.groovy | 147 +++++++++++++++++ .../recover/test_ds_tbl_drop_recover.groovy | 150 ++++++++++++++++++ .../test_ds_tbl_drop_recover_new.groovy | 143 +++++++++++++++++ .../recover/test_tbl_part_recover.groovy | 145 +++++++++++++++++ .../recover1/test_tbl_part_recover_new.groovy | 144 +++++++++++++++++ 16 files changed, 1122 insertions(+), 8 deletions(-) create mode 100644 pkg/ccr/record/recover_info.go create mode 100644 regression-test/data/db_sync/partition/recover/test_ds_part_recover.out create mode 100644 regression-test/data/db_sync/partition/recover1/test_ds_part_recover_new.out create mode 100644 regression-test/data/db_sync/table/recover/test_ds_tbl_drop_recover.out create mode 100644 regression-test/data/db_sync/table/recover1/test_ds_tbl_drop_recover_new.out create mode 100644 regression-test/data/table_sync/partition/recover/test_tbl_part_recover.out create mode 100644 regression-test/data/table_sync/partition/recover1/test_tbl_part_recover_new.out create mode 100644 regression-test/suites/db_sync/partition/recover/test_ds_part_recover.groovy create mode 100644 regression-test/suites/db_sync/partition/recover1/test_ds_part_recover_new.groovy create mode 100644 regression-test/suites/db_sync/table/recover/test_ds_tbl_drop_recover.groovy create mode 100644 regression-test/suites/db_sync/table/recover1/test_ds_tbl_drop_recover_new.groovy create mode 100644 regression-test/suites/table_sync/partition/recover/test_tbl_part_recover.groovy create mode 100644 regression-test/suites/table_sync/partition/recover1/test_tbl_part_recover_new.groovy diff --git a/pkg/ccr/job.go b/pkg/ccr/job.go index 4f715922..bcc28b36 100644 --- a/pkg/ccr/job.go +++ b/pkg/ccr/job.go @@ -2567,6 +2567,48 @@ func (j *Job) handleDropRollupRecord(commitSeq int64, dropRollup *record.DropRol return j.IDest.DropRollup(destTableName, dropRollup.IndexName) } +func (j *Job) handleRecoverInfo(binlog *festruct.TBinlog) error { + log.Infof("handle recoverInfo binlog, prevCommitSeq: %d, commitSeq: %d", + j.progress.PrevCommitSeq, j.progress.CommitSeq) + + data := binlog.GetData() + recoverInfo, err := record.NewRecoverInfoFromJson(data) + if err != nil { + return err + } + + return j.handleRecoverInfoRecord(binlog.GetCommitSeq(), recoverInfo) +} + +func (j *Job) handleRecoverInfoRecord(commitSeq int64, recoverInfo *record.RecoverInfo) error { + if j.isBinlogCommitted(recoverInfo.TableId, commitSeq) { + return nil + } + + if recoverInfo.PartitionName == "" { + var tableName string + if recoverInfo.NewTableName != "" { + tableName = recoverInfo.NewTableName + } else { + tableName = recoverInfo.TableName + } + log.Infof("recover info with for table %s, will trigger partial sync", tableName) + return j.newPartialSnapshot(recoverInfo.TableId, tableName, nil, true) + } + + var partitions []string + if recoverInfo.NewPartitionName != "" { + partitions = append(partitions, recoverInfo.NewPartitionName) + } else { + partitions = append(partitions, recoverInfo.PartitionName) + } + log.Infof("recover info with for partition(%s) for table %s, will trigger partial sync", + partitions, recoverInfo.TableName) + // if source does multiple recover of partition, then there is a race + // condition and some recover might miss due to commitseq change after snapshot. + return j.newPartialSnapshot(recoverInfo.TableId, recoverInfo.TableName, nil, true) +} + func (j *Job) handleBarrier(binlog *festruct.TBinlog) error { data := binlog.GetData() barrierLog, err := record.NewBarrierLogFromJson(data) @@ -2645,6 +2687,12 @@ func (j *Job) handleBarrier(binlog *festruct.TBinlog) error { return err } return j.handleModifyCommentRecord(commitSeq, modifyComment) + case festruct.TBinlogType_RECOVER_INFO: + recoverInfo, err := record.NewRecoverInfoFromJson(barrierLog.Binlog) + if err != nil { + return err + } + return j.handleRecoverInfoRecord(commitSeq, recoverInfo) case festruct.TBinlogType_BARRIER: log.Info("handle barrier binlog, ignore it") default: @@ -2757,6 +2805,8 @@ func (j *Job) handleBinlog(binlog *festruct.TBinlog) error { return j.handleRenameRollup(binlog) case festruct.TBinlogType_DROP_ROLLUP: return j.handleDropRollup(binlog) + case festruct.TBinlogType_RECOVER_INFO: + return j.handleRecoverInfo(binlog) default: return xerror.Errorf(xerror.Normal, "unknown binlog type: %v", binlog.GetType()) } diff --git a/pkg/ccr/record/recover_info.go b/pkg/ccr/record/recover_info.go new file mode 100644 index 00000000..aecd71c7 --- /dev/null +++ b/pkg/ccr/record/recover_info.go @@ -0,0 +1,42 @@ +package record + +import ( + "encoding/json" + "fmt" + + "github.com/selectdb/ccr_syncer/pkg/xerror" +) + +type RecoverInfo struct { + DbId int64 `json:"dbId"` + NewDbName string `json:"newDbName"` + TableId int64 `json:"tableId"` + TableName string `json:"tableName"` + NewTableName string `json:"newTableName"` + PartitionId int64 `json:"partitionId"` + PartitionName string `json:"partitionName"` + NewPartitionName string `json:"newPartitionName"` +} + +func NewRecoverInfoFromJson(data string) (*RecoverInfo, error) { + var recoverInfo RecoverInfo + err := json.Unmarshal([]byte(data), &recoverInfo) + if err != nil { + return nil, xerror.Wrap(err, xerror.Normal, "unmarshal create table error") + } + + if recoverInfo.TableId == 0 { + return nil, xerror.Errorf(xerror.Normal, "table id not found") + } + + /* need check for db level not supported. + */ + + return &recoverInfo, nil +} + +// String +func (c *RecoverInfo) String() string { + return fmt.Sprintf("RecoverInfo: DbId: %d, NewDbName: %s, TableId: %d, TableName: %s, NewTableName: %s, PartitionId: %d, PartitionName: %s, NewPartitionName: %s", + c.DbId, c.NewDbName, c.TableId, c.TableName, c.NewTableName, c.PartitionId, c.PartitionName, c.NewPartitionName) +} diff --git a/pkg/rpc/kitex_gen/frontendservice/FrontendService.go b/pkg/rpc/kitex_gen/frontendservice/FrontendService.go index ec4200b4..dd8cca03 100644 --- a/pkg/rpc/kitex_gen/frontendservice/FrontendService.go +++ b/pkg/rpc/kitex_gen/frontendservice/FrontendService.go @@ -637,8 +637,8 @@ const ( TBinlogType_RENAME_ROLLUP TBinlogType = 21 TBinlogType_RENAME_PARTITION TBinlogType = 22 TBinlogType_DROP_ROLLUP TBinlogType = 23 - TBinlogType_MIN_UNKNOWN TBinlogType = 24 - TBinlogType_UNKNOWN_9 TBinlogType = 25 + TBinlogType_RECOVER_INFO TBinlogType = 24 + TBinlogType_MIN_UNKNOWN TBinlogType = 25 TBinlogType_UNKNOWN_10 TBinlogType = 26 TBinlogType_UNKNOWN_11 TBinlogType = 27 TBinlogType_UNKNOWN_12 TBinlogType = 28 @@ -782,10 +782,10 @@ func (p TBinlogType) String() string { return "RENAME_PARTITION" case TBinlogType_DROP_ROLLUP: return "DROP_ROLLUP" + case TBinlogType_RECOVER_INFO: + return "RECOVER_INFO" case TBinlogType_MIN_UNKNOWN: return "MIN_UNKNOWN" - case TBinlogType_UNKNOWN_9: - return "UNKNOWN_9" case TBinlogType_UNKNOWN_10: return "UNKNOWN_10" case TBinlogType_UNKNOWN_11: @@ -1022,10 +1022,10 @@ func TBinlogTypeFromString(s string) (TBinlogType, error) { return TBinlogType_RENAME_PARTITION, nil case "DROP_ROLLUP": return TBinlogType_DROP_ROLLUP, nil + case "RECOVER_INFO": + return TBinlogType_RECOVER_INFO, nil case "MIN_UNKNOWN": return TBinlogType_MIN_UNKNOWN, nil - case "UNKNOWN_9": - return TBinlogType_UNKNOWN_9, nil case "UNKNOWN_10": return TBinlogType_UNKNOWN_10, nil case "UNKNOWN_11": diff --git a/pkg/rpc/thrift/FrontendService.thrift b/pkg/rpc/thrift/FrontendService.thrift index e2af8937..b0af81db 100644 --- a/pkg/rpc/thrift/FrontendService.thrift +++ b/pkg/rpc/thrift/FrontendService.thrift @@ -1197,7 +1197,7 @@ enum TBinlogType { RENAME_ROLLUP = 21, RENAME_PARTITION = 22, DROP_ROLLUP = 23, - + RECOVER_INFO = 24, // Keep some IDs for allocation so that when new binlog types are added in the // future, the changes can be picked back to the old versions without breaking // compatibility. @@ -1213,7 +1213,7 @@ enum TBinlogType { // MODIFY_XXX = 17, // MIN_UNKNOWN = 18, // UNKNOWN_3 = 19, - MIN_UNKNOWN = 24, + MIN_UNKNOWN = 25, UNKNOWN_9 = 25, UNKNOWN_10 = 26, UNKNOWN_11 = 27, diff --git a/regression-test/data/db_sync/partition/recover/test_ds_part_recover.out b/regression-test/data/db_sync/partition/recover/test_ds_part_recover.out new file mode 100644 index 00000000..44d17364 --- /dev/null +++ b/regression-test/data/db_sync/partition/recover/test_ds_part_recover.out @@ -0,0 +1,17 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !target_sql_content -- +3 0 +3 1 +3 2 +5 0 +5 1 +5 2 + +-- !sql_source_content -- +3 0 +3 1 +3 2 +5 0 +5 1 +5 2 + diff --git a/regression-test/data/db_sync/partition/recover1/test_ds_part_recover_new.out b/regression-test/data/db_sync/partition/recover1/test_ds_part_recover_new.out new file mode 100644 index 00000000..44d17364 --- /dev/null +++ b/regression-test/data/db_sync/partition/recover1/test_ds_part_recover_new.out @@ -0,0 +1,17 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !target_sql_content -- +3 0 +3 1 +3 2 +5 0 +5 1 +5 2 + +-- !sql_source_content -- +3 0 +3 1 +3 2 +5 0 +5 1 +5 2 + diff --git a/regression-test/data/db_sync/table/recover/test_ds_tbl_drop_recover.out b/regression-test/data/db_sync/table/recover/test_ds_tbl_drop_recover.out new file mode 100644 index 00000000..28abc0ba --- /dev/null +++ b/regression-test/data/db_sync/table/recover/test_ds_tbl_drop_recover.out @@ -0,0 +1,39 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !target_sql_content -- +0 0 +0 1 +0 2 +2 0 +2 1 +2 2 + +-- !sql_source_content -- +0 0 +0 1 +0 2 +2 0 +2 1 +2 2 + +-- !target_sql_content_2 -- +0 0 +0 1 +0 2 +2 0 +2 1 +2 2 +3 0 +3 1 +3 2 + +-- !sql_source_content_2 -- +0 0 +0 1 +0 2 +2 0 +2 1 +2 2 +3 0 +3 1 +3 2 + diff --git a/regression-test/data/db_sync/table/recover1/test_ds_tbl_drop_recover_new.out b/regression-test/data/db_sync/table/recover1/test_ds_tbl_drop_recover_new.out new file mode 100644 index 00000000..28abc0ba --- /dev/null +++ b/regression-test/data/db_sync/table/recover1/test_ds_tbl_drop_recover_new.out @@ -0,0 +1,39 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !target_sql_content -- +0 0 +0 1 +0 2 +2 0 +2 1 +2 2 + +-- !sql_source_content -- +0 0 +0 1 +0 2 +2 0 +2 1 +2 2 + +-- !target_sql_content_2 -- +0 0 +0 1 +0 2 +2 0 +2 1 +2 2 +3 0 +3 1 +3 2 + +-- !sql_source_content_2 -- +0 0 +0 1 +0 2 +2 0 +2 1 +2 2 +3 0 +3 1 +3 2 + diff --git a/regression-test/data/table_sync/partition/recover/test_tbl_part_recover.out b/regression-test/data/table_sync/partition/recover/test_tbl_part_recover.out new file mode 100644 index 00000000..44d17364 --- /dev/null +++ b/regression-test/data/table_sync/partition/recover/test_tbl_part_recover.out @@ -0,0 +1,17 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !target_sql_content -- +3 0 +3 1 +3 2 +5 0 +5 1 +5 2 + +-- !sql_source_content -- +3 0 +3 1 +3 2 +5 0 +5 1 +5 2 + diff --git a/regression-test/data/table_sync/partition/recover1/test_tbl_part_recover_new.out b/regression-test/data/table_sync/partition/recover1/test_tbl_part_recover_new.out new file mode 100644 index 00000000..44d17364 --- /dev/null +++ b/regression-test/data/table_sync/partition/recover1/test_tbl_part_recover_new.out @@ -0,0 +1,17 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !target_sql_content -- +3 0 +3 1 +3 2 +5 0 +5 1 +5 2 + +-- !sql_source_content -- +3 0 +3 1 +3 2 +5 0 +5 1 +5 2 + diff --git a/regression-test/suites/db_sync/partition/recover/test_ds_part_recover.groovy b/regression-test/suites/db_sync/partition/recover/test_ds_part_recover.groovy new file mode 100644 index 00000000..44350930 --- /dev/null +++ b/regression-test/suites/db_sync/partition/recover/test_ds_part_recover.groovy @@ -0,0 +1,147 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_ds_part_recover") { + def helper = new GroovyShell(new Binding(['suite': delegate])) + .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + + def tableName = "tbl_" + helper.randomSuffix() + def test_num = 0 + def insert_num = 3 + def opPartitonName = "part" + + def exist = { res -> Boolean + return res.size() != 0 + } + def notExist = { res -> Boolean + return res.size() == 0 + } + + helper.enableDbBinlog() + + sql """ + CREATE TABLE if NOT EXISTS ${tableName} + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + PARTITION BY RANGE(`id`) + ( + PARTITION `${opPartitonName}_1` VALUES LESS THAN ("10"), + PARTITION `${opPartitonName}_2` VALUES LESS THAN ("100") + ) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "binlog.enable" = "true" + ) + """ + + helper.ccrJobDelete() + helper.ccrJobCreate() + + assertTrue(helper.checkRestoreFinishTimesOf("${tableName}", 30)) + + + logger.info("=== Test 1: Check partitions in src before sync case ===") + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_1\" + """, + exist, 30, "target")) + + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_2\" + """, + exist, 30, "target")) + + + + logger.info("=== Test 3: Insert data in valid partitions case ===") + test_num = 3 + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${tableName} VALUES (${test_num}, ${index}) + """ + } + sql "sync" + assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName} WHERE test=${test_num}", + insert_num, 30)) + + + + logger.info("=== Test 4: Drop partitions case ===") + sql """ + ALTER TABLE ${tableName} + DROP PARTITION IF EXISTS ${opPartitonName}_1 + """ + sql """ + ALTER TABLE ${tableName} + DROP PARTITION IF EXISTS ${opPartitonName}_2 + """ + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_1\" + """, + notExist, 30, "target")) + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_2\" + """, + notExist, 30, "target")) + + logger.info("=== Test 4: recover partitions case ===") + sql """ + RECOVER PARTITION ${opPartitonName}_1 from ${tableName} + """ + + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_1\" + """, + exist, 30, "target")) + sql """ + RECOVER PARTITION ${opPartitonName}_2 from ${tableName} + """ + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_2\" + """, + exist, 30, "target")) + + test_num = 5 + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${tableName} VALUES (${test_num}, ${index}) + """ + } + sql "sync" + assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName} WHERE test=${test_num}", + insert_num, 30)) + + order_qt_target_sql_content("SELECT * FROM ${tableName}") + order_qt_sql_source_content("SELECT * FROM ${tableName}") +} diff --git a/regression-test/suites/db_sync/partition/recover1/test_ds_part_recover_new.groovy b/regression-test/suites/db_sync/partition/recover1/test_ds_part_recover_new.groovy new file mode 100644 index 00000000..dca2b8ca --- /dev/null +++ b/regression-test/suites/db_sync/partition/recover1/test_ds_part_recover_new.groovy @@ -0,0 +1,147 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_ds_part_recover_new") { + def helper = new GroovyShell(new Binding(['suite': delegate])) + .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + + def tableName = "tbl_" + helper.randomSuffix() + def test_num = 0 + def insert_num = 3 + def opPartitonName = "part" + + def exist = { res -> Boolean + return res.size() != 0 + } + def notExist = { res -> Boolean + return res.size() == 0 + } + + helper.enableDbBinlog() + + sql """ + CREATE TABLE if NOT EXISTS ${tableName} + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + PARTITION BY RANGE(`id`) + ( + PARTITION `${opPartitonName}_1` VALUES LESS THAN ("10"), + PARTITION `${opPartitonName}_2` VALUES LESS THAN ("100") + ) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "binlog.enable" = "true" + ) + """ + + helper.ccrJobDelete() + helper.ccrJobCreate() + + assertTrue(helper.checkRestoreFinishTimesOf("${tableName}", 30)) + + + logger.info("=== Test 1: Check partitions in src before sync case ===") + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_1\" + """, + exist, 30, "target")) + + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_2\" + """, + exist, 30, "target")) + + + + logger.info("=== Test 3: Insert data in valid partitions case ===") + test_num = 3 + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${tableName} VALUES (${test_num}, ${index}) + """ + } + sql "sync" + assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName} WHERE test=${test_num}", + insert_num, 30)) + + + + logger.info("=== Test 4: Drop partitions case ===") + sql """ + ALTER TABLE ${tableName} + DROP PARTITION IF EXISTS ${opPartitonName}_1 + """ + sql """ + ALTER TABLE ${tableName} + DROP PARTITION IF EXISTS ${opPartitonName}_2 + """ + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_1\" + """, + notExist, 30, "target")) + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_2\" + """, + notExist, 30, "target")) + + logger.info("=== Test 4: recover partitions case ===") + sql """ + RECOVER PARTITION ${opPartitonName}_1 as ${opPartitonName}_11 from ${tableName} + """ + + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_11\" + """, + exist, 30, "target")) + sql """ + RECOVER PARTITION ${opPartitonName}_2 as ${opPartitonName}_21 from ${tableName} + """ + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_21\" + """, + exist, 30, "target")) + + test_num = 5 + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${tableName} VALUES (${test_num}, ${index}) + """ + } + sql "sync" + assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName} WHERE test=${test_num}", + insert_num, 30)) + + order_qt_target_sql_content("SELECT * FROM ${tableName}") + order_qt_sql_source_content("SELECT * FROM ${tableName}") +} diff --git a/regression-test/suites/db_sync/table/recover/test_ds_tbl_drop_recover.groovy b/regression-test/suites/db_sync/table/recover/test_ds_tbl_drop_recover.groovy new file mode 100644 index 00000000..e91da172 --- /dev/null +++ b/regression-test/suites/db_sync/table/recover/test_ds_tbl_drop_recover.groovy @@ -0,0 +1,150 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +suite("test_ds_tbl_drop_recover") { + def helper = new GroovyShell(new Binding(['suite': delegate])) + .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + + def tableName = "tbl_recover" + helper.randomSuffix() + def test_num = 0 + def insert_num = 3 + def opPartitonName = "less" + + def exist = { res -> Boolean + return res.size() != 0 + } + def notExist = { res -> Boolean + return res.size() == 0 + } + + helper.enableDbBinlog() + + sql """ + CREATE TABLE if NOT EXISTS ${tableName}_1 + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + PARTITION BY RANGE(`id`) + ( + PARTITION `${opPartitonName}_0` VALUES LESS THAN ("0"), + PARTITION `${opPartitonName}_1` VALUES LESS THAN ("1000") + ) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "binlog.enable" = "true" + ) + """ + + helper.ccrJobDelete() + helper.ccrJobCreate() + + assertTrue(helper.checkRestoreFinishTimesOf("${tableName}_1", 60)) + + logger.info("=== Test 1: Check table and backup size ===") + sql "sync" + assertTrue(helper.checkShowTimesOf(""" SHOW TABLES LIKE "${tableName}_1" """, exist, 60, "target")) + + helper.ccrJobPause() + + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${tableName}_1 VALUES (${test_num}, ${index}) + """ + } + + sql """ + DROP TABLE ${tableName}_1 + """ + + assertTrue(helper.checkShowTimesOf(""" + SHOW TABLES LIKE "${tableName}_1" + """, + notExist, 60, "sql")) + + logger.info("=== Test 5: Resume and verify ===") + helper.ccrJobResume() + + assertTrue(helper.checkShowTimesOf(""" + SHOW TABLES LIKE "${tableName}_1" + """, + notExist, 60, "target")) + + // not both source and target dont have this table. it should be in recycle bin. + // lets try recover. + helper.ccrJobPause() + sql """ + RECOVER TABLE ${tableName}_1 + """ + assertTrue(helper.checkShowTimesOf(""" SHOW TABLES LIKE "${tableName}_1" """, exist, 60, "sql")) // check recovered in local + helper.ccrJobResume() + assertTrue(helper.checkShowTimesOf(""" SHOW TABLES LIKE "${tableName}_1" """, exist, 60, "target")) // check recovered in target + + assertTrue(helper.checkRestoreFinishTimesOf("${tableName}_1", 60)) + + def resSql = target_sql "SELECT * FROM ${tableName}_1 WHERE test=0" + def resSrcSql = sql "SELECT * FROM ${context.dbName}.${tableName}_1 WHERE test=0" + logger.info("=== {} vs {} ===", resSql.size(), resSrcSql.size()) + assertTrue(resSql.size() == resSrcSql.size()) + + test_num = 2 + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${tableName}_1 VALUES (${test_num}, ${index}) + """ + } + assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName}_1 WHERE test=${test_num}", + insert_num, 30)) + + qt_target_sql_content("SELECT * FROM ${tableName}_1") + qt_sql_source_content("SELECT * FROM ${tableName}_1") + + logger.info("=== Test 6: Drop again and try recover and insert ===") + sql """ + DROP TABLE ${tableName}_1 + """ + + assertTrue(helper.checkShowTimesOf(""" + SHOW TABLES LIKE "${tableName}_1" + """, + notExist, 60, "sql")) + + assertTrue(helper.checkShowTimesOf(""" + SHOW TABLES LIKE "${tableName}_1" + """, + notExist, 60, "target")) + + // not both source and target dont have this table. it should be in recycle bin. + sql """ + RECOVER TABLE ${tableName}_1 + """ + assertTrue(helper.checkShowTimesOf(""" SHOW TABLES LIKE "${tableName}_1" """, exist, 60, "sql")) // check recovered in local + assertTrue(helper.checkShowTimesOf(""" SHOW TABLES LIKE "${tableName}_1" """, exist, 60, "target")) // check recovered in target + + test_num = 3 + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${tableName}_1 VALUES (${test_num}, ${index}) + """ + } + assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName}_1 WHERE test=${test_num}", + insert_num, 30)) + order_qt_target_sql_content_2("SELECT * FROM ${tableName}_1") + order_qt_sql_source_content_2("SELECT * FROM ${tableName}_1") +} diff --git a/regression-test/suites/db_sync/table/recover1/test_ds_tbl_drop_recover_new.groovy b/regression-test/suites/db_sync/table/recover1/test_ds_tbl_drop_recover_new.groovy new file mode 100644 index 00000000..11f9965b --- /dev/null +++ b/regression-test/suites/db_sync/table/recover1/test_ds_tbl_drop_recover_new.groovy @@ -0,0 +1,143 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +suite("test_ds_tbl_drop_recover_new") { + def helper = new GroovyShell(new Binding(['suite': delegate])) + .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + + def tableName = "tbl_recover" + helper.randomSuffix() + def test_num = 0 + def insert_num = 3 + def opPartitonName = "part" + + def exist = { res -> Boolean + return res.size() != 0 + } + def notExist = { res -> Boolean + return res.size() == 0 + } + + helper.enableDbBinlog() + + sql """ + CREATE TABLE if NOT EXISTS ${tableName}_1 + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + PARTITION BY RANGE(`id`) + ( + PARTITION `${opPartitonName}_0` VALUES LESS THAN ("0"), + PARTITION `${opPartitonName}_1` VALUES LESS THAN ("1000") + ) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "binlog.enable" = "true" + ) + """ + + helper.ccrJobDelete() + helper.ccrJobCreate() + + assertTrue(helper.checkRestoreFinishTimesOf("${tableName}_1", 60)) + + logger.info("=== Test 1: Check table and backup size ===") + sql "sync" + assertTrue(helper.checkShowTimesOf(""" SHOW TABLES LIKE "${tableName}_1" """, exist, 60, "target")) + + helper.ccrJobPause() + + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${tableName}_1 VALUES (${test_num}, ${index}) + """ + } + + sql """ + DROP TABLE ${tableName}_1 + """ + + assertTrue(helper.checkShowTimesOf(""" + SHOW TABLES LIKE "${tableName}_1" + """, + notExist, 60, "sql")) + + logger.info("=== Test 5: Resume and verify ===") + helper.ccrJobResume() + + assertTrue(helper.checkShowTimesOf(""" + SHOW TABLES LIKE "${tableName}_1" + """, + notExist, 60, "target")) + + // not both source and target dont have this table. it should be in recycle bin. + // lets try recover. + helper.ccrJobPause() + sql """ + RECOVER TABLE ${tableName}_1 as ${tableName}_10 + """ + assertTrue(helper.checkShowTimesOf(""" SHOW TABLES LIKE "${tableName}_10" """, exist, 60, "sql")) // check recovered in local + helper.ccrJobResume() + assertTrue(helper.checkShowTimesOf(""" SHOW TABLES LIKE "${tableName}_10" """, exist, 60, "target")) // check recovered in target + + test_num = 2 + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${tableName}_10 VALUES (${test_num}, ${index}) + """ + } + assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName}_10 WHERE test=${test_num}", + insert_num, 30)) + + qt_target_sql_content("SELECT * FROM ${tableName}_10") + qt_sql_source_content("SELECT * FROM ${tableName}_10") + + logger.info("=== Test 6: Drop again and try recover and insert ===") + sql """ + DROP TABLE ${tableName}_10 + """ + + assertTrue(helper.checkShowTimesOf(""" + SHOW TABLES LIKE "${tableName}_10" + """, + notExist, 60, "sql")) + + assertTrue(helper.checkShowTimesOf(""" + SHOW TABLES LIKE "${tableName}_10" + """, + notExist, 60, "target")) + + // not both source and target dont have this table. it should be in recycle bin. + sql """ + RECOVER TABLE ${tableName}_10 as ${tableName}_100 + """ + assertTrue(helper.checkShowTimesOf(""" SHOW TABLES LIKE "${tableName}_100" """, exist, 60, "sql")) // check recovered in local + assertTrue(helper.checkShowTimesOf(""" SHOW TABLES LIKE "${tableName}_100" """, exist, 60, "target")) // check recovered in target + + test_num = 3 + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${tableName}_100 VALUES (${test_num}, ${index}) + """ + } + assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName}_100 WHERE test=${test_num}", + insert_num, 30)) + order_qt_target_sql_content_2("SELECT * FROM ${tableName}_100") + order_qt_sql_source_content_2("SELECT * FROM ${tableName}_100") +} diff --git a/regression-test/suites/table_sync/partition/recover/test_tbl_part_recover.groovy b/regression-test/suites/table_sync/partition/recover/test_tbl_part_recover.groovy new file mode 100644 index 00000000..dda58aaa --- /dev/null +++ b/regression-test/suites/table_sync/partition/recover/test_tbl_part_recover.groovy @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_tbl_part_recover") { + def helper = new GroovyShell(new Binding(['suite': delegate])) + .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + + def tableName = "tbl_" + helper.randomSuffix() + def test_num = 0 + def insert_num = 3 + def opPartitonName = "part" + + def exist = { res -> Boolean + return res.size() != 0 + } + def notExist = { res -> Boolean + return res.size() == 0 + } + + sql """ + CREATE TABLE if NOT EXISTS ${tableName} + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + PARTITION BY RANGE(`id`) + ( + PARTITION `${opPartitonName}_1` VALUES LESS THAN ("10"), + PARTITION `${opPartitonName}_2` VALUES LESS THAN ("100") + ) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "binlog.enable" = "true" + ) + """ + + helper.ccrJobCreate(tableName) + + assertTrue(helper.checkRestoreFinishTimesOf("${tableName}", 30)) + + + logger.info("=== Test 1: Check partitions in src before sync case ===") + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_1\" + """, + exist, 30, "target")) + + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_2\" + """, + exist, 30, "target")) + + + + logger.info("=== Test 3: Insert data in valid partitions case ===") + test_num = 3 + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${tableName} VALUES (${test_num}, ${index}) + """ + } + sql "sync" + assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName} WHERE test=${test_num}", + insert_num, 30)) + + + + logger.info("=== Test 4: Drop partitions case ===") + sql """ + ALTER TABLE ${tableName} + DROP PARTITION IF EXISTS ${opPartitonName}_1 + """ + sql """ + ALTER TABLE ${tableName} + DROP PARTITION IF EXISTS ${opPartitonName}_2 + """ + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_1\" + """, + notExist, 30, "target")) + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_2\" + """, + notExist, 30, "target")) + + logger.info("=== Test 4: recover partitions case ===") + sql """ + RECOVER PARTITION ${opPartitonName}_1 from ${tableName} + """ + sql """ + RECOVER PARTITION ${opPartitonName}_2 from ${tableName} + """ + + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_1\" + """, + exist, 30, "target")) + + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_2\" + """, + exist, 30, "target")) + + test_num = 5 + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${tableName} VALUES (${test_num}, ${index}) + """ + } + sql "sync" + assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName} WHERE test=${test_num}", + insert_num, 30)) + + order_qt_target_sql_content("SELECT * FROM ${tableName}") + order_qt_sql_source_content("SELECT * FROM ${tableName}") +} diff --git a/regression-test/suites/table_sync/partition/recover1/test_tbl_part_recover_new.groovy b/regression-test/suites/table_sync/partition/recover1/test_tbl_part_recover_new.groovy new file mode 100644 index 00000000..1cf9a5ab --- /dev/null +++ b/regression-test/suites/table_sync/partition/recover1/test_tbl_part_recover_new.groovy @@ -0,0 +1,144 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_tbl_part_recover_new") { + def helper = new GroovyShell(new Binding(['suite': delegate])) + .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + + def tableName = "tbl_" + helper.randomSuffix() + def test_num = 0 + def insert_num = 3 + def opPartitonName = "part" + + def exist = { res -> Boolean + return res.size() != 0 + } + def notExist = { res -> Boolean + return res.size() == 0 + } + + sql """ + CREATE TABLE if NOT EXISTS ${tableName} + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + PARTITION BY RANGE(`id`) + ( + PARTITION `${opPartitonName}_1` VALUES LESS THAN ("10"), + PARTITION `${opPartitonName}_2` VALUES LESS THAN ("100") + ) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "binlog.enable" = "true" + ) + """ + + helper.ccrJobCreate(tableName) + + assertTrue(helper.checkRestoreFinishTimesOf("${tableName}", 30)) + + + logger.info("=== Test 1: Check partitions in src before sync case ===") + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_1\" + """, + exist, 30, "target")) + + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_2\" + """, + exist, 30, "target")) + + + + logger.info("=== Test 3: Insert data in valid partitions case ===") + test_num = 3 + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${tableName} VALUES (${test_num}, ${index}) + """ + } + sql "sync" + assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName} WHERE test=${test_num}", + insert_num, 30)) + + + + logger.info("=== Test 4: Drop partitions case ===") + sql """ + ALTER TABLE ${tableName} + DROP PARTITION IF EXISTS ${opPartitonName}_1 + """ + sql """ + ALTER TABLE ${tableName} + DROP PARTITION IF EXISTS ${opPartitonName}_2 + """ + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_1\" + """, + notExist, 30, "target")) + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_2\" + """, + notExist, 30, "target")) + + logger.info("=== Test 4: recover partitions case ===") + sql """ + RECOVER PARTITION ${opPartitonName}_1 as ${opPartitonName}_11 from ${tableName} + """ + + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_11\" + """, + exist, 30, "target")) + sql """ + RECOVER PARTITION ${opPartitonName}_2 as ${opPartitonName}_22 from ${tableName} + """ + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_22\" + """, + exist, 30, "target")) + + test_num = 5 + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${tableName} VALUES (${test_num}, ${index}) + """ + } + sql "sync" + assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName} WHERE test=${test_num}", + insert_num, 30)) + + order_qt_target_sql_content("SELECT * FROM ${tableName}") + order_qt_sql_source_content("SELECT * FROM ${tableName}") +}