Skip to content

Commit

Permalink
[fix](backup) Automatic adapt upload/download snapshot batch size (#4…
Browse files Browse the repository at this point in the history
…4560)

The original `backup_upload_task_num_per_be` and
`restore_download_task_num_per_be` would not adapt the different num of
snapshots, a large `UploadTask` or `DownloadTask` might exceed the
threshold of `thrift_max_message_size` and failed forever.

This PR changes these options to adapt the number of snapshots
automatically.
  • Loading branch information
w41ter authored and Your Name committed Nov 26, 2024
1 parent 0af123f commit d53a84b
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 42 deletions.
13 changes: 7 additions & 6 deletions fe/fe-common/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -2664,16 +2664,17 @@ public class Config extends ConfigBase {
public static String nereids_trace_log_dir = System.getenv("LOG_DIR") + "/nereids_trace";

@ConfField(mutable = true, masterOnly = true, description = {
"备份过程中,分配给每个be的upload任务最大个数,默认值为3个。",
"The max number of upload tasks assigned to each be during the backup process, the default value is 3."
"备份过程中,一个 upload 任务上传的快照数量上限,默认值为10个",
"The max number of snapshots assigned to a upload task during the backup process, the default value is 10."
})
public static int backup_upload_task_num_per_be = 3;
public static int backup_upload_snapshot_batch_size = 10;

@ConfField(mutable = true, masterOnly = true, description = {
"恢复过程中,分配给每个be的download任务最大个数,默认值为3个。",
"The max number of download tasks assigned to each be during the restore process, the default value is 3."
"恢复过程中,一个 download 任务下载的快照数量上限,默认值为10个",
"The max number of snapshots assigned to a download task during the restore process, "
+ "the default value is 10."
})
public static int restore_download_task_num_per_be = 3;
public static int restore_download_snapshot_batch_size = 10;

@ConfField(mutable = true, masterOnly = true, description = {
"备份恢复过程中,单次 RPC 分配给每个be的任务最大个数,默认值为10000个。",
Expand Down
17 changes: 6 additions & 11 deletions fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -755,13 +755,10 @@ private void uploadSnapshot() {
for (Long beId : beToSnapshots.keySet()) {
List<SnapshotInfo> infos = beToSnapshots.get(beId);
int totalNum = infos.size();
int batchNum = totalNum;
if (Config.backup_upload_task_num_per_be > 0) {
batchNum = Math.min(totalNum, Config.backup_upload_task_num_per_be);
}
// each task contains several upload sub tasks
int taskNumPerBatch = Math.max(totalNum / batchNum, 1);
LOG.info("backend {} has {} batch, total {} tasks, {}", beId, batchNum, totalNum, this);
int taskNumPerBatch = Config.backup_upload_snapshot_batch_size;
LOG.info("backend {} has total {} snapshots, per task batch size {}, {}",
beId, totalNum, taskNumPerBatch, this);

List<FsBroker> brokers = Lists.newArrayList();
Status st = repo.getBrokerAddress(beId, env, brokers);
Expand All @@ -772,12 +769,10 @@ private void uploadSnapshot() {
Preconditions.checkState(brokers.size() == 1);

// allot tasks
int index = 0;
for (int batch = 0; batch < batchNum; batch++) {
for (int index = 0; index < totalNum; index += taskNumPerBatch) {
Map<String, String> srcToDest = Maps.newHashMap();
int currentBatchTaskNum = (batch == batchNum - 1) ? totalNum - index : taskNumPerBatch;
for (int j = 0; j < currentBatchTaskNum; j++) {
SnapshotInfo info = infos.get(index++);
for (int j = 0; j < taskNumPerBatch && index + j < totalNum; j++) {
SnapshotInfo info = infos.get(index + j);
String src = info.getTabletPath();
String dest = repo.getRepoTabletPathBySnapshotInfo(label, info);
if (dest == null) {
Expand Down
37 changes: 12 additions & 25 deletions fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -1652,16 +1652,10 @@ private void downloadRemoteSnapshots() {
for (Long beId : beToSnapshots.keySet()) {
List<SnapshotInfo> beSnapshotInfos = beToSnapshots.get(beId);
int totalNum = beSnapshotInfos.size();
int batchNum = totalNum;
if (Config.restore_download_task_num_per_be > 0) {
batchNum = Math.min(totalNum, Config.restore_download_task_num_per_be);
}
// each task contains several upload sub tasks
int taskNumPerBatch = Math.max(totalNum / batchNum, 1);
if (LOG.isDebugEnabled()) {
LOG.debug("backend {} has {} batch, total {} tasks, {}",
beId, batchNum, totalNum, this);
}
int taskNumPerBatch = Config.restore_download_snapshot_batch_size;
LOG.info("backend {} has total {} snapshots, per task batch size {}, {}",
beId, totalNum, taskNumPerBatch, this);

List<FsBroker> brokerAddrs = null;
brokerAddrs = Lists.newArrayList();
Expand All @@ -1673,12 +1667,10 @@ private void downloadRemoteSnapshots() {
Preconditions.checkState(brokerAddrs.size() == 1);

// allot tasks
int index = 0;
for (int batch = 0; batch < batchNum; batch++) {
for (int index = 0; index < totalNum; index += taskNumPerBatch) {
Map<String, String> srcToDest = Maps.newHashMap();
int currentBatchTaskNum = (batch == batchNum - 1) ? totalNum - index : taskNumPerBatch;
for (int j = 0; j < currentBatchTaskNum; j++) {
SnapshotInfo info = beSnapshotInfos.get(index++);
for (int j = 0; j < taskNumPerBatch && index + j < totalNum; j++) {
SnapshotInfo info = beSnapshotInfos.get(index + j);
Table tbl = db.getTableNullable(info.getTblId());
if (tbl == null) {
status = new Status(ErrCode.NOT_FOUND, "restored table "
Expand Down Expand Up @@ -1812,22 +1804,17 @@ private void downloadLocalSnapshots() {
for (Long beId : beToSnapshots.keySet()) {
List<SnapshotInfo> beSnapshotInfos = beToSnapshots.get(beId);
int totalNum = beSnapshotInfos.size();
int batchNum = totalNum;
if (Config.restore_download_task_num_per_be > 0) {
batchNum = Math.min(totalNum, Config.restore_download_task_num_per_be);
}
// each task contains several upload sub tasks
int taskNumPerBatch = Math.max(totalNum / batchNum, 1);
int taskNumPerBatch = Config.restore_download_snapshot_batch_size;
LOG.info("backend {} has total {} snapshots, per task batch size {}, {}",
beId, totalNum, taskNumPerBatch, this);

// allot tasks
int index = 0;
for (int batch = 0; batch < batchNum; batch++) {
for (int index = 0; index < totalNum; index += taskNumPerBatch) {
List<TRemoteTabletSnapshot> remoteTabletSnapshots = Lists.newArrayList();
int currentBatchTaskNum = (batch == batchNum - 1) ? totalNum - index : taskNumPerBatch;
for (int j = 0; j < currentBatchTaskNum; j++) {
for (int j = 0; j < taskNumPerBatch && index + j < totalNum; j++) {
TRemoteTabletSnapshot remoteTabletSnapshot = new TRemoteTabletSnapshot();

SnapshotInfo info = beSnapshotInfos.get(index++);
SnapshotInfo info = beSnapshotInfos.get(index + j);
Table tbl = db.getTableNullable(info.getTblId());
if (tbl == null) {
status = new Status(ErrCode.NOT_FOUND, "restored table "
Expand Down

0 comments on commit d53a84b

Please sign in to comment.