Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature](restore) Support colocate_with properties for restore job #44829

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,12 @@ public class RestoreStmt extends AbstractBackupStmt implements NotFallbackInPars
public static final String PROP_CLEAN_TABLES = "clean_tables";
public static final String PROP_CLEAN_PARTITIONS = "clean_partitions";
public static final String PROP_ATOMIC_RESTORE = "atomic_restore";
public static final String PROP_COLOCATE_WITH = "colocate_with";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keep_colocate_with is a better name.


private boolean allowLoad = false;
private ReplicaAllocation replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION;
private String backupTimestamp = null;
private String colocateWith = null;
private int metaVersion = -1;
private boolean reserveReplica = false;
private boolean reserveDynamicPartitionEnable = false;
Expand Down Expand Up @@ -123,6 +125,10 @@ public boolean isCleanPartitions() {
return isCleanPartitions;
}

public String getColocateWith() {
return colocateWith;
}

public boolean isAtomicRestore() {
return isAtomicRestore;
}
Expand Down Expand Up @@ -209,6 +215,12 @@ public void analyzeProperties() throws AnalysisException {
// is clean partitions
isCleanPartitions = eatBooleanProperty(copiedProperties, PROP_CLEAN_PARTITIONS, isCleanPartitions);

// colocate with
if (copiedProperties.containsKey(PROP_COLOCATE_WITH)) {
colocateWith = copiedProperties.get(PROP_COLOCATE_WITH);
copiedProperties.remove(PROP_COLOCATE_WITH);
}

// is atomic restore
isAtomicRestore = eatBooleanProperty(copiedProperties, PROP_ATOMIC_RESTORE, isAtomicRestore);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -555,14 +555,16 @@ private void restore(Repository repository, Database db, RestoreStmt stmt) throw
db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(),
stmt.getTimeoutMs(), metaVersion, stmt.reserveReplica(),
stmt.reserveDynamicPartitionEnable(), stmt.isBeingSynced(),
stmt.isCleanTables(), stmt.isCleanPartitions(), stmt.isAtomicRestore(),
stmt.isCleanTables(), stmt.isCleanPartitions(), stmt.isAtomicRestore(), stmt.getColocateWith(),
env, Repository.KEEP_ON_LOCAL_REPO_ID, backupMeta);
} else {
restoreJob = new RestoreJob(stmt.getLabel(), stmt.getBackupTimestamp(),
db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(),
stmt.getTimeoutMs(), stmt.getMetaVersion(), stmt.reserveReplica(), stmt.reserveDynamicPartitionEnable(),
stmt.isBeingSynced(), stmt.isCleanTables(), stmt.isCleanPartitions(), stmt.isAtomicRestore(),
env, repository.getId());
db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(),
stmt.getTimeoutMs(), stmt.getMetaVersion(), stmt.reserveReplica(),
stmt.reserveDynamicPartitionEnable(),
stmt.isBeingSynced(), stmt.isCleanTables(), stmt.isCleanPartitions(), stmt.isAtomicRestore(),
stmt.getColocateWith(),
env, repository.getId());
}

env.getEditLog().logRestoreJob(restoreJob);
Expand Down
18 changes: 14 additions & 4 deletions fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable {
private static final String PROP_CLEAN_TABLES = RestoreStmt.PROP_CLEAN_TABLES;
private static final String PROP_CLEAN_PARTITIONS = RestoreStmt.PROP_CLEAN_PARTITIONS;
private static final String PROP_ATOMIC_RESTORE = RestoreStmt.PROP_ATOMIC_RESTORE;
private static final String PROP_COLOCATE_WITH = RestoreStmt.PROP_COLOCATE_WITH;
private static final String ATOMIC_RESTORE_TABLE_PREFIX = "__doris_atomic_restore_prefix__";

private static final Logger LOG = LogManager.getLogger(RestoreJob.class);
Expand Down Expand Up @@ -210,6 +211,8 @@ public enum RestoreJobState {
private boolean isCleanPartitions = false;
// Whether to restore the data into a temp table, and then replace the origin one.
private boolean isAtomicRestore = false;
// Whether to specify table property colocate_with
private String colocateWith = null;

// restore properties
@SerializedName("prop")
Expand All @@ -228,7 +231,7 @@ public RestoreJob(JobType jobType) {
public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo, boolean allowLoad,
ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica,
boolean reserveDynamicPartitionEnable, boolean isBeingSynced, boolean isCleanTables,
boolean isCleanPartitions, boolean isAtomicRestore, Env env, long repoId) {
boolean isCleanPartitions, boolean isAtomicRestore, String colocateWith, Env env, long repoId) {
super(JobType.RESTORE, label, dbId, dbName, timeoutMs, env, repoId);
this.backupTimestamp = backupTs;
this.jobInfo = jobInfo;
Expand All @@ -246,20 +249,25 @@ public RestoreJob(String label, String backupTs, long dbId, String dbName, Backu
this.isCleanTables = isCleanTables;
this.isCleanPartitions = isCleanPartitions;
this.isAtomicRestore = isAtomicRestore;
this.colocateWith = colocateWith;
properties.put(PROP_RESERVE_REPLICA, String.valueOf(reserveReplica));
properties.put(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE, String.valueOf(reserveDynamicPartitionEnable));
properties.put(PROP_IS_BEING_SYNCED, String.valueOf(isBeingSynced));
properties.put(PROP_CLEAN_TABLES, String.valueOf(isCleanTables));
properties.put(PROP_CLEAN_PARTITIONS, String.valueOf(isCleanPartitions));
properties.put(PROP_ATOMIC_RESTORE, String.valueOf(isAtomicRestore));
properties.put(PROP_COLOCATE_WITH, colocateWith);
}

public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo, boolean allowLoad,
ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica,
boolean reserveDynamicPartitionEnable, boolean isBeingSynced, boolean isCleanTables,
boolean isCleanPartitions, boolean isAtomicRestore, Env env, long repoId, BackupMeta backupMeta) {
boolean isCleanPartitions, boolean isAtomicRestore,
String colocateWith, Env env, long repoId,
BackupMeta backupMeta) {
this(label, backupTs, dbId, dbName, jobInfo, allowLoad, replicaAlloc, timeoutMs, metaVersion, reserveReplica,
reserveDynamicPartitionEnable, isBeingSynced, isCleanTables, isCleanPartitions, isAtomicRestore, env,
reserveDynamicPartitionEnable, isBeingSynced, isCleanTables, isCleanPartitions, isAtomicRestore,
colocateWith, env,
repoId);
this.backupMeta = backupMeta;
}
Expand Down Expand Up @@ -817,7 +825,7 @@ private void checkAndPrepareMeta() {

// Reset properties to correct values.
remoteOlapTbl.resetPropertiesForRestore(reserveDynamicPartitionEnable, reserveReplica,
replicaAlloc, isBeingSynced);
replicaAlloc, isBeingSynced, colocateWith);

// DO NOT set remote table's new name here, cause we will still need the origin name later
// remoteOlapTbl.setName(jobInfo.getAliasByOriginNameIfSet(tblInfo.name));
Expand Down Expand Up @@ -2224,6 +2232,7 @@ public synchronized List<String> getInfo(boolean isBrief) {
info.add(replicaAlloc.toCreateStmt());
info.add(String.valueOf(reserveReplica));
info.add(String.valueOf(reserveDynamicPartitionEnable));
info.add(colocateWith);
if (!isBrief) {
info.add(getRestoreObjs());
}
Expand Down Expand Up @@ -2647,6 +2656,7 @@ private void readOthers(DataInput in) throws IOException {
isCleanTables = Boolean.parseBoolean(properties.get(PROP_CLEAN_TABLES));
isCleanPartitions = Boolean.parseBoolean(properties.get(PROP_CLEAN_PARTITIONS));
isAtomicRestore = Boolean.parseBoolean(properties.get(PROP_ATOMIC_RESTORE));
colocateWith = properties.get(PROP_COLOCATE_WITH);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -737,15 +737,18 @@ public void renameColumnNamePrefix(long idxId) {
* Reset properties to correct values.
*/
public void resetPropertiesForRestore(boolean reserveDynamicPartitionEnable, boolean reserveReplica,
ReplicaAllocation replicaAlloc, boolean isBeingSynced) {
ReplicaAllocation replicaAlloc, boolean isBeingSynced, String colocateWith) {
if (tableProperty != null) {
tableProperty.resetPropertiesForRestore(reserveDynamicPartitionEnable, reserveReplica, replicaAlloc);
}
if (isBeingSynced) {
setBeingSyncedProperties();
}
// remove colocate property.
setColocateGroup(null);
if (colocateWith != null) {
setColocateGroup(colocateWith);
} else {
setColocateGroup(null);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public class TableProperty implements Writable, GsonPostProcessable {
private String storagePolicy = "";
private Boolean isBeingSynced = null;
private BinlogConfig binlogConfig;
private String colocateWith = "";

private TStorageMedium storageMedium = null;

Expand Down Expand Up @@ -157,6 +158,7 @@ public TableProperty buildProperty(short opCode) {
buildMinLoadReplicaNum();
buildStorageMedium();
buildStoragePolicy();
buildColocateWith();
buildIsBeingSynced();
buildCompactionPolicy();
buildTimeSeriesCompactionGoalSizeMbytes();
Expand Down Expand Up @@ -447,6 +449,15 @@ public String getStoragePolicy() {
return storagePolicy;
}

public TableProperty buildColocateWith() {
storagePolicy = properties.getOrDefault(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY, "");
return this;
}

public String getColocateWith() {
return storagePolicy;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrong.

}

public TableProperty buildIsBeingSynced() {
isBeingSynced = Boolean.parseBoolean(properties.getOrDefault(
PropertyAnalyzer.PROPERTIES_IS_BEING_SYNCED, "false"));
Expand All @@ -467,6 +478,9 @@ public boolean isBeingSynced() {

public void removeInvalidProperties() {
properties.remove(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY);
if (properties.get(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH) != null) {
colocateWith = properties.get(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH);
}
properties.remove(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH);
}

Expand Down Expand Up @@ -697,6 +711,10 @@ public void setRowStoreColumns(List<String> rowStoreColumns) {
}
}

public void setColocateWith(String colocateWith) {
properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, colocateWith);
}

public void buildReplicaAllocation() {
try {
// Must copy the properties because "analyzeReplicaAllocation" will remove the property
Expand Down Expand Up @@ -731,6 +749,7 @@ public void gsonPostProcess() throws IOException {
buildDataSortInfo();
buildCompressionType();
buildStoragePolicy();
buildColocateWith();
buildIsBeingSynced();
buildBinlogConfig();
buildEnableLightSchemaChange();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3062,6 +3062,9 @@ private TRestoreSnapshotResult restoreSnapshotImpl(TRestoreSnapshotRequest reque
if (request.isAtomicRestore()) {
properties.put(RestoreStmt.PROP_ATOMIC_RESTORE, "true");
}
if (request.isSetColocateWith()) {
properties.put(RestoreStmt.PROP_COLOCATE_WITH, request.getColocateWith());
}

AbstractBackupTableRefClause restoreTableRefClause = null;
if (request.isSetTableRefs()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ boolean await(long timeout, TimeUnit unit) {
db.unregisterTable(expectedRestoreTbl.getName());

job = new RestoreJob(label, "2018-01-01 01:01:01", db.getId(), db.getFullName(), jobInfo, false,
new ReplicaAllocation((short) 3), 100000, -1, false, false, false, false, false, false,
new ReplicaAllocation((short) 3), 100000, -1, false, false, false, false, false, false, null,
env, repo.getId());

List<Table> tbls = Lists.newArrayList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void testResetPropertiesForRestore() {
Assert.assertTrue(olapTable.getDefaultReplicaAllocation() == ReplicaAllocation.DEFAULT_ALLOCATION);

ReplicaAllocation replicaAlloc = new ReplicaAllocation((short) 4);
olapTable.resetPropertiesForRestore(false, false, replicaAlloc, false);
olapTable.resetPropertiesForRestore(false, false, replicaAlloc, false, null);
Assert.assertEquals(tableProperty.getProperties(), olapTable.getTableProperty().getProperties());
Assert.assertFalse(tableProperty.getDynamicPartitionProperty().isExist());
Assert.assertFalse(olapTable.isColocateTable());
Expand All @@ -114,7 +114,7 @@ public void testResetPropertiesForRestore() {

tableProperty = new TableProperty(properties);
olapTable.setTableProperty(tableProperty);
olapTable.resetPropertiesForRestore(false, false, ReplicaAllocation.DEFAULT_ALLOCATION, false);
olapTable.resetPropertiesForRestore(false, false, ReplicaAllocation.DEFAULT_ALLOCATION, false, null);

Map<String, String> expectedProperties = Maps.newHashMap(properties);
expectedProperties.put(DynamicPartitionProperty.ENABLE, "false");
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -1389,6 +1389,7 @@ struct TRestoreSnapshotRequest {
14: optional bool clean_partitions
15: optional bool atomic_restore
16: optional bool compressed;
17: optional string colocate_with;
}

struct TRestoreSnapshotResult {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_backup_restore_colocate_with", "backup_restore") {
String repoName = "repo_" + UUID.randomUUID().toString().replace("-", "")
String dbName = "test_backup_restore_colocate_with_db"
String tableName = "test_backup_restore_colocate_with_table"

def syncer = getSyncer()
syncer.createS3Repository(repoName)

sql "CREATE DATABASE IF NOT EXISTS ${dbName}"
sql "DROP TABLE IF EXISTS ${dbName}.${tableName}"
sql """
CREATE TABLE ${dbName}.${tableName} (
`saler_id` int NOT NULL,
`category_id` int NOT NULL,
`sale_date` date NOT NULL
)
DUPLICATE KEY(`saler_id`)
DISTRIBUTED BY HASH(`saler_id`) BUCKETS 10
PROPERTIES
(
"replication_num" = "1",
"colocate_with" = "group1"
)
"""

sql """INSERT INTO ${dbName}.${tableName} VALUES (1, 1, '2023-11-01'),(2, 2, '2023-11-02')"""

def result = sql"SELECT * FROM ${dbName}.${tableName}"
assertEquals(result.size(), 2)

String snapshotName = "test_backup_restore_colocate_with_snapshot"

sql """
BACKUP SNAPSHOT ${dbName}.${snapshotName}
TO `${repoName}`
ON (${tableName})
PROPERTIES ("type" = "full")
"""

syncer.waitSnapshotFinish(dbName)

def snapshot = syncer.getSnapshotTimestamp(repoName, snapshotName)

assertTrue(snapshot != null)

sql "TRUNCATE TABLE ${dbName}.${tableName}"
sql """
RESTORE SNAPSHOT ${dbName}.${snapshotName}
FROM `${repoName}`
ON ( `${tableName}`)
PROPERTIES
(
"backup_timestamp" = "${snapshot}",
"reserve_replica" = "true",
"colocate_with" = "group1"
)
"""

syncer.waitAllRestoreFinish(dbName)

def restore_index_comment = sql "SHOW CREATE TABLE ${dbName}.${tableName}"

assertTrue(restore_index_comment[0][1].contains("\"colocate_with\" = \"group1\""))

result = sql"SELECT * FROM ${dbName}.${tableName}"
assertEquals(result.size(), 2)

sql "DROP TABLE ${dbName}.${tableName} FORCE"
sql "DROP DATABASE ${dbName} FORCE"
sql "DROP REPOSITORY `${repoName}`"

}
Loading