From 2a6113aa181e30d52502c553333505655d77281a Mon Sep 17 00:00:00 2001 From: Vallish Date: Sun, 1 Dec 2024 16:08:27 +0000 Subject: [PATCH] [feat](binlog) Add Support recover binlog --- pkg/ccr/base/spec.go | 34 ---- pkg/ccr/base/specer.go | 2 - pkg/ccr/job.go | 23 ++- .../{recover_table.go => recover_info.go} | 0 .../recover/test_ds_part_recover.groovy | 166 ++++++++++++++++++ .../recover1/test_ds_part_recover_new.groovy | 166 ++++++++++++++++++ .../test_ds_tbl_drop_recover_new.groovy | 96 ++++++++++ .../recover/test_tbl_part_recover.groovy | 108 ++++++++++++ .../recover1/test_tbl_part_recover_new.groovy | 108 ++++++++++++ 9 files changed, 662 insertions(+), 41 deletions(-) rename pkg/ccr/record/{recover_table.go => recover_info.go} (100%) create mode 100644 regression-test/suites/db_sync/partition/recover/test_ds_part_recover.groovy create mode 100644 regression-test/suites/db_sync/partition/recover1/test_ds_part_recover_new.groovy create mode 100644 regression-test/suites/db_sync/table/recover1/test_ds_tbl_drop_recover_new.groovy create mode 100644 regression-test/suites/table_sync/partition/recover/test_tbl_part_recover.groovy create mode 100644 regression-test/suites/table_sync/partition/recover1/test_tbl_part_recover_new.groovy diff --git a/pkg/ccr/base/spec.go b/pkg/ccr/base/spec.go index e85f48e4..4b2aa33b 100644 --- a/pkg/ccr/base/spec.go +++ b/pkg/ccr/base/spec.go @@ -534,40 +534,6 @@ func (s *Spec) RenameTable(destTableName string, renameTable *record.RenameTable return s.DbExec(sql) } -func (s *Spec) RecoverInfo(destTableName string, recoverInfo *record.RecoverInfo) error { - destTableName = utils.FormatKeywordName(destTableName) - // recover info may be 'recover table', 'recover partition' - var sql string - // RECOVER PARTITION p1 from table_name - // RECOVER PARTITION p1 as p2 from table_name - if recoverInfo.PartitionName != "" { - partitionName := utils.FormatKeywordName(recoverInfo.PartitionName) - if recoverInfo.NewPartitionName != "" { - newPartitionName := utils.FormatKeywordName(recoverInfo.NewPartitionName) - sql = fmt.Sprintf("RECOVER PARTITION %s as %s from %s ", partitionName, newPartitionName, destTableName) - } else { - sql = fmt.Sprintf("RECOVER PARTITION %s from %s ", partitionName, destTableName) - } - log.Infof("recover info sql: %s", sql) - return s.DbExec(sql) - } - // RECOVER TABLE table_name - // RECOVER table table_name as new_table - if recoverInfo.TableName != "" { - if recoverInfo.NewTableName != "" { - newTableName := utils.FormatKeywordName(recoverInfo.NewTableName) - sql = fmt.Sprintf("RECOVER TABLE %s as %s", destTableName, newTableName) - } else { - sql = fmt.Sprintf("RECOVER TABLE %s", destTableName) - } - log.Infof("recover info sql: %s", sql) - return s.DbExec(sql) - } - - // this can come for recover db case. we dont support it currently - return xerror.Errorf(xerror.Normal, "Recover info not currently supported") -} - func (s *Spec) RenameTableWithName(oldName, newName string) error { oldName = utils.FormatKeywordName(oldName) newName = utils.FormatKeywordName(newName) diff --git a/pkg/ccr/base/specer.go b/pkg/ccr/base/specer.go index 652d2d0c..d90a2064 100644 --- a/pkg/ccr/base/specer.go +++ b/pkg/ccr/base/specer.go @@ -57,8 +57,6 @@ type Specer interface { RenameRollup(destTableName, oldRollup, newRollup string) error DropRollup(destTableName, rollupName string) error - RecoverInfo(destTableName string, recoverInfo *record.RecoverInfo) error - DesyncTables(tables ...string) error utils.Subject[SpecEvent] diff --git a/pkg/ccr/job.go b/pkg/ccr/job.go index b2f15db4..79011c67 100644 --- a/pkg/ccr/job.go +++ b/pkg/ccr/job.go @@ -2592,13 +2592,26 @@ func (j *Job) handleRecoverInfoRecord(commitSeq int64, recoverInfo *record.Recov return nil } - var destTableName string - if j.SyncType == TableSync { - destTableName = j.Dest.Table + if recoverInfo.PartitionName == "" { + log.Infof("recover info with for table level, will trigger partial sync") + var tableName string + if recoverInfo.NewTableName != "" { + tableName = recoverInfo.NewTableName + } else { + tableName = recoverInfo.TableName + } + return j.newPartialSnapshot(recoverInfo.TableId, tableName, nil, true) + } + + log.Infof("recover info with for partition level, will trigger partial sync") + var partitions []string + if recoverInfo.NewPartitionName != "" { + partitions = append(partitions, recoverInfo.NewPartitionName) } else { - destTableName = recoverInfo.TableName + partitions = append(partitions, recoverInfo.PartitionName) } - return j.IDest.RecoverInfo(destTableName, recoverInfo) + // here we are intrested only in sync on partition level. so replace parameter passed as false. + return j.newPartialSnapshot(recoverInfo.TableId, recoverInfo.TableName, partitions, false) } func (j *Job) handleBarrier(binlog *festruct.TBinlog) error { diff --git a/pkg/ccr/record/recover_table.go b/pkg/ccr/record/recover_info.go similarity index 100% rename from pkg/ccr/record/recover_table.go rename to pkg/ccr/record/recover_info.go diff --git a/regression-test/suites/db_sync/partition/recover/test_ds_part_recover.groovy b/regression-test/suites/db_sync/partition/recover/test_ds_part_recover.groovy new file mode 100644 index 00000000..6336cd11 --- /dev/null +++ b/regression-test/suites/db_sync/partition/recover/test_ds_part_recover.groovy @@ -0,0 +1,166 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_ds_part_recover") { + def helper = new GroovyShell(new Binding(['suite': delegate])) + .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + + def tableName = "tbl_" + helper.randomSuffix() + def test_num = 0 + def insert_num = 90 // insert into last partition + def opPartitonName = "less" + + def exist = { res -> Boolean + return res.size() != 0 + } + def notExist = { res -> Boolean + return res.size() == 0 + } + + helper.enableDbBinlog() + + sql """ + CREATE TABLE if NOT EXISTS ${tableName} + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + PARTITION BY RANGE(`id`) + ( + PARTITION `${opPartitonName}_0` VALUES LESS THAN ("0"), + PARTITION `${opPartitonName}_1` VALUES LESS THAN ("10"), + PARTITION `${opPartitonName}_2` VALUES LESS THAN ("20"), + PARTITION `${opPartitonName}_3` VALUES LESS THAN ("30"), + PARTITION `${opPartitonName}_4` VALUES LESS THAN ("40"), + PARTITION `${opPartitonName}_5` VALUES LESS THAN ("50"), + PARTITION `${opPartitonName}_6` VALUES LESS THAN ("60"), + PARTITION `${opPartitonName}_7` VALUES LESS THAN ("70"), + PARTITION `${opPartitonName}_8` VALUES LESS THAN ("80"), + PARTITION `${opPartitonName}_9` VALUES LESS THAN ("90") + ) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "binlog.enable" = "true" + ) + """ + + helper.ccrJobDelete() + helper.ccrJobCreate() + + assertTrue(helper.checkRestoreFinishTimesOf("${tableName}", 30)) + + + logger.info("=== Test 1: Check partitions in src before sync case ===") + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_9\" + """, + exist, 30, "target")) + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_8\" + """, + exist, 30, "target")) + + // save the backup num of source cluster + def show_backup_result = sql "SHOW BACKUP" + def backup_num = show_backup_result.size() + logger.info("backups before drop partition: ${show_backup_result}") + + logger.info("=== Test 2: Insert data in valid partitions case ===") + test_num = 3 + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${tableName} VALUES (${test_num}, ${index}) + """ + } + sql "sync" + + logger.info("=== Test 3: pause ===") + + helper.ccrJobPause() + + test_num = 4 + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${tableName} VALUES (${test_num}, ${index}) + """ + } + + sql "sync" + + logger.info("=== Test 4: Drop partitions case ===") + sql """ + ALTER TABLE ${tableName} + DROP PARTITION IF EXISTS ${opPartitonName}_9 + """ + + sql """ + ALTER TABLE ${tableName} + DROP PARTITION IF EXISTS ${opPartitonName}_8 + """ + sql "sync" + + logger.info("=== Test 5: pause and verify ===") + + helper.ccrJobResume() + + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_9\" + """, + notExist, 30, "target")) + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_8\" + """, + notExist, 30, "target")) + + logger.info("=== Test 5: recover a partitions ===") + + sql """ + RECOVER PARTITION ${opPartitonName}_9 from ${tableName} + """ + + logger.info("=== Test 56: check partition recovered ===") + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_9\" + """, + exist, 30, "target")) + + + sql """ + RECOVER PARTITION ${opPartitonName}_8 from ${tableName} + """ + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_8\" + """, + exist, 30, "target")) +} + + diff --git a/regression-test/suites/db_sync/partition/recover1/test_ds_part_recover_new.groovy b/regression-test/suites/db_sync/partition/recover1/test_ds_part_recover_new.groovy new file mode 100644 index 00000000..9e813694 --- /dev/null +++ b/regression-test/suites/db_sync/partition/recover1/test_ds_part_recover_new.groovy @@ -0,0 +1,166 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_ds_part_recover_new") { + def helper = new GroovyShell(new Binding(['suite': delegate])) + .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + + def tableName = "tbl_" + helper.randomSuffix() + def test_num = 0 + def insert_num = 90 // insert into last partition + def opPartitonName = "less" + + def exist = { res -> Boolean + return res.size() != 0 + } + def notExist = { res -> Boolean + return res.size() == 0 + } + + helper.enableDbBinlog() + + sql """ + CREATE TABLE if NOT EXISTS ${tableName} + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + PARTITION BY RANGE(`id`) + ( + PARTITION `${opPartitonName}_0` VALUES LESS THAN ("0"), + PARTITION `${opPartitonName}_1` VALUES LESS THAN ("10"), + PARTITION `${opPartitonName}_2` VALUES LESS THAN ("20"), + PARTITION `${opPartitonName}_3` VALUES LESS THAN ("30"), + PARTITION `${opPartitonName}_4` VALUES LESS THAN ("40"), + PARTITION `${opPartitonName}_5` VALUES LESS THAN ("50"), + PARTITION `${opPartitonName}_6` VALUES LESS THAN ("60"), + PARTITION `${opPartitonName}_7` VALUES LESS THAN ("70"), + PARTITION `${opPartitonName}_8` VALUES LESS THAN ("80"), + PARTITION `${opPartitonName}_9` VALUES LESS THAN ("90") + ) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "binlog.enable" = "true" + ) + """ + + helper.ccrJobDelete() + helper.ccrJobCreate() + + assertTrue(helper.checkRestoreFinishTimesOf("${tableName}", 30)) + + + logger.info("=== Test 1: Check partitions in src before sync case ===") + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_9\" + """, + exist, 30, "target")) + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_8\" + """, + exist, 30, "target")) + + // save the backup num of source cluster + def show_backup_result = sql "SHOW BACKUP" + def backup_num = show_backup_result.size() + logger.info("backups before drop partition: ${show_backup_result}") + + logger.info("=== Test 2: Insert data in valid partitions case ===") + test_num = 3 + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${tableName} VALUES (${test_num}, ${index}) + """ + } + sql "sync" + + logger.info("=== Test 3: pause ===") + + helper.ccrJobPause() + + test_num = 4 + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${tableName} VALUES (${test_num}, ${index}) + """ + } + + sql "sync" + + logger.info("=== Test 4: Drop partitions case ===") + sql """ + ALTER TABLE ${tableName} + DROP PARTITION IF EXISTS ${opPartitonName}_9 + """ + + sql """ + ALTER TABLE ${tableName} + DROP PARTITION IF EXISTS ${opPartitonName}_8 + """ + sql "sync" + + logger.info("=== Test 5: pause and verify ===") + + helper.ccrJobResume() + + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_9\" + """, + notExist, 30, "target")) + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_8\" + """, + notExist, 30, "target")) + + logger.info("=== Test 5: recover a partitions ===") + + sql """ + RECOVER PARTITION ${opPartitonName}_9 as ${opPartitonName}_90 from ${tableName} + """ + + logger.info("=== Test 56: check partition recovered ===") + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_90\" + """, + exist, 30, "target")) + + + sql """ + RECOVER PARTITION ${opPartitonName}_8 as ${opPartitonName}_80 from ${tableName} + """ + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_80\" + """, + exist, 30, "target")) +} + + diff --git a/regression-test/suites/db_sync/table/recover1/test_ds_tbl_drop_recover_new.groovy b/regression-test/suites/db_sync/table/recover1/test_ds_tbl_drop_recover_new.groovy new file mode 100644 index 00000000..691f0ae0 --- /dev/null +++ b/regression-test/suites/db_sync/table/recover1/test_ds_tbl_drop_recover_new.groovy @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +suite("test_ds_tbl_drop_recover_new") { + def helper = new GroovyShell(new Binding(['suite': delegate])) + .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + + def tableName = "tbl_recover" + helper.randomSuffix() + def test_num = 0 + def insert_num = 10 + def opPartitonName = "less" + + def exist = { res -> Boolean + return res.size() != 0 + } + def notExist = { res -> Boolean + return res.size() == 0 + } + + helper.enableDbBinlog() + + sql """ + CREATE TABLE if NOT EXISTS ${tableName}_1 + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + PARTITION BY RANGE(`id`) + ( + PARTITION `${opPartitonName}_0` VALUES LESS THAN ("0"), + PARTITION `${opPartitonName}_1` VALUES LESS THAN ("1000") + ) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "binlog.enable" = "true" + ) + """ + + helper.ccrJobDelete() + helper.ccrJobCreate() + + assertTrue(helper.checkRestoreFinishTimesOf("${tableName}_1", 60)) + + logger.info("=== Test 1: Check table and backup size ===") + sql "sync" + assertTrue(helper.checkShowTimesOf(""" SHOW TABLES LIKE "${tableName}_1" """, exist, 60, "target")) + + helper.ccrJobPause() + + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${tableName}_1 VALUES (${test_num}, ${index}) + """ + } + + sql """ + DROP TABLE ${tableName}_1 + """ + + assertTrue(helper.checkShowTimesOf(""" + SHOW TABLES LIKE "${tableName}_1" + """, + notExist, 60, "sql")) + + logger.info("=== Test 5: Resume and verify ===") + helper.ccrJobResume() + + assertTrue(helper.checkShowTimesOf(""" + SHOW TABLES LIKE "${tableName}_1" + """, + notExist, 60, "target")) + + // not both source and target dont have this table. it should be in recycle bin. + // lets try recover. + sql """ + RECOVER TABLE ${tableName}_1 as ${tableName}_2 + """ + assertTrue(helper.checkShowTimesOf(""" SHOW TABLES LIKE "${tableName}_2" """, exist, 60, "sql")) // check recovered in local + assertTrue(helper.checkShowTimesOf(""" SHOW TABLES LIKE "${tableName}_2" """, exist, 60, "target")) // check recovered in target +} diff --git a/regression-test/suites/table_sync/partition/recover/test_tbl_part_recover.groovy b/regression-test/suites/table_sync/partition/recover/test_tbl_part_recover.groovy new file mode 100644 index 00000000..a1672a41 --- /dev/null +++ b/regression-test/suites/table_sync/partition/recover/test_tbl_part_recover.groovy @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_tbl_part_recover") { + def helper = new GroovyShell(new Binding(['suite': delegate])) + .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + + def tableName = "tbl_" + helper.randomSuffix() + def test_num = 0 + def insert_num = 5 + def opPartitonName = "less0" + + def exist = { res -> Boolean + return res.size() != 0 + } + def notExist = { res -> Boolean + return res.size() == 0 + } + + sql """ + CREATE TABLE if NOT EXISTS ${tableName} + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + PARTITION BY RANGE(`id`) + ( + PARTITION `${opPartitonName}` VALUES LESS THAN ("100") + ) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "binlog.enable" = "true" + ) + """ + + helper.ccrJobCreate(tableName) + + assertTrue(helper.checkRestoreFinishTimesOf("${tableName}", 30)) + + + logger.info("=== Test 1: Check partitions in src before sync case ===") + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}\" + """, + exist, 30, "target")) + + + + + logger.info("=== Test 3: Insert data in valid partitions case ===") + test_num = 3 + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${tableName} VALUES (${test_num}, ${index}) + """ + } + sql "sync" + assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName} WHERE test=${test_num}", + insert_num, 30)) + + + + logger.info("=== Test 4: Drop partitions case ===") + sql """ + ALTER TABLE ${tableName} + DROP PARTITION IF EXISTS ${opPartitonName} + """ + + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}\" + """, + notExist, 30, "target")) + + logger.info("=== Test 4: recover partitions case ===") + sql """ + RECOVER PARTITION ${opPartitonName} from ${tableName} + """ + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}\" + """, + exist, 30, "target")) + + def resSql = target_sql "SELECT * FROM ${tableName} WHERE test=3" + assertTrue(resSql.size() == 0) +} diff --git a/regression-test/suites/table_sync/partition/recover1/test_tbl_part_recover_new.groovy b/regression-test/suites/table_sync/partition/recover1/test_tbl_part_recover_new.groovy new file mode 100644 index 00000000..8c401a70 --- /dev/null +++ b/regression-test/suites/table_sync/partition/recover1/test_tbl_part_recover_new.groovy @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_tbl_part_recover_new") { + def helper = new GroovyShell(new Binding(['suite': delegate])) + .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + + def tableName = "tbl_" + helper.randomSuffix() + def test_num = 0 + def insert_num = 5 + def opPartitonName = "less0" + + def exist = { res -> Boolean + return res.size() != 0 + } + def notExist = { res -> Boolean + return res.size() == 0 + } + + sql """ + CREATE TABLE if NOT EXISTS ${tableName} + ( + `test` INT, + `id` INT + ) + ENGINE=OLAP + UNIQUE KEY(`test`, `id`) + PARTITION BY RANGE(`id`) + ( + PARTITION `${opPartitonName}` VALUES LESS THAN ("100") + ) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "binlog.enable" = "true" + ) + """ + + helper.ccrJobCreate(tableName) + + assertTrue(helper.checkRestoreFinishTimesOf("${tableName}", 30)) + + + logger.info("=== Test 1: Check partitions in src before sync case ===") + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}\" + """, + exist, 30, "target")) + + + + + logger.info("=== Test 3: Insert data in valid partitions case ===") + test_num = 3 + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${tableName} VALUES (${test_num}, ${index}) + """ + } + sql "sync" + assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName} WHERE test=${test_num}", + insert_num, 30)) + + + + logger.info("=== Test 4: Drop partitions case ===") + sql """ + ALTER TABLE ${tableName} + DROP PARTITION IF EXISTS ${opPartitonName} + """ + + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}\" + """, + notExist, 30, "target")) + + logger.info("=== Test 4: recover partitions case ===") + sql """ + RECOVER PARTITION ${opPartitonName} as ${opPartitonName}_1 from ${tableName} + """ + assertTrue(helper.checkShowTimesOf(""" + SHOW PARTITIONS + FROM TEST_${context.dbName}.${tableName} + WHERE PartitionName = \"${opPartitonName}_1\" + """, + exist, 30, "target")) + + def resSql = target_sql "SELECT * FROM ${tableName} WHERE test=3" + assertTrue(resSql.size() == 0) +}