Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve](restore) Compress restore job to reduce editlog size #42422

Merged
merged 1 commit into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1482,6 +1482,14 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static int max_backup_restore_job_num_per_db = 10;

/**
 * An internal config that reduces the serialized size of a restore job in the
 * edit log by gzip-compressing the job body during serialization
 * (see RestoreJob.write / readFields).
 *
 * WARNING: Once this option is enabled and a restore is performed, the FE version cannot be rolled back.
 */
@ConfField(mutable = false)
public static boolean restore_job_compressed_serialization = false;

/**
* Control the max num of tablets per backup job involved.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
public abstract class AbstractJob implements Writable {

// Job kinds persisted in the edit log. RESTORE_COMPRESSED is a serialization-time
// marker only: RestoreJob writes it in place of RESTORE when
// Config.restore_job_compressed_serialization is enabled, signalling that the job
// body that follows is gzip-compressed; on read it is normalized back to RESTORE.
public enum JobType {
    BACKUP, RESTORE, RESTORE_COMPRESSED
}

protected JobType type;
Expand Down Expand Up @@ -160,8 +160,8 @@ public static AbstractJob read(DataInput in) throws IOException {
JobType type = JobType.valueOf(Text.readString(in));
if (type == JobType.BACKUP) {
job = new BackupJob();
} else if (type == JobType.RESTORE) {
job = new RestoreJob();
} else if (type == JobType.RESTORE || type == JobType.RESTORE_COMPRESSED) {
job = new RestoreJob(type);
} else {
throw new IOException("Unknown job type: " + type.name());
}
Expand Down
53 changes: 53 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,21 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class RestoreJob extends AbstractJob {
private static final String PROP_RESERVE_REPLICA = RestoreStmt.PROP_RESERVE_REPLICA;
Expand Down Expand Up @@ -195,6 +201,10 @@ public RestoreJob() {
super(JobType.RESTORE);
}

// Used by AbstractJob.read() so that a job persisted under either RESTORE or
// RESTORE_COMPRESSED can be instantiated with the tag it was serialized with
// before readFields() consumes the rest of the stream.
public RestoreJob(JobType jobType) {
    super(jobType);
}

public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo, boolean allowLoad,
ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica,
boolean reserveDynamicPartitionEnable, boolean isBeingSynced, boolean isCleanTables,
Expand Down Expand Up @@ -2421,8 +2431,31 @@ public static RestoreJob read(DataInput in) throws IOException {

/**
 * Serializes this job. When Config.restore_job_compressed_serialization is on,
 * the header is written with type RESTORE_COMPRESSED and the job body is
 * gzip-compressed into a single length-prefixed Text blob; otherwise the body
 * is written uncompressed, as older FE versions expect.
 *
 * @param out destination stream (edit log)
 * @throws IOException if writing the header or the (compressed) body fails
 */
@Override
public void write(DataOutput out) throws IOException {
    if (Config.restore_job_compressed_serialization) {
        // Advertise the compressed format in the header written by super.write(),
        // then restore the in-memory type. The try/finally guarantees the field
        // is reset even if super.write() throws, so a failed edit-log write
        // cannot leave this job permanently reporting RESTORE_COMPRESSED.
        type = JobType.RESTORE_COMPRESSED;
        try {
            super.write(out);
        } finally {
            type = JobType.RESTORE;
        }

        // Gzip the job body into an in-memory buffer, then persist it as a Text blob.
        ByteArrayOutputStream bytesStream = new ByteArrayOutputStream();
        try (GZIPOutputStream gzipStream = new GZIPOutputStream(bytesStream)) {
            try (DataOutputStream stream = new DataOutputStream(gzipStream)) {
                writeOthers(stream);
                stream.flush();
            }
        }
        Text text = new Text(bytesStream.toByteArray());
        // Always surface unexpectedly large jobs (> 100 MB) even without debug logging.
        if (LOG.isDebugEnabled() || text.getLength() > (100 << 20)) {
            LOG.info("restore job serialized size {}", text.getLength());
        }
        text.write(out);
    } else {
        super.write(out);
        writeOthers(out);
    }
}

private void writeOthers(DataOutput out) throws IOException {
Text.writeString(out, backupTimestamp);
jobInfo.write(out);
out.writeBoolean(allowLoad);
Expand Down Expand Up @@ -2495,7 +2528,27 @@ public void write(DataOutput out) throws IOException {
/**
 * Deserializes this job. If the header (consumed by super.readFields) carried
 * the RESTORE_COMPRESSED tag, the body is a length-prefixed Text blob holding
 * gzip-compressed data; otherwise the body is read directly from the stream.
 *
 * @param in source stream (edit log)
 * @throws IOException if reading or decompressing the body fails
 */
@Override
public void readFields(DataInput in) throws IOException {
    super.readFields(in);

    if (type != JobType.RESTORE_COMPRESSED) {
        readOthers(in);
        return;
    }

    // Normalize: in memory the job is always a plain RESTORE job.
    type = JobType.RESTORE;

    Text compressed = new Text();
    compressed.readFields(in);
    // Always surface unexpectedly large blobs (> 100 MB) even without debug logging.
    if (LOG.isDebugEnabled() || compressed.getLength() > (100 << 20)) {
        LOG.info("read restore job compressed size {}", compressed.getLength());
    }

    // The gzip stream stops at the end of the compressed payload, so any slack
    // in the backing array returned by getBytes() is ignored.
    ByteArrayInputStream byteInput = new ByteArrayInputStream(compressed.getBytes());
    try (GZIPInputStream gunzip = new GZIPInputStream(byteInput);
            DataInputStream dataInput = new DataInputStream(gunzip)) {
        readOthers(dataInput);
    }
}

private void readOthers(DataInput in) throws IOException {
backupTimestamp = Text.readString(in);
jobInfo = BackupJobInfo.read(in);
allowLoad = in.readBoolean();
Expand Down
Loading