From 46a27de7fe2c4a0d98ff2a62fa14b96d2973c7a3 Mon Sep 17 00:00:00 2001 From: wyxxxcat <1520358997@qq.com> Date: Sun, 1 Dec 2024 23:38:41 +0800 Subject: [PATCH] 1 --- .../apache/doris/analysis/RestoreStmt.java | 12 ++++++++++++ .../apache/doris/backup/BackupHandler.java | 12 +++++++----- .../org/apache/doris/backup/RestoreJob.java | 18 ++++++++++++++---- .../org/apache/doris/catalog/OlapTable.java | 9 ++++++--- .../apache/doris/catalog/TableProperty.java | 19 +++++++++++++++++++ .../doris/service/FrontendServiceImpl.java | 3 +++ .../apache/doris/backup/RestoreJobTest.java | 2 +- .../apache/doris/catalog/OlapTableTest.java | 4 ++-- gensrc/thrift/FrontendService.thrift | 1 + 9 files changed, 65 insertions(+), 15 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java index bc38cfe09e5606f..f16ed12e97e43f8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java @@ -44,10 +44,12 @@ public class RestoreStmt extends AbstractBackupStmt implements NotFallbackInPars public static final String PROP_CLEAN_TABLES = "clean_tables"; public static final String PROP_CLEAN_PARTITIONS = "clean_partitions"; public static final String PROP_ATOMIC_RESTORE = "atomic_restore"; + public static final String PROP_COLOCATE_WITH = "colocate_with"; private boolean allowLoad = false; private ReplicaAllocation replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION; private String backupTimestamp = null; + private String colocateWith = null; private int metaVersion = -1; private boolean reserveReplica = false; private boolean reserveDynamicPartitionEnable = false; @@ -123,6 +125,10 @@ public boolean isCleanPartitions() { return isCleanPartitions; } + public String getColocateWith() { + return colocateWith; + } + public boolean isAtomicRestore() { return isAtomicRestore; } @@ -209,6 +215,12 @@ public void analyzeProperties() throws AnalysisException { // is clean partitions isCleanPartitions = eatBooleanProperty(copiedProperties, PROP_CLEAN_PARTITIONS, isCleanPartitions); + // colocate with + if (copiedProperties.containsKey(PROP_COLOCATE_WITH)) { + colocateWith = copiedProperties.get(PROP_COLOCATE_WITH); + copiedProperties.remove(PROP_COLOCATE_WITH); + } + // is atomic restore isAtomicRestore = eatBooleanProperty(copiedProperties, PROP_ATOMIC_RESTORE, isAtomicRestore); diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java index d70544add98747d..8e4b3277c0bb74b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java @@ -555,14 +555,16 @@ private void restore(Repository repository, Database db, RestoreStmt stmt) throw db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(), stmt.getTimeoutMs(), metaVersion, stmt.reserveReplica(), stmt.reserveDynamicPartitionEnable(), stmt.isBeingSynced(), - stmt.isCleanTables(), stmt.isCleanPartitions(), stmt.isAtomicRestore(), + stmt.isCleanTables(), stmt.isCleanPartitions(), stmt.isAtomicRestore(), stmt.getColocateWith(), env, Repository.KEEP_ON_LOCAL_REPO_ID, backupMeta); } else { restoreJob = new RestoreJob(stmt.getLabel(), stmt.getBackupTimestamp(), - db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(), - stmt.getTimeoutMs(), stmt.getMetaVersion(), stmt.reserveReplica(), stmt.reserveDynamicPartitionEnable(), - stmt.isBeingSynced(), stmt.isCleanTables(), stmt.isCleanPartitions(), stmt.isAtomicRestore(), - env, repository.getId()); + db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(), + stmt.getTimeoutMs(), stmt.getMetaVersion(), stmt.reserveReplica(), + stmt.reserveDynamicPartitionEnable(), + stmt.isBeingSynced(), stmt.isCleanTables(), stmt.isCleanPartitions(), stmt.isAtomicRestore(), + stmt.getColocateWith(), + env, repository.getId()); } env.getEditLog().logRestoreJob(restoreJob); diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index f1abb0c9e632bc1..5e326509aff0861 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -123,6 +123,7 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable { private static final String PROP_CLEAN_TABLES = RestoreStmt.PROP_CLEAN_TABLES; private static final String PROP_CLEAN_PARTITIONS = RestoreStmt.PROP_CLEAN_PARTITIONS; private static final String PROP_ATOMIC_RESTORE = RestoreStmt.PROP_ATOMIC_RESTORE; + private static final String PROP_COLOCATE_WITH = RestoreStmt.PROP_COLOCATE_WITH; private static final String ATOMIC_RESTORE_TABLE_PREFIX = "__doris_atomic_restore_prefix__"; private static final Logger LOG = LogManager.getLogger(RestoreJob.class); @@ -210,6 +211,8 @@ public enum RestoreJobState { private boolean isCleanPartitions = false; // Whether to restore the data into a temp table, and then replace the origin one. private boolean isAtomicRestore = false; + // Whether to specify table property colocate_with + private String colocateWith = null; // restore properties @SerializedName("prop") @@ -228,7 +231,7 @@ public RestoreJob(JobType jobType) { public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo, boolean allowLoad, ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica, boolean reserveDynamicPartitionEnable, boolean isBeingSynced, boolean isCleanTables, - boolean isCleanPartitions, boolean isAtomicRestore, Env env, long repoId) { + boolean isCleanPartitions, boolean isAtomicRestore, String colocateWith, Env env, long repoId) { super(JobType.RESTORE, label, dbId, dbName, timeoutMs, env, repoId); this.backupTimestamp = backupTs; this.jobInfo = jobInfo; @@ -246,20 +249,25 @@ public RestoreJob(String label, String backupTs, long dbId, String dbName, Backu this.isCleanTables = isCleanTables; this.isCleanPartitions = isCleanPartitions; this.isAtomicRestore = isAtomicRestore; + this.colocateWith = colocateWith; properties.put(PROP_RESERVE_REPLICA, String.valueOf(reserveReplica)); properties.put(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE, String.valueOf(reserveDynamicPartitionEnable)); properties.put(PROP_IS_BEING_SYNCED, String.valueOf(isBeingSynced)); properties.put(PROP_CLEAN_TABLES, String.valueOf(isCleanTables)); properties.put(PROP_CLEAN_PARTITIONS, String.valueOf(isCleanPartitions)); properties.put(PROP_ATOMIC_RESTORE, String.valueOf(isAtomicRestore)); + properties.put(PROP_COLOCATE_WITH, colocateWith); } public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo, boolean allowLoad, ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica, boolean reserveDynamicPartitionEnable, boolean isBeingSynced, boolean isCleanTables, - boolean isCleanPartitions, boolean isAtomicRestore, Env env, long repoId, BackupMeta backupMeta) { + boolean isCleanPartitions, boolean isAtomicRestore, + String colocateWith, Env env, long repoId, + BackupMeta backupMeta) { this(label, backupTs, dbId, dbName, jobInfo, allowLoad, replicaAlloc, timeoutMs, metaVersion, reserveReplica, - reserveDynamicPartitionEnable, isBeingSynced, isCleanTables, isCleanPartitions, isAtomicRestore, env, + reserveDynamicPartitionEnable, isBeingSynced, isCleanTables, isCleanPartitions, isAtomicRestore, + colocateWith, env, repoId); this.backupMeta = backupMeta; } @@ -817,7 +825,7 @@ private void checkAndPrepareMeta() { // Reset properties to correct values. remoteOlapTbl.resetPropertiesForRestore(reserveDynamicPartitionEnable, reserveReplica, - replicaAlloc, isBeingSynced); + replicaAlloc, isBeingSynced, colocateWith); // DO NOT set remote table's new name here, cause we will still need the origin name later // remoteOlapTbl.setName(jobInfo.getAliasByOriginNameIfSet(tblInfo.name)); @@ -2224,6 +2232,7 @@ public synchronized List getInfo(boolean isBrief) { info.add(replicaAlloc.toCreateStmt()); info.add(String.valueOf(reserveReplica)); info.add(String.valueOf(reserveDynamicPartitionEnable)); + info.add(colocateWith); if (!isBrief) { info.add(getRestoreObjs()); } @@ -2647,6 +2656,7 @@ private void readOthers(DataInput in) throws IOException { isCleanTables = Boolean.parseBoolean(properties.get(PROP_CLEAN_TABLES)); isCleanPartitions = Boolean.parseBoolean(properties.get(PROP_CLEAN_PARTITIONS)); isAtomicRestore = Boolean.parseBoolean(properties.get(PROP_ATOMIC_RESTORE)); + colocateWith = properties.get(PROP_COLOCATE_WITH); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 8925e483c29df7f..069f0f8b7c24bfb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -737,15 +737,18 @@ public void renameColumnNamePrefix(long idxId) { * Reset properties to correct values. */ public void resetPropertiesForRestore(boolean reserveDynamicPartitionEnable, boolean reserveReplica, - ReplicaAllocation replicaAlloc, boolean isBeingSynced) { + ReplicaAllocation replicaAlloc, boolean isBeingSynced, String colocateWith) { if (tableProperty != null) { tableProperty.resetPropertiesForRestore(reserveDynamicPartitionEnable, reserveReplica, replicaAlloc); } if (isBeingSynced) { setBeingSyncedProperties(); } - // remove colocate property. - setColocateGroup(null); + if (colocateWith != null) { + setColocateGroup(colocateWith); + } else { + setColocateGroup(null); + } } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java index 1ac556c6846c05f..96b8b5ca700c903 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java @@ -73,6 +73,7 @@ public class TableProperty implements Writable, GsonPostProcessable { private String storagePolicy = ""; private Boolean isBeingSynced = null; private BinlogConfig binlogConfig; + private String colocateWith = ""; private TStorageMedium storageMedium = null; @@ -157,6 +158,7 @@ public TableProperty buildProperty(short opCode) { buildMinLoadReplicaNum(); buildStorageMedium(); buildStoragePolicy(); + buildColocateWith(); buildIsBeingSynced(); buildCompactionPolicy(); buildTimeSeriesCompactionGoalSizeMbytes(); @@ -447,6 +449,15 @@ public String getStoragePolicy() { return storagePolicy; } + public TableProperty buildColocateWith() { + storagePolicy = properties.getOrDefault(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY, ""); + return this; + } + + public String getColocateWith() { + return storagePolicy; + } + public TableProperty buildIsBeingSynced() { isBeingSynced = Boolean.parseBoolean(properties.getOrDefault( PropertyAnalyzer.PROPERTIES_IS_BEING_SYNCED, "false")); @@ -467,6 +478,9 @@ public boolean isBeingSynced() { public void removeInvalidProperties() { properties.remove(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY); + if (properties.get(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH) != null) { + colocateWith = properties.get(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH); + } properties.remove(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH); } @@ -697,6 +711,10 @@ public void setRowStoreColumns(List rowStoreColumns) { } } + public void setColocateWith(String colocateWith) { + properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, colocateWith); + } + public void buildReplicaAllocation() { try { // Must copy the properties because "analyzeReplicaAllocation" will remove the property @@ -731,6 +749,7 @@ public void gsonPostProcess() throws IOException { buildDataSortInfo(); buildCompressionType(); buildStoragePolicy(); + buildColocateWith(); buildIsBeingSynced(); buildBinlogConfig(); buildEnableLightSchemaChange(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index e35fd2dc852322f..26f204fc79f690b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -3062,6 +3062,9 @@ private TRestoreSnapshotResult restoreSnapshotImpl(TRestoreSnapshotRequest reque if (request.isAtomicRestore()) { properties.put(RestoreStmt.PROP_ATOMIC_RESTORE, "true"); } + if (request.isSetColocateWith()) { + properties.put(RestoreStmt.PROP_COLOCATE_WITH, request.getColocateWith()); + } AbstractBackupTableRefClause restoreTableRefClause = null; if (request.isSetTableRefs()) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java index dadfdb632e394d4..efe834799920088 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java @@ -256,7 +256,7 @@ boolean await(long timeout, TimeUnit unit) { db.unregisterTable(expectedRestoreTbl.getName()); job = new RestoreJob(label, "2018-01-01 01:01:01", db.getId(), db.getFullName(), jobInfo, false, - new ReplicaAllocation((short) 3), 100000, -1, false, false, false, false, false, false, + new ReplicaAllocation((short) 3), 100000, -1, false, false, false, false, false, false, null, env, repo.getId()); List tbls = Lists.newArrayList(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/OlapTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/OlapTableTest.java index 84b8c6062d2351b..721e4c4dbdd00ab 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/OlapTableTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/OlapTableTest.java @@ -94,7 +94,7 @@ public void testResetPropertiesForRestore() { Assert.assertTrue(olapTable.getDefaultReplicaAllocation() == ReplicaAllocation.DEFAULT_ALLOCATION); ReplicaAllocation replicaAlloc = new ReplicaAllocation((short) 4); - olapTable.resetPropertiesForRestore(false, false, replicaAlloc, false); + olapTable.resetPropertiesForRestore(false, false, replicaAlloc, false, null); Assert.assertEquals(tableProperty.getProperties(), olapTable.getTableProperty().getProperties()); Assert.assertFalse(tableProperty.getDynamicPartitionProperty().isExist()); Assert.assertFalse(olapTable.isColocateTable()); @@ -114,7 +114,7 @@ public void testResetPropertiesForRestore() { tableProperty = new TableProperty(properties); olapTable.setTableProperty(tableProperty); - olapTable.resetPropertiesForRestore(false, false, ReplicaAllocation.DEFAULT_ALLOCATION, false); + olapTable.resetPropertiesForRestore(false, false, ReplicaAllocation.DEFAULT_ALLOCATION, false, null); Map expectedProperties = Maps.newHashMap(properties); expectedProperties.put(DynamicPartitionProperty.ENABLE, "false"); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index e2af8937425d0c7..4af929c83ac8c37 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1389,6 +1389,7 @@ struct TRestoreSnapshotRequest { 14: optional bool clean_partitions 15: optional bool atomic_restore 16: optional bool compressed; + 17: optional string colocate_with; } struct TRestoreSnapshotResult {