From fbf6cc7ade65a01af90274f976efb373a0a1eecd Mon Sep 17 00:00:00 2001 From: wyxxxcat <1520358997@qq.com> Date: Sun, 1 Dec 2024 23:38:41 +0800 Subject: [PATCH] 1 --- .../apache/doris/analysis/RestoreStmt.java | 12 +++ .../apache/doris/backup/BackupHandler.java | 12 +-- .../org/apache/doris/backup/RestoreJob.java | 18 +++- .../org/apache/doris/catalog/OlapTable.java | 9 +- .../apache/doris/catalog/TableProperty.java | 19 ++++ .../doris/service/FrontendServiceImpl.java | 3 + .../apache/doris/backup/RestoreJobTest.java | 2 +- .../apache/doris/catalog/OlapTableTest.java | 4 +- gensrc/thrift/FrontendService.thrift | 1 + .../test_backup_restore_colocate_with.groovy | 89 +++++++++++++++++++ 10 files changed, 154 insertions(+), 15 deletions(-) create mode 100644 regression-test/suites/backup_restore/test_backup_restore_colocate_with.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java index bc38cfe09e5606..f16ed12e97e43f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java @@ -44,10 +44,12 @@ public class RestoreStmt extends AbstractBackupStmt implements NotFallbackInPars public static final String PROP_CLEAN_TABLES = "clean_tables"; public static final String PROP_CLEAN_PARTITIONS = "clean_partitions"; public static final String PROP_ATOMIC_RESTORE = "atomic_restore"; + public static final String PROP_COLOCATE_WITH = "colocate_with"; private boolean allowLoad = false; private ReplicaAllocation replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION; private String backupTimestamp = null; + private String colocateWith = null; private int metaVersion = -1; private boolean reserveReplica = false; private boolean reserveDynamicPartitionEnable = false; @@ -123,6 +125,10 @@ public boolean isCleanPartitions() { return isCleanPartitions; } + public String getColocateWith() { + return colocateWith; + } + public boolean isAtomicRestore() { return isAtomicRestore; } @@ -209,6 +215,12 @@ public void analyzeProperties() throws AnalysisException { // is clean partitions isCleanPartitions = eatBooleanProperty(copiedProperties, PROP_CLEAN_PARTITIONS, isCleanPartitions); + // colocate with + if (copiedProperties.containsKey(PROP_COLOCATE_WITH)) { + colocateWith = copiedProperties.get(PROP_COLOCATE_WITH); + copiedProperties.remove(PROP_COLOCATE_WITH); + } + // is atomic restore isAtomicRestore = eatBooleanProperty(copiedProperties, PROP_ATOMIC_RESTORE, isAtomicRestore); diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java index d70544add98747..8e4b3277c0bb74 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java @@ -555,14 +555,16 @@ private void restore(Repository repository, Database db, RestoreStmt stmt) throw db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(), stmt.getTimeoutMs(), metaVersion, stmt.reserveReplica(), stmt.reserveDynamicPartitionEnable(), stmt.isBeingSynced(), - stmt.isCleanTables(), stmt.isCleanPartitions(), stmt.isAtomicRestore(), + stmt.isCleanTables(), stmt.isCleanPartitions(), stmt.isAtomicRestore(), stmt.getColocateWith(), env, Repository.KEEP_ON_LOCAL_REPO_ID, backupMeta); } else { restoreJob = new RestoreJob(stmt.getLabel(), stmt.getBackupTimestamp(), - db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(), - stmt.getTimeoutMs(), stmt.getMetaVersion(), stmt.reserveReplica(), stmt.reserveDynamicPartitionEnable(), - stmt.isBeingSynced(), stmt.isCleanTables(), stmt.isCleanPartitions(), stmt.isAtomicRestore(), - env, repository.getId()); + db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(), + stmt.getTimeoutMs(), stmt.getMetaVersion(), stmt.reserveReplica(), + stmt.reserveDynamicPartitionEnable(), + stmt.isBeingSynced(), stmt.isCleanTables(), stmt.isCleanPartitions(), stmt.isAtomicRestore(), + stmt.getColocateWith(), + env, repository.getId()); } env.getEditLog().logRestoreJob(restoreJob); diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index f1abb0c9e632bc..5e326509aff086 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -123,6 +123,7 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable { private static final String PROP_CLEAN_TABLES = RestoreStmt.PROP_CLEAN_TABLES; private static final String PROP_CLEAN_PARTITIONS = RestoreStmt.PROP_CLEAN_PARTITIONS; private static final String PROP_ATOMIC_RESTORE = RestoreStmt.PROP_ATOMIC_RESTORE; + private static final String PROP_COLOCATE_WITH = RestoreStmt.PROP_COLOCATE_WITH; private static final String ATOMIC_RESTORE_TABLE_PREFIX = "__doris_atomic_restore_prefix__"; private static final Logger LOG = LogManager.getLogger(RestoreJob.class); @@ -210,6 +211,8 @@ public enum RestoreJobState { private boolean isCleanPartitions = false; // Whether to restore the data into a temp table, and then replace the origin one. private boolean isAtomicRestore = false; + // Whether to specify table property colocate_with + private String colocateWith = null; // restore properties @SerializedName("prop") @@ -228,7 +231,7 @@ public RestoreJob(JobType jobType) { public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo, boolean allowLoad, ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica, boolean reserveDynamicPartitionEnable, boolean isBeingSynced, boolean isCleanTables, - boolean isCleanPartitions, boolean isAtomicRestore, Env env, long repoId) { + boolean isCleanPartitions, boolean isAtomicRestore, String colocateWith, Env env, long repoId) { super(JobType.RESTORE, label, dbId, dbName, timeoutMs, env, repoId); this.backupTimestamp = backupTs; this.jobInfo = jobInfo; @@ -246,20 +249,25 @@ public RestoreJob(String label, String backupTs, long dbId, String dbName, Backu this.isCleanTables = isCleanTables; this.isCleanPartitions = isCleanPartitions; this.isAtomicRestore = isAtomicRestore; + this.colocateWith = colocateWith; properties.put(PROP_RESERVE_REPLICA, String.valueOf(reserveReplica)); properties.put(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE, String.valueOf(reserveDynamicPartitionEnable)); properties.put(PROP_IS_BEING_SYNCED, String.valueOf(isBeingSynced)); properties.put(PROP_CLEAN_TABLES, String.valueOf(isCleanTables)); properties.put(PROP_CLEAN_PARTITIONS, String.valueOf(isCleanPartitions)); properties.put(PROP_ATOMIC_RESTORE, String.valueOf(isAtomicRestore)); + properties.put(PROP_COLOCATE_WITH, colocateWith); } public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo, boolean allowLoad, ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica, boolean reserveDynamicPartitionEnable, boolean isBeingSynced, boolean isCleanTables, - boolean isCleanPartitions, boolean isAtomicRestore, Env env, long repoId, BackupMeta backupMeta) { + boolean isCleanPartitions, boolean isAtomicRestore, + String colocateWith, Env env, long repoId, + BackupMeta backupMeta) { this(label, backupTs, dbId, dbName, jobInfo, allowLoad, replicaAlloc, timeoutMs, metaVersion, reserveReplica, - reserveDynamicPartitionEnable, isBeingSynced, isCleanTables, isCleanPartitions, isAtomicRestore, env, + reserveDynamicPartitionEnable, isBeingSynced, isCleanTables, isCleanPartitions, isAtomicRestore, + colocateWith, env, repoId); this.backupMeta = backupMeta; } @@ -817,7 +825,7 @@ private void checkAndPrepareMeta() { // Reset properties to correct values. remoteOlapTbl.resetPropertiesForRestore(reserveDynamicPartitionEnable, reserveReplica, - replicaAlloc, isBeingSynced); + replicaAlloc, isBeingSynced, colocateWith); // DO NOT set remote table's new name here, cause we will still need the origin name later // remoteOlapTbl.setName(jobInfo.getAliasByOriginNameIfSet(tblInfo.name)); @@ -2224,6 +2232,7 @@ public synchronized List getInfo(boolean isBrief) { info.add(replicaAlloc.toCreateStmt()); info.add(String.valueOf(reserveReplica)); info.add(String.valueOf(reserveDynamicPartitionEnable)); + info.add(colocateWith); if (!isBrief) { info.add(getRestoreObjs()); } @@ -2647,6 +2656,7 @@ private void readOthers(DataInput in) throws IOException { isCleanTables = Boolean.parseBoolean(properties.get(PROP_CLEAN_TABLES)); isCleanPartitions = Boolean.parseBoolean(properties.get(PROP_CLEAN_PARTITIONS)); isAtomicRestore = Boolean.parseBoolean(properties.get(PROP_ATOMIC_RESTORE)); + colocateWith = properties.get(PROP_COLOCATE_WITH); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 8925e483c29df7..069f0f8b7c24bf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -737,15 +737,18 @@ public void renameColumnNamePrefix(long idxId) { * Reset properties to correct values. */ public void resetPropertiesForRestore(boolean reserveDynamicPartitionEnable, boolean reserveReplica, - ReplicaAllocation replicaAlloc, boolean isBeingSynced) { + ReplicaAllocation replicaAlloc, boolean isBeingSynced, String colocateWith) { if (tableProperty != null) { tableProperty.resetPropertiesForRestore(reserveDynamicPartitionEnable, reserveReplica, replicaAlloc); } if (isBeingSynced) { setBeingSyncedProperties(); } - // remove colocate property. - setColocateGroup(null); + if (colocateWith != null) { + setColocateGroup(colocateWith); + } else { + setColocateGroup(null); + } } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java index 1ac556c6846c05..96b8b5ca700c90 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java @@ -73,6 +73,7 @@ public class TableProperty implements Writable, GsonPostProcessable { private String storagePolicy = ""; private Boolean isBeingSynced = null; private BinlogConfig binlogConfig; + private String colocateWith = ""; private TStorageMedium storageMedium = null; @@ -157,6 +158,7 @@ public TableProperty buildProperty(short opCode) { buildMinLoadReplicaNum(); buildStorageMedium(); buildStoragePolicy(); + buildColocateWith(); buildIsBeingSynced(); buildCompactionPolicy(); buildTimeSeriesCompactionGoalSizeMbytes(); @@ -447,6 +449,15 @@ public String getStoragePolicy() { return storagePolicy; } + public TableProperty buildColocateWith() { + storagePolicy = properties.getOrDefault(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY, ""); + return this; + } + + public String getColocateWith() { + return storagePolicy; + } + public TableProperty buildIsBeingSynced() { isBeingSynced = Boolean.parseBoolean(properties.getOrDefault( PropertyAnalyzer.PROPERTIES_IS_BEING_SYNCED, "false")); @@ -467,6 +478,9 @@ public boolean isBeingSynced() { public void removeInvalidProperties() { properties.remove(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY); + if (properties.get(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH) != null) { + colocateWith = properties.get(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH); + } properties.remove(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH); } @@ -697,6 +711,10 @@ public void setRowStoreColumns(List rowStoreColumns) { } } + public void setColocateWith(String colocateWith) { + properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, colocateWith); + } + public void buildReplicaAllocation() { try { // Must copy the properties because "analyzeReplicaAllocation" will remove the property @@ -731,6 +749,7 @@ public void gsonPostProcess() throws IOException { buildDataSortInfo(); buildCompressionType(); buildStoragePolicy(); + buildColocateWith(); buildIsBeingSynced(); buildBinlogConfig(); buildEnableLightSchemaChange(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index e35fd2dc852322..26f204fc79f690 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -3062,6 +3062,9 @@ private TRestoreSnapshotResult restoreSnapshotImpl(TRestoreSnapshotRequest reque if (request.isAtomicRestore()) { properties.put(RestoreStmt.PROP_ATOMIC_RESTORE, "true"); } + if (request.isSetColocateWith()) { + properties.put(RestoreStmt.PROP_COLOCATE_WITH, request.getColocateWith()); + } AbstractBackupTableRefClause restoreTableRefClause = null; if (request.isSetTableRefs()) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java index dadfdb632e394d..efe83479992008 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java @@ -256,7 +256,7 @@ boolean await(long timeout, TimeUnit unit) { db.unregisterTable(expectedRestoreTbl.getName()); job = new RestoreJob(label, "2018-01-01 01:01:01", db.getId(), db.getFullName(), jobInfo, false, - new ReplicaAllocation((short) 3), 100000, -1, false, false, false, false, false, false, + new ReplicaAllocation((short) 3), 100000, -1, false, false, false, false, false, false, null, env, repo.getId()); List tbls = Lists.newArrayList(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/OlapTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/OlapTableTest.java index 84b8c6062d2351..721e4c4dbdd00a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/OlapTableTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/OlapTableTest.java @@ -94,7 +94,7 @@ public void testResetPropertiesForRestore() { Assert.assertTrue(olapTable.getDefaultReplicaAllocation() == ReplicaAllocation.DEFAULT_ALLOCATION); ReplicaAllocation replicaAlloc = new ReplicaAllocation((short) 4); - olapTable.resetPropertiesForRestore(false, false, replicaAlloc, false); + olapTable.resetPropertiesForRestore(false, false, replicaAlloc, false, null); Assert.assertEquals(tableProperty.getProperties(), olapTable.getTableProperty().getProperties()); Assert.assertFalse(tableProperty.getDynamicPartitionProperty().isExist()); Assert.assertFalse(olapTable.isColocateTable()); @@ -114,7 +114,7 @@ public void testResetPropertiesForRestore() { tableProperty = new TableProperty(properties); olapTable.setTableProperty(tableProperty); - olapTable.resetPropertiesForRestore(false, false, ReplicaAllocation.DEFAULT_ALLOCATION, false); + olapTable.resetPropertiesForRestore(false, false, ReplicaAllocation.DEFAULT_ALLOCATION, false, null); Map expectedProperties = Maps.newHashMap(properties); expectedProperties.put(DynamicPartitionProperty.ENABLE, "false"); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index e2af8937425d0c..4af929c83ac8c3 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1389,6 +1389,7 @@ struct TRestoreSnapshotRequest { 14: optional bool clean_partitions 15: optional bool atomic_restore 16: optional bool compressed; + 17: optional string colocate_with; } struct TRestoreSnapshotResult { diff --git a/regression-test/suites/backup_restore/test_backup_restore_colocate_with.groovy b/regression-test/suites/backup_restore/test_backup_restore_colocate_with.groovy new file mode 100644 index 00000000000000..86ab41a662bf32 --- /dev/null +++ b/regression-test/suites/backup_restore/test_backup_restore_colocate_with.groovy @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_backup_restore_colocate_with", "backup_restore") { + String repoName = "repo_" + UUID.randomUUID().toString().replace("-", "") + String dbName = "test_backup_restore_colocate_with_db" + String tableName = "test_backup_restore_colocate_with_table" + + def syncer = getSyncer() + syncer.createS3Repository(repoName) + + sql "CREATE DATABASE IF NOT EXISTS ${dbName}" + sql "DROP TABLE IF EXISTS ${dbName}.${tableName}" + sql """ + CREATE TABLE ${dbName}.${tableName} ( + `saler_id` int NOT NULL, + `category_id` int NOT NULL, + `sale_date` date NOT NULL + ) + DUPLICATE KEY(`saler_id`) + DISTRIBUTED BY HASH(`saler_id`) BUCKETS 10 + PROPERTIES + ( + "replication_num" = "1", + "colocate_with" = "group1" + ) + """ + + sql """INSERT INTO ${dbName}.${tableName} VALUES (1, 1, '2023-11-01'),(2, 2, '2023-11-02')""" + + def result = sql"SELECT * FROM ${dbName}.${tableName}" + assertEquals(result.size(), 2) + + String snapshotName = "test_backup_restore_colocate_with_snapshot" + + sql """ + BACKUP SNAPSHOT ${dbName}.${snapshotName} + TO `${repoName}` + ON (${tableName}) + PROPERTIES ("type" = "full") + """ + + syncer.waitSnapshotFinish(dbName) + + def snapshot = syncer.getSnapshotTimestamp(repoName, snapshotName) + + assertTrue(snapshot != null) + + sql "TRUNCATE TABLE ${dbName}.${tableName}" + sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + ON ( `${tableName}`) + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true", + "colocate_with" = "group1" + ) + """ + + syncer.waitAllRestoreFinish(dbName) + + def restore_index_comment = sql "SHOW CREATE TABLE ${dbName}.${tableName}" + + assertTrue(restore_index_comment[0][1].contains("\"colocate_with\" = \"group1\"")) + + result = sql"SELECT * FROM ${dbName}.${tableName}" + assertEquals(result.size(), 2) + + sql "DROP TABLE ${dbName}.${tableName} FORCE" + sql "DROP DATABASE ${dbName} FORCE" + sql "DROP REPOSITORY `${repoName}`" + +}