Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
wyxxxcat committed Dec 1, 2024
1 parent 08b187b commit 46a27de
Show file tree
Hide file tree
Showing 9 changed files with 65 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,12 @@ public class RestoreStmt extends AbstractBackupStmt implements NotFallbackInPars
public static final String PROP_CLEAN_TABLES = "clean_tables";
public static final String PROP_CLEAN_PARTITIONS = "clean_partitions";
public static final String PROP_ATOMIC_RESTORE = "atomic_restore";
public static final String PROP_COLOCATE_WITH = "colocate_with";

private boolean allowLoad = false;
private ReplicaAllocation replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION;
private String backupTimestamp = null;
private String colocateWith = null;
private int metaVersion = -1;
private boolean reserveReplica = false;
private boolean reserveDynamicPartitionEnable = false;
Expand Down Expand Up @@ -123,6 +125,10 @@ public boolean isCleanPartitions() {
return isCleanPartitions;
}

public String getColocateWith() {
return colocateWith;
}

public boolean isAtomicRestore() {
return isAtomicRestore;
}
Expand Down Expand Up @@ -209,6 +215,12 @@ public void analyzeProperties() throws AnalysisException {
// is clean partitions
isCleanPartitions = eatBooleanProperty(copiedProperties, PROP_CLEAN_PARTITIONS, isCleanPartitions);

// colocate with
if (copiedProperties.containsKey(PROP_COLOCATE_WITH)) {
colocateWith = copiedProperties.get(PROP_COLOCATE_WITH);
copiedProperties.remove(PROP_COLOCATE_WITH);
}

// is atomic restore
isAtomicRestore = eatBooleanProperty(copiedProperties, PROP_ATOMIC_RESTORE, isAtomicRestore);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -555,14 +555,16 @@ private void restore(Repository repository, Database db, RestoreStmt stmt) throw
db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(),
stmt.getTimeoutMs(), metaVersion, stmt.reserveReplica(),
stmt.reserveDynamicPartitionEnable(), stmt.isBeingSynced(),
stmt.isCleanTables(), stmt.isCleanPartitions(), stmt.isAtomicRestore(),
stmt.isCleanTables(), stmt.isCleanPartitions(), stmt.isAtomicRestore(), stmt.getColocateWith(),
env, Repository.KEEP_ON_LOCAL_REPO_ID, backupMeta);
} else {
restoreJob = new RestoreJob(stmt.getLabel(), stmt.getBackupTimestamp(),
db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(),
stmt.getTimeoutMs(), stmt.getMetaVersion(), stmt.reserveReplica(), stmt.reserveDynamicPartitionEnable(),
stmt.isBeingSynced(), stmt.isCleanTables(), stmt.isCleanPartitions(), stmt.isAtomicRestore(),
env, repository.getId());
db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(),
stmt.getTimeoutMs(), stmt.getMetaVersion(), stmt.reserveReplica(),
stmt.reserveDynamicPartitionEnable(),
stmt.isBeingSynced(), stmt.isCleanTables(), stmt.isCleanPartitions(), stmt.isAtomicRestore(),
stmt.getColocateWith(),
env, repository.getId());
}

env.getEditLog().logRestoreJob(restoreJob);
Expand Down
18 changes: 14 additions & 4 deletions fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable {
private static final String PROP_CLEAN_TABLES = RestoreStmt.PROP_CLEAN_TABLES;
private static final String PROP_CLEAN_PARTITIONS = RestoreStmt.PROP_CLEAN_PARTITIONS;
private static final String PROP_ATOMIC_RESTORE = RestoreStmt.PROP_ATOMIC_RESTORE;
private static final String PROP_COLOCATE_WITH = RestoreStmt.PROP_COLOCATE_WITH;
private static final String ATOMIC_RESTORE_TABLE_PREFIX = "__doris_atomic_restore_prefix__";

private static final Logger LOG = LogManager.getLogger(RestoreJob.class);
Expand Down Expand Up @@ -210,6 +211,8 @@ public enum RestoreJobState {
private boolean isCleanPartitions = false;
// Whether to restore the data into a temp table, and then replace the origin one.
private boolean isAtomicRestore = false;
// Whether to specify table property colocate_with
private String colocateWith = null;

// restore properties
@SerializedName("prop")
Expand All @@ -228,7 +231,7 @@ public RestoreJob(JobType jobType) {
public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo, boolean allowLoad,
ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica,
boolean reserveDynamicPartitionEnable, boolean isBeingSynced, boolean isCleanTables,
boolean isCleanPartitions, boolean isAtomicRestore, Env env, long repoId) {
boolean isCleanPartitions, boolean isAtomicRestore, String colocateWith, Env env, long repoId) {
super(JobType.RESTORE, label, dbId, dbName, timeoutMs, env, repoId);
this.backupTimestamp = backupTs;
this.jobInfo = jobInfo;
Expand All @@ -246,20 +249,25 @@ public RestoreJob(String label, String backupTs, long dbId, String dbName, Backu
this.isCleanTables = isCleanTables;
this.isCleanPartitions = isCleanPartitions;
this.isAtomicRestore = isAtomicRestore;
this.colocateWith = colocateWith;
properties.put(PROP_RESERVE_REPLICA, String.valueOf(reserveReplica));
properties.put(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE, String.valueOf(reserveDynamicPartitionEnable));
properties.put(PROP_IS_BEING_SYNCED, String.valueOf(isBeingSynced));
properties.put(PROP_CLEAN_TABLES, String.valueOf(isCleanTables));
properties.put(PROP_CLEAN_PARTITIONS, String.valueOf(isCleanPartitions));
properties.put(PROP_ATOMIC_RESTORE, String.valueOf(isAtomicRestore));
properties.put(PROP_COLOCATE_WITH, colocateWith);
}

public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo, boolean allowLoad,
ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica,
boolean reserveDynamicPartitionEnable, boolean isBeingSynced, boolean isCleanTables,
boolean isCleanPartitions, boolean isAtomicRestore, Env env, long repoId, BackupMeta backupMeta) {
boolean isCleanPartitions, boolean isAtomicRestore,
String colocateWith, Env env, long repoId,
BackupMeta backupMeta) {
this(label, backupTs, dbId, dbName, jobInfo, allowLoad, replicaAlloc, timeoutMs, metaVersion, reserveReplica,
reserveDynamicPartitionEnable, isBeingSynced, isCleanTables, isCleanPartitions, isAtomicRestore, env,
reserveDynamicPartitionEnable, isBeingSynced, isCleanTables, isCleanPartitions, isAtomicRestore,
colocateWith, env,
repoId);
this.backupMeta = backupMeta;
}
Expand Down Expand Up @@ -817,7 +825,7 @@ private void checkAndPrepareMeta() {

// Reset properties to correct values.
remoteOlapTbl.resetPropertiesForRestore(reserveDynamicPartitionEnable, reserveReplica,
replicaAlloc, isBeingSynced);
replicaAlloc, isBeingSynced, colocateWith);

// DO NOT set remote table's new name here, cause we will still need the origin name later
// remoteOlapTbl.setName(jobInfo.getAliasByOriginNameIfSet(tblInfo.name));
Expand Down Expand Up @@ -2224,6 +2232,7 @@ public synchronized List<String> getInfo(boolean isBrief) {
info.add(replicaAlloc.toCreateStmt());
info.add(String.valueOf(reserveReplica));
info.add(String.valueOf(reserveDynamicPartitionEnable));
info.add(colocateWith);
if (!isBrief) {
info.add(getRestoreObjs());
}
Expand Down Expand Up @@ -2647,6 +2656,7 @@ private void readOthers(DataInput in) throws IOException {
isCleanTables = Boolean.parseBoolean(properties.get(PROP_CLEAN_TABLES));
isCleanPartitions = Boolean.parseBoolean(properties.get(PROP_CLEAN_PARTITIONS));
isAtomicRestore = Boolean.parseBoolean(properties.get(PROP_ATOMIC_RESTORE));
colocateWith = properties.get(PROP_COLOCATE_WITH);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -737,15 +737,18 @@ public void renameColumnNamePrefix(long idxId) {
* Reset properties to correct values.
*/
public void resetPropertiesForRestore(boolean reserveDynamicPartitionEnable, boolean reserveReplica,
ReplicaAllocation replicaAlloc, boolean isBeingSynced) {
ReplicaAllocation replicaAlloc, boolean isBeingSynced, String colocateWith) {
if (tableProperty != null) {
tableProperty.resetPropertiesForRestore(reserveDynamicPartitionEnable, reserveReplica, replicaAlloc);
}
if (isBeingSynced) {
setBeingSyncedProperties();
}
// remove colocate property.
setColocateGroup(null);
if (colocateWith != null) {
setColocateGroup(colocateWith);
} else {
setColocateGroup(null);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public class TableProperty implements Writable, GsonPostProcessable {
private String storagePolicy = "";
private Boolean isBeingSynced = null;
private BinlogConfig binlogConfig;
private String colocateWith = "";

private TStorageMedium storageMedium = null;

Expand Down Expand Up @@ -157,6 +158,7 @@ public TableProperty buildProperty(short opCode) {
buildMinLoadReplicaNum();
buildStorageMedium();
buildStoragePolicy();
buildColocateWith();
buildIsBeingSynced();
buildCompactionPolicy();
buildTimeSeriesCompactionGoalSizeMbytes();
Expand Down Expand Up @@ -447,6 +449,15 @@ public String getStoragePolicy() {
return storagePolicy;
}

public TableProperty buildColocateWith() {
storagePolicy = properties.getOrDefault(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY, "");
return this;
}

public String getColocateWith() {
return storagePolicy;
}

public TableProperty buildIsBeingSynced() {
isBeingSynced = Boolean.parseBoolean(properties.getOrDefault(
PropertyAnalyzer.PROPERTIES_IS_BEING_SYNCED, "false"));
Expand All @@ -467,6 +478,9 @@ public boolean isBeingSynced() {

public void removeInvalidProperties() {
properties.remove(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY);
if (properties.get(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH) != null) {
colocateWith = properties.get(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH);
}
properties.remove(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH);
}

Expand Down Expand Up @@ -697,6 +711,10 @@ public void setRowStoreColumns(List<String> rowStoreColumns) {
}
}

public void setColocateWith(String colocateWith) {
properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, colocateWith);
}

public void buildReplicaAllocation() {
try {
// Must copy the properties because "analyzeReplicaAllocation" will remove the property
Expand Down Expand Up @@ -731,6 +749,7 @@ public void gsonPostProcess() throws IOException {
buildDataSortInfo();
buildCompressionType();
buildStoragePolicy();
buildColocateWith();
buildIsBeingSynced();
buildBinlogConfig();
buildEnableLightSchemaChange();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3062,6 +3062,9 @@ private TRestoreSnapshotResult restoreSnapshotImpl(TRestoreSnapshotRequest reque
if (request.isAtomicRestore()) {
properties.put(RestoreStmt.PROP_ATOMIC_RESTORE, "true");
}
if (request.isSetColocateWith()) {
properties.put(RestoreStmt.PROP_COLOCATE_WITH, request.getColocateWith());
}

AbstractBackupTableRefClause restoreTableRefClause = null;
if (request.isSetTableRefs()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ boolean await(long timeout, TimeUnit unit) {
db.unregisterTable(expectedRestoreTbl.getName());

job = new RestoreJob(label, "2018-01-01 01:01:01", db.getId(), db.getFullName(), jobInfo, false,
new ReplicaAllocation((short) 3), 100000, -1, false, false, false, false, false, false,
new ReplicaAllocation((short) 3), 100000, -1, false, false, false, false, false, false, null,
env, repo.getId());

List<Table> tbls = Lists.newArrayList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void testResetPropertiesForRestore() {
Assert.assertTrue(olapTable.getDefaultReplicaAllocation() == ReplicaAllocation.DEFAULT_ALLOCATION);

ReplicaAllocation replicaAlloc = new ReplicaAllocation((short) 4);
olapTable.resetPropertiesForRestore(false, false, replicaAlloc, false);
olapTable.resetPropertiesForRestore(false, false, replicaAlloc, false, null);
Assert.assertEquals(tableProperty.getProperties(), olapTable.getTableProperty().getProperties());
Assert.assertFalse(tableProperty.getDynamicPartitionProperty().isExist());
Assert.assertFalse(olapTable.isColocateTable());
Expand All @@ -114,7 +114,7 @@ public void testResetPropertiesForRestore() {

tableProperty = new TableProperty(properties);
olapTable.setTableProperty(tableProperty);
olapTable.resetPropertiesForRestore(false, false, ReplicaAllocation.DEFAULT_ALLOCATION, false);
olapTable.resetPropertiesForRestore(false, false, ReplicaAllocation.DEFAULT_ALLOCATION, false, null);

Map<String, String> expectedProperties = Maps.newHashMap(properties);
expectedProperties.put(DynamicPartitionProperty.ENABLE, "false");
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -1389,6 +1389,7 @@ struct TRestoreSnapshotRequest {
14: optional bool clean_partitions
15: optional bool atomic_restore
16: optional bool compressed;
17: optional string colocate_with;
}

struct TRestoreSnapshotResult {
Expand Down

0 comments on commit 46a27de

Please sign in to comment.