Skip to content

Commit

Permalink
[improve](restore) Reduce backup/restore job log size by compressing the serialized job body
Browse files Browse the repository at this point in the history
ref #42459
  • Loading branch information
w41ter committed Oct 25, 2024
1 parent be56e40 commit 13090c8
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 5 deletions.
16 changes: 16 additions & 0 deletions fe/fe-common/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -1482,6 +1482,22 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static int max_backup_restore_job_num_per_db = 10;

/**
 * An internal config that reduces the serialized size of a restore job by
 * gzip-compressing its body during edit-log serialization (see RestoreJob.write/readFields).
 *
 * WARNING: Once this option is enabled and a restore is performed, the FE version cannot be rolled back,
 * because older FE versions cannot deserialize the RESTORE_COMPRESSED job format.
 */
@ConfField(mutable = false)
public static boolean restore_job_compressed_serialization = false;

/**
 * An internal config that reduces the serialized size of a backup job by
 * gzip-compressing its body during edit-log serialization (see BackupJob.write/readFields).
 *
 * WARNING: Once this option is enabled and a backup is performed, the FE version cannot be rolled back,
 * because older FE versions cannot deserialize the BACKUP_COMPRESSED job format.
 */
@ConfField(mutable = false)
public static boolean backup_job_compressed_serialization = false;

/**
* Control the max num of tablets per backup job involved.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
public abstract class AbstractJob implements Writable {

public enum JobType {
    // BACKUP_COMPRESSED / RESTORE_COMPRESSED mark jobs whose serialized body is
    // gzip-compressed on disk; in memory they are normalized back to BACKUP/RESTORE.
    // NOTE: the diff rendering contained both the pre- and post-change value lists;
    // only the post-change list is valid.
    BACKUP, RESTORE, BACKUP_COMPRESSED, RESTORE_COMPRESSED
}

protected JobType type;
Expand Down Expand Up @@ -158,10 +158,10 @@ public void setTypeRead(boolean isTypeRead) {
public static AbstractJob read(DataInput in) throws IOException {
AbstractJob job = null;
JobType type = JobType.valueOf(Text.readString(in));
if (type == JobType.BACKUP) {
job = new BackupJob();
} else if (type == JobType.RESTORE) {
job = new RestoreJob();
if (type == JobType.BACKUP || type == JobType.BACKUP_COMPRESSED) {
job = new BackupJob(type);
} else if (type == JobType.RESTORE || type == JobType.RESTORE_COMPRESSED) {
job = new RestoreJob(type);
} else {
throw new IOException("Unknown job type: " + type.name());
}
Expand Down
54 changes: 54 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,11 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.FileVisitOption;
Expand All @@ -74,6 +77,8 @@
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;


public class BackupJob extends AbstractJob {
Expand Down Expand Up @@ -124,6 +129,11 @@ public BackupJob() {
super(JobType.BACKUP);
}

// Deserialization-only constructor: AbstractJob.read() supplies the on-disk job
// type, which may be the gzip-compressed variant (BACKUP_COMPRESSED).
public BackupJob(JobType jobType) {
    super(jobType);
    // NOTE(review): `assert` is a no-op unless the JVM runs with -ea; an
    // unexpected type is otherwise only caught later during readFields().
    assert jobType == JobType.BACKUP || jobType == JobType.BACKUP_COMPRESSED;
}

public BackupJob(String label, long dbId, String dbName, List<TableRef> tableRefs, long timeoutMs,
BackupContent content, Env env, long repoId) {
super(JobType.BACKUP, label, dbId, dbName, timeoutMs, env, repoId);
Expand Down Expand Up @@ -1001,8 +1011,32 @@ public static BackupJob read(DataInput in) throws IOException {

/**
 * Serializes this backup job. When Config.backup_job_compressed_serialization is
 * enabled, the persisted type header is BACKUP_COMPRESSED and the job body is
 * gzip-compressed into a single Text blob so readFields() knows to decompress it.
 */
@Override
public void write(DataOutput out) throws IOException {
    if (Config.backup_job_compressed_serialization) {
        type = JobType.BACKUP_COMPRESSED;
    }
    super.write(out);
    if (Config.backup_job_compressed_serialization) {
        // Restore the in-memory type; only the on-disk header says COMPRESSED.
        type = JobType.BACKUP;

        int written = 0;
        ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
        try (GZIPOutputStream gzipStream = new GZIPOutputStream(byteStream)) {
            try (DataOutputStream stream = new DataOutputStream(gzipStream)) {
                // BUG FIX: the body must be written into the compressing stream,
                // not directly into `out` — the original `writeOthers(out)` would
                // emit the body uncompressed AND append an empty gzip blob,
                // corrupting the on-disk format (mirrors RestoreJob.write).
                writeOthers(stream);
                written = stream.size();
            }
        }
        Text text = new Text(byteStream.toByteArray());
        // BUG FIX: isDebugEnable() -> isDebugEnabled() (log4j Logger API).
        if (LOG.isDebugEnabled() || text.getLength() > (50 << 20)) {
            LOG.info("backup job written size {}, compressed size {}", written, text.getLength());
        }
        text.write(out);
    } else {
        writeOthers(out);
    }
}

public void writeOthers(DataOutput out) throws IOException {
// table refs
out.writeInt(tableRefs.size());
for (TableRef tblRef : tableRefs) {
Expand Down Expand Up @@ -1057,7 +1091,27 @@ public void write(DataOutput out) throws IOException {

/**
 * Deserializes this backup job. A BACKUP_COMPRESSED on-disk type means the body
 * was written as a gzip-compressed Text blob (see write()); it is inflated here
 * and the in-memory type is normalized back to BACKUP.
 */
public void readFields(DataInput in) throws IOException {
    super.readFields(in);
    if (type == JobType.BACKUP_COMPRESSED) {
        type = JobType.BACKUP;

        Text text = new Text();
        text.readFields(in);
        // BUG FIX: isDebugEnable() -> isDebugEnabled() (log4j Logger API).
        if (LOG.isDebugEnabled() || text.getLength() > (50 << 20)) {
            LOG.info("read backup job, compressed size {}", text.getLength());
        }

        // BUG FIX: ByteArrayInputStream has no no-arg constructor; it must wrap
        // the compressed bytes read above (mirrors RestoreJob.readFields).
        ByteArrayInputStream byteStream = new ByteArrayInputStream(text.getBytes());
        try (GZIPInputStream gzipStream = new GZIPInputStream(byteStream)) {
            try (DataInputStream stream = new DataInputStream(gzipStream)) {
                readOthers(stream);
            }
        }
    } else {
        readOthers(in);
    }
}

public void readOthers(DataInput in) throws IOException {
// table refs
int size = in.readInt();
tableRefs = Lists.newArrayList();
Expand Down
56 changes: 56 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,21 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class RestoreJob extends AbstractJob {
private static final String PROP_RESERVE_REPLICA = RestoreStmt.PROP_RESERVE_REPLICA;
Expand Down Expand Up @@ -195,6 +201,11 @@ public RestoreJob() {
super(JobType.RESTORE);
}

// Deserialization-only constructor: AbstractJob.read() supplies the on-disk job
// type, which may be the gzip-compressed variant (RESTORE_COMPRESSED).
public RestoreJob(JobType jobType) {
    super(jobType);
    // NOTE(review): `assert` is a no-op unless the JVM runs with -ea; an
    // unexpected type is otherwise only caught later during readFields().
    assert jobType == JobType.RESTORE || jobType == JobType.RESTORE_COMPRESSED;
}

public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo, boolean allowLoad,
ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica,
boolean reserveDynamicPartitionEnable, boolean isBeingSynced, boolean isCleanTables,
Expand Down Expand Up @@ -2421,8 +2432,32 @@ public static RestoreJob read(DataInput in) throws IOException {

/**
 * Serializes this restore job. With Config.restore_job_compressed_serialization
 * enabled, the on-disk type header is RESTORE_COMPRESSED and the job body is
 * gzip-compressed into a single Text blob; the in-memory type stays RESTORE.
 */
@Override
public void write(DataOutput out) throws IOException {
    // Tag the job as compressed before the header is serialized by super.write().
    if (Config.restore_job_compressed_serialization) {
        type = JobType.RESTORE_COMPRESSED;
    }
    super.write(out);
    if (!Config.restore_job_compressed_serialization) {
        writeOthers(out);
        return;
    }
    // Normalize the in-memory type; only the persisted header says COMPRESSED.
    type = JobType.RESTORE;

    int rawSize = 0;
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try (GZIPOutputStream gzip = new GZIPOutputStream(buffer);
            DataOutputStream dataOut = new DataOutputStream(gzip)) {
        writeOthers(dataOut);
        rawSize = dataOut.size();
    }
    Text compressed = new Text(buffer.toByteArray());
    // Log when debugging, or unconditionally once the blob exceeds 50 MiB.
    if (LOG.isDebugEnabled() || compressed.getLength() > (50 << 20)) {
        LOG.info("restore job written size {}, compressed size {}", rawSize, compressed.getLength());
    }
    compressed.write(out);
}

private void writeOthers(DataOutput out) throws IOException {
Text.writeString(out, backupTimestamp);
jobInfo.write(out);
out.writeBoolean(allowLoad);
Expand Down Expand Up @@ -2496,6 +2531,27 @@ public void write(DataOutput out) throws IOException {
/**
 * Deserializes this restore job. A RESTORE_COMPRESSED on-disk type means the
 * body was written as a gzip-compressed Text blob (see write()); it is inflated
 * here and the in-memory type is normalized back to RESTORE.
 */
public void readFields(DataInput in) throws IOException {
    super.readFields(in);

    if (type != JobType.RESTORE_COMPRESSED) {
        readOthers(in);
        return;
    }
    // Normalize the in-memory type; COMPRESSED only describes the wire format.
    type = JobType.RESTORE;

    Text compressed = new Text();
    compressed.readFields(in);
    // Log when debugging, or unconditionally once the blob exceeds 50 MiB.
    if (LOG.isDebugEnabled() || compressed.getLength() > (50 << 20)) {
        LOG.info("read restore job, compressed size {}", compressed.getLength());
    }

    ByteArrayInputStream buffer = new ByteArrayInputStream(compressed.getBytes());
    try (GZIPInputStream gzip = new GZIPInputStream(buffer);
            DataInputStream dataIn = new DataInputStream(gzip)) {
        readOthers(dataIn);
    }
}

private void readOthers(DataInput in) throws IOException {
backupTimestamp = Text.readString(in);
jobInfo = BackupJobInfo.read(in);
allowLoad = in.readBoolean();
Expand Down

0 comments on commit 13090c8

Please sign in to comment.