Batch backup/restore agent tasks per RPC.

Adds the FE config backup_restore_batch_task_num_per_rpc (default 10000), passes it to the
AgentBatchTask instances built by BackupJob and RestoreJob, and makes AgentBatchTask.run()
submit each backend's requests in chunks of at most that many per submitTasks() call.

NOTE(review): the raw types in the BackupJob/RestoreJob context lines (List, Multimap,
Map.Entry, Cell) look like they lost their generic arguments during extraction, and the
post-image blob hash on the AgentBatchTask.java index line predates the constructor change
below; confirm both against the target tree before applying.

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 509a266365c7ca..8aedeb091f346a 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2418,6 +2418,13 @@ public class Config extends ConfigBase {
     })
     public static int restore_download_task_num_per_be = 3;
 
+    @ConfField(mutable = true, masterOnly = true, description = {
+            "备份恢复过程中,单次 RPC 分配给每个be的任务最大个数,默认值为10000个。",
+            "The max number of batched tasks per RPC assigned to each be during the backup/restore process, "
+                    + "the default value is 10000."
+    })
+    public static int backup_restore_batch_task_num_per_rpc = 10000;
+
     @ConfField(description = {"是否开启通过http接口获取log文件的功能",
             "Whether to enable the function of getting log files through http interface"})
     public static boolean enable_get_log_file_api = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
index 9ee4f6d0603ea4..4315ad8ee4cab9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -209,11 +209,10 @@ private synchronized boolean tryNewTabletSnapshotTask(SnapshotTask task) {
                 task.getIndexId(), task.getTabletId(),
                 task.getVersion(),
                 task.getSchemaHash(), timeoutMs, false /* not restore task */);
-        AgentBatchTask batchTask = new AgentBatchTask();
-        batchTask.addTask(newTask);
         unfinishedTaskIds.put(tablet.getId(), replica.getBackendId());
 
         //send task
+        AgentBatchTask batchTask = new AgentBatchTask(newTask);
         AgentTaskQueue.addTask(newTask);
         AgentTaskExecutor.submit(batchTask);
 
@@ -447,7 +446,7 @@ private void prepareAndSendSnapshotTask() {
         // copy all related schema at this moment
         List copiedTables = Lists.newArrayList();
         List copiedResources = Lists.newArrayList();
-        AgentBatchTask batchTask = new AgentBatchTask();
+        AgentBatchTask batchTask = new AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
         for (TableRef tableRef : tableRefs) {
             String tblName = tableRef.getName().getTbl();
             Table tbl = db.getTableNullable(tblName);
@@ -695,7 +694,7 @@ private void uploadSnapshot() {
             beToSnapshots.put(info.getBeId(), info);
         }
 
-        AgentBatchTask batchTask = new AgentBatchTask();
+        AgentBatchTask batchTask = new AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
         for (Long beId : beToSnapshots.keySet()) {
             List infos = beToSnapshots.get(beId);
             int totalNum = infos.size();
@@ -851,7 +850,7 @@ private void releaseSnapshots() {
         }
         // we do not care about the release snapshot tasks' success or failure,
         // the GC thread on BE will sweep the snapshot, finally.
-        AgentBatchTask batchTask = new AgentBatchTask();
+        AgentBatchTask batchTask = new AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
         for (SnapshotInfo info : snapshotInfos.values()) {
             ReleaseSnapshotTask releaseTask = new ReleaseSnapshotTask(null, info.getBeId(), info.getDbId(),
                     info.getTabletId(), info.getPath());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 173af1c25f343c..fb96fdbc0bc138 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -859,7 +859,7 @@ private void checkAndPrepareMeta() {
 
                             AgentBatchTask batchTask = batchTaskPerTable.get(localTbl.getId());
                             if (batchTask == null) {
-                                batchTask = new AgentBatchTask();
+                                batchTask = new AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
                                 batchTaskPerTable.put(localTbl.getId(), batchTask);
                             }
                             createReplicas(db, batchTask, localTbl, restorePart);
@@ -875,7 +875,7 @@ private void checkAndPrepareMeta() {
                     for (Partition restorePart : restoreOlapTable.getPartitions()) {
                         AgentBatchTask batchTask = batchTaskPerTable.get(restoreTbl.getId());
                         if (batchTask == null) {
-                            batchTask = new AgentBatchTask();
+                            batchTask = new AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
                             batchTaskPerTable.put(restoreTbl.getId(), batchTask);
                         }
                         createReplicas(db, batchTask, restoreOlapTable, restorePart, tabletBases);
@@ -1128,7 +1128,7 @@ private void prepareAndSendSnapshotTaskForOlapTable(Database db) {
         taskProgress.clear();
         taskErrMsg.clear();
         Multimap bePathsMap = HashMultimap.create();
-        AgentBatchTask batchTask = new AgentBatchTask();
+        AgentBatchTask batchTask = new AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
         db.readLock();
         try {
             for (Map.Entry entry : fileMapping.getMapping().entrySet()) {
@@ -1592,7 +1592,7 @@ private void downloadRemoteSnapshots() {
         unfinishedSignatureToId.clear();
         taskProgress.clear();
         taskErrMsg.clear();
-        AgentBatchTask batchTask = new AgentBatchTask();
+        AgentBatchTask batchTask = new AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
         for (long dbId : dbToSnapshotInfos.keySet()) {
             List infos = dbToSnapshotInfos.get(dbId);
 
@@ -1745,7 +1745,7 @@ private void downloadLocalSnapshots() {
         unfinishedSignatureToId.clear();
         taskProgress.clear();
         taskErrMsg.clear();
-        AgentBatchTask batchTask = new AgentBatchTask();
+        AgentBatchTask batchTask = new AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
         for (long dbId : dbToSnapshotInfos.keySet()) {
             List infos = dbToSnapshotInfos.get(dbId);
 
@@ -1922,7 +1922,7 @@ private void commit() {
         unfinishedSignatureToId.clear();
         taskProgress.clear();
         taskErrMsg.clear();
-        AgentBatchTask batchTask = new AgentBatchTask();
+        AgentBatchTask batchTask = new AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
         // tablet id->(be id -> download info)
         for (Cell cell : snapshotInfos.cellSet()) {
             SnapshotInfo info = cell.getValue();
@@ -2111,7 +2111,7 @@ private void releaseSnapshots() {
         }
         // we do not care about the release snapshot tasks' success or failure,
         // the GC thread on BE will sweep the snapshot, finally.
-        AgentBatchTask batchTask = new AgentBatchTask();
+        AgentBatchTask batchTask = new AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
         for (SnapshotInfo info : snapshotInfos.values()) {
             ReleaseSnapshotTask releaseTask = new ReleaseSnapshotTask(null, info.getBeId(), info.getDbId(),
                     info.getTabletId(), info.getPath());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
index 848211f941356e..b1839400d319d3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
@@ -51,6 +51,7 @@
 import com.google.common.collect.Lists;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TException;
 
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -63,6 +64,9 @@
 public class AgentBatchTask implements Runnable {
     private static final Logger LOG = LogManager.getLogger(AgentBatchTask.class);
 
+    // max requests per submitTasks() RPC; the default never triggers a mid-loop flush in run()
+    private int batchSize = Integer.MAX_VALUE;
+
     // backendId -> AgentTask List
     private Map<Long, List<AgentTask>> backendIdToTasks;
 
@@ -70,6 +74,22 @@ public AgentBatchTask() {
         this.backendIdToTasks = new HashMap<Long, List<AgentTask>>();
     }
 
+    /**
+     * Creates a batch task that sends at most {@code batchSize} requests per submitTasks() RPC.
+     *
+     * @param batchSize max requests per RPC; a non-positive value keeps the unbatched default
+     */
+    public AgentBatchTask(int batchSize) {
+        this.backendIdToTasks = new HashMap<Long, List<AgentTask>>();
+        // Assertions are disabled unless the JVM runs with -ea, so check explicitly: a
+        // non-positive size would make run() flush after every single request.
+        if (batchSize > 0) {
+            this.batchSize = batchSize;
+        } else {
+            LOG.warn("invalid batch size {}, tasks will not be split into batches", batchSize);
+        }
+    }
+
     public AgentBatchTask(AgentTask singleTask) {
         this();
         addTask(singleTask);
@@ -168,14 +188,12 @@ public void run() {
                 List<TAgentTaskRequest> agentTaskRequests = new LinkedList<TAgentTaskRequest>();
                 for (AgentTask task : tasks) {
                     agentTaskRequests.add(toAgentTaskRequest(task));
-                }
-                client.submitTasks(agentTaskRequests);
-                if (LOG.isDebugEnabled()) {
-                    for (AgentTask task : tasks) {
-                        LOG.debug("send task: type[{}], backend[{}], signature[{}]",
-                                task.getTaskType(), backendId, task.getSignature());
+                    if (agentTaskRequests.size() >= batchSize) {
+                        submitTasks(backendId, client, agentTaskRequests);
+                        agentTaskRequests.clear();
                     }
                 }
+                submitTasks(backendId, client, agentTaskRequests);
                 ok = true;
             } catch (Exception e) {
                 LOG.warn("task exec error. backend[{}]", backendId, e);
@@ -194,6 +212,22 @@ public void run() {
         } // end for backend
     }
 
+    /**
+     * Sends one batch of requests to a backend in a single RPC; an empty batch sends nothing.
+     */
+    private static void submitTasks(long backendId,
+            BackendService.Client client, List<TAgentTaskRequest> agentTaskRequests) throws TException {
+        if (!agentTaskRequests.isEmpty()) {
+            client.submitTasks(agentTaskRequests);
+        }
+        if (LOG.isDebugEnabled()) {
+            for (TAgentTaskRequest req : agentTaskRequests) {
+                LOG.debug("send task: type[{}], backend[{}], signature[{}]",
+                        req.getTaskType(), backendId, req.getSignature());
+            }
+        }
+    }
+
     private TAgentTaskRequest toAgentTaskRequest(AgentTask task) {
         TAgentTaskRequest tAgentTaskRequest = new TAgentTaskRequest();
         tAgentTaskRequest.setProtocolVersion(TAgentServiceVersion.V1);