Skip to content

Commit

Permalink
[improve](restore) Compress restore job to reduce editlog size (#42422)
Browse files Browse the repository at this point in the history
  • Loading branch information
w41ter authored Oct 31, 2024
1 parent 135d594 commit 9f82266
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1482,6 +1482,14 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static int max_backup_restore_job_num_per_db = 10;

/**
* A internal config, to reduce the restore job size during serialization by compress.
*
* WARNING: Once this option is enabled and a restore is performed, the FE version cannot be rolled back.
*/
@ConfField(mutable = false)
public static boolean restore_job_compressed_serialization = false;

/**
* Control the max num of tablets per backup job involved.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
public abstract class AbstractJob implements Writable {

public enum JobType {
BACKUP, RESTORE
BACKUP, RESTORE, RESTORE_COMPRESSED
}

protected JobType type;
Expand Down Expand Up @@ -160,8 +160,8 @@ public static AbstractJob read(DataInput in) throws IOException {
JobType type = JobType.valueOf(Text.readString(in));
if (type == JobType.BACKUP) {
job = new BackupJob();
} else if (type == JobType.RESTORE) {
job = new RestoreJob();
} else if (type == JobType.RESTORE || type == JobType.RESTORE_COMPRESSED) {
job = new RestoreJob(type);
} else {
throw new IOException("Unknown job type: " + type.name());
}
Expand Down
53 changes: 53 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,21 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class RestoreJob extends AbstractJob {
private static final String PROP_RESERVE_REPLICA = RestoreStmt.PROP_RESERVE_REPLICA;
Expand Down Expand Up @@ -195,6 +201,10 @@ public RestoreJob() {
super(JobType.RESTORE);
}

public RestoreJob(JobType jobType) {
super(jobType);
}

public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo, boolean allowLoad,
ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica,
boolean reserveDynamicPartitionEnable, boolean isBeingSynced, boolean isCleanTables,
Expand Down Expand Up @@ -2421,8 +2431,31 @@ public static RestoreJob read(DataInput in) throws IOException {

@Override
public void write(DataOutput out) throws IOException {
if (Config.restore_job_compressed_serialization) {
type = JobType.RESTORE_COMPRESSED;
}
super.write(out);
if (Config.restore_job_compressed_serialization) {
type = JobType.RESTORE;

ByteArrayOutputStream bytesStream = new ByteArrayOutputStream();
try (GZIPOutputStream gzipStream = new GZIPOutputStream(bytesStream)) {
try (DataOutputStream stream = new DataOutputStream(gzipStream)) {
writeOthers(stream);
stream.flush();
}
}
Text text = new Text(bytesStream.toByteArray());
if (LOG.isDebugEnabled() || text.getLength() > (100 << 20)) {
LOG.info("restore job serialized size {}", text.getLength());
}
text.write(out);
} else {
writeOthers(out);
}
}

private void writeOthers(DataOutput out) throws IOException {
Text.writeString(out, backupTimestamp);
jobInfo.write(out);
out.writeBoolean(allowLoad);
Expand Down Expand Up @@ -2495,7 +2528,27 @@ public void write(DataOutput out) throws IOException {
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
if (type == JobType.RESTORE_COMPRESSED) {
type = JobType.RESTORE;

Text text = new Text();
text.readFields(in);
if (LOG.isDebugEnabled() || text.getLength() > (100 << 20)) {
LOG.info("read restore job compressed size {}", text.getLength());
}

ByteArrayInputStream bytesStream = new ByteArrayInputStream(text.getBytes());
try (GZIPInputStream gzipStream = new GZIPInputStream(bytesStream)) {
try (DataInputStream stream = new DataInputStream(gzipStream)) {
readOthers(stream);
}
}
} else {
readOthers(in);
}
}

private void readOthers(DataInput in) throws IOException {
backupTimestamp = Text.readString(in);
jobInfo = BackupJobInfo.read(in);
allowLoad = in.readBoolean();
Expand Down

0 comments on commit 9f82266

Please sign in to comment.