Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve](task) Support splitting agent batch tasks automatically #42703 #42987

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2418,6 +2418,13 @@ public class Config extends ConfigBase {
})
public static int restore_download_task_num_per_be = 3;

// Upper bound on how many agent tasks are batched into a single RPC to one backend
// during backup/restore. The backup/restore code passes this value to
// AgentBatchTask(int batchSize) when it builds its batch tasks.
// Declared mutable and masterOnly; the default is 10000.
@ConfField(mutable = true, masterOnly = true, description = {
"备份恢复过程中,单次 RPC 分配给每个be的任务最大个数,默认值为10000个。",
"The max number of batched tasks per RPC assigned to each be during the backup/restore process, "
+ "the default value is 10000."
})
public static int backup_restore_batch_task_num_per_rpc = 10000;

// Switch for fetching log files through the HTTP interface (per the description below).
// It defaults to false.
@ConfField(description = {"是否开启通过http接口获取log文件的功能",
"Whether to enable the function of getting log files through http interface"})
public static boolean enable_get_log_file_api = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,11 +209,10 @@ private synchronized boolean tryNewTabletSnapshotTask(SnapshotTask task) {
task.getIndexId(), task.getTabletId(),
task.getVersion(),
task.getSchemaHash(), timeoutMs, false /* not restore task */);
AgentBatchTask batchTask = new AgentBatchTask();
batchTask.addTask(newTask);
unfinishedTaskIds.put(tablet.getId(), replica.getBackendId());

//send task
AgentBatchTask batchTask = new AgentBatchTask(newTask);
AgentTaskQueue.addTask(newTask);
AgentTaskExecutor.submit(batchTask);

Expand Down Expand Up @@ -447,7 +446,7 @@ private void prepareAndSendSnapshotTask() {
// copy all related schema at this moment
List<Table> copiedTables = Lists.newArrayList();
List<Resource> copiedResources = Lists.newArrayList();
AgentBatchTask batchTask = new AgentBatchTask();
AgentBatchTask batchTask = new AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
for (TableRef tableRef : tableRefs) {
String tblName = tableRef.getName().getTbl();
Table tbl = db.getTableNullable(tblName);
Expand Down Expand Up @@ -695,7 +694,7 @@ private void uploadSnapshot() {
beToSnapshots.put(info.getBeId(), info);
}

AgentBatchTask batchTask = new AgentBatchTask();
AgentBatchTask batchTask = new AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
for (Long beId : beToSnapshots.keySet()) {
List<SnapshotInfo> infos = beToSnapshots.get(beId);
int totalNum = infos.size();
Expand Down Expand Up @@ -851,7 +850,7 @@ private void releaseSnapshots() {
}
// we do not care about the release snapshot tasks' success or failure,
// the GC thread on BE will sweep the snapshot, finally.
AgentBatchTask batchTask = new AgentBatchTask();
AgentBatchTask batchTask = new AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
for (SnapshotInfo info : snapshotInfos.values()) {
ReleaseSnapshotTask releaseTask = new ReleaseSnapshotTask(null, info.getBeId(), info.getDbId(),
info.getTabletId(), info.getPath());
Expand Down
14 changes: 7 additions & 7 deletions fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -859,7 +859,7 @@ private void checkAndPrepareMeta() {

AgentBatchTask batchTask = batchTaskPerTable.get(localTbl.getId());
if (batchTask == null) {
batchTask = new AgentBatchTask();
batchTask = new AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
batchTaskPerTable.put(localTbl.getId(), batchTask);
}
createReplicas(db, batchTask, localTbl, restorePart);
Expand All @@ -875,7 +875,7 @@ private void checkAndPrepareMeta() {
for (Partition restorePart : restoreOlapTable.getPartitions()) {
AgentBatchTask batchTask = batchTaskPerTable.get(restoreTbl.getId());
if (batchTask == null) {
batchTask = new AgentBatchTask();
batchTask = new AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
batchTaskPerTable.put(restoreTbl.getId(), batchTask);
}
createReplicas(db, batchTask, restoreOlapTable, restorePart, tabletBases);
Expand Down Expand Up @@ -1128,7 +1128,7 @@ private void prepareAndSendSnapshotTaskForOlapTable(Database db) {
taskProgress.clear();
taskErrMsg.clear();
Multimap<Long, Long> bePathsMap = HashMultimap.create();
AgentBatchTask batchTask = new AgentBatchTask();
AgentBatchTask batchTask = new AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
db.readLock();
try {
for (Map.Entry<IdChain, IdChain> entry : fileMapping.getMapping().entrySet()) {
Expand Down Expand Up @@ -1592,7 +1592,7 @@ private void downloadRemoteSnapshots() {
unfinishedSignatureToId.clear();
taskProgress.clear();
taskErrMsg.clear();
AgentBatchTask batchTask = new AgentBatchTask();
AgentBatchTask batchTask = new AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
for (long dbId : dbToSnapshotInfos.keySet()) {
List<SnapshotInfo> infos = dbToSnapshotInfos.get(dbId);

Expand Down Expand Up @@ -1745,7 +1745,7 @@ private void downloadLocalSnapshots() {
unfinishedSignatureToId.clear();
taskProgress.clear();
taskErrMsg.clear();
AgentBatchTask batchTask = new AgentBatchTask();
AgentBatchTask batchTask = new AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
for (long dbId : dbToSnapshotInfos.keySet()) {
List<SnapshotInfo> infos = dbToSnapshotInfos.get(dbId);

Expand Down Expand Up @@ -1922,7 +1922,7 @@ private void commit() {
unfinishedSignatureToId.clear();
taskProgress.clear();
taskErrMsg.clear();
AgentBatchTask batchTask = new AgentBatchTask();
AgentBatchTask batchTask = new AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
// tablet id->(be id -> download info)
for (Cell<Long, Long, SnapshotInfo> cell : snapshotInfos.cellSet()) {
SnapshotInfo info = cell.getValue();
Expand Down Expand Up @@ -2111,7 +2111,7 @@ private void releaseSnapshots() {
}
// we do not care about the release snapshot tasks' success or failure,
// the GC thread on BE will sweep the snapshot, finally.
AgentBatchTask batchTask = new AgentBatchTask();
AgentBatchTask batchTask = new AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
for (SnapshotInfo info : snapshotInfos.values()) {
ReleaseSnapshotTask releaseTask = new ReleaseSnapshotTask(null, info.getBeId(), info.getDbId(),
info.getTabletId(), info.getPath());
Expand Down
32 changes: 26 additions & 6 deletions fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;

import java.util.HashMap;
import java.util.LinkedList;
Expand All @@ -63,13 +64,21 @@
public class AgentBatchTask implements Runnable {
private static final Logger LOG = LogManager.getLogger(AgentBatchTask.class);

// Maximum number of task requests sent to a backend in one submitTasks call; run() flushes
// the pending requests once their count reaches this value. The default, Integer.MAX_VALUE,
// means a backend's tasks are normally sent in a single call.
private int batchSize = Integer.MAX_VALUE;

// backendId -> AgentTask List
private Map<Long, List<AgentTask>> backendIdToTasks;

/**
 * Creates an empty batch: no backend has any task registered yet, and the batch size
 * keeps its field default.
 */
public AgentBatchTask() {
    backendIdToTasks = new HashMap<>();
}

/**
 * Creates an empty batch whose tasks are sent to each backend in chunks of at most
 * {@code batchSize} requests.
 *
 * <p>The positivity check is a plain {@code assert}, so it only fires when the JVM runs
 * with assertions enabled. With assertions disabled a non-positive value is accepted
 * as-is, and the {@code size() >= batchSize} flush check in run() then holds after every
 * added request.
 *
 * @param batchSize maximum number of task requests per submit call; expected to be positive
 */
public AgentBatchTask(int batchSize) {
    // Reuse the no-arg constructor for the map setup.
    this();
    assert batchSize > 0;
    this.batchSize = batchSize;
}

public AgentBatchTask(AgentTask singleTask) {
this();
addTask(singleTask);
Expand Down Expand Up @@ -168,14 +177,12 @@ public void run() {
List<TAgentTaskRequest> agentTaskRequests = new LinkedList<TAgentTaskRequest>();
for (AgentTask task : tasks) {
agentTaskRequests.add(toAgentTaskRequest(task));
}
client.submitTasks(agentTaskRequests);
if (LOG.isDebugEnabled()) {
for (AgentTask task : tasks) {
LOG.debug("send task: type[{}], backend[{}], signature[{}]",
task.getTaskType(), backendId, task.getSignature());
if (agentTaskRequests.size() >= batchSize) {
submitTasks(backendId, client, agentTaskRequests);
agentTaskRequests.clear();
}
}
submitTasks(backendId, client, agentTaskRequests);
ok = true;
} catch (Exception e) {
LOG.warn("task exec error. backend[{}]", backendId, e);
Expand All @@ -194,6 +201,19 @@ public void run() {
} // end for backend
}

/**
 * Sends one batch of task requests to a backend through the given client and, when debug
 * logging is enabled, writes one log line per request in the batch.
 *
 * <p>An empty batch is a no-op: nothing is sent and nothing is logged.
 *
 * @param backendId id of the target backend; used only in the debug log lines
 * @param client thrift client used to submit the batch
 * @param agentTaskRequests requests to send in a single {@code submitTasks} call
 * @throws TException if the client call fails
 */
private static void submitTasks(long backendId,
        BackendService.Client client, List<TAgentTaskRequest> agentTaskRequests) throws TException {
    if (agentTaskRequests.isEmpty()) {
        return;
    }
    client.submitTasks(agentTaskRequests);
    if (!LOG.isDebugEnabled()) {
        return;
    }
    // Log only after the call returned, so a failed submit produces no "send task" lines.
    for (TAgentTaskRequest request : agentTaskRequests) {
        LOG.debug("send task: type[{}], backend[{}], signature[{}]",
                request.getTaskType(), backendId, request.getSignature());
    }
}

private TAgentTaskRequest toAgentTaskRequest(AgentTask task) {
TAgentTaskRequest tAgentTaskRequest = new TAgentTaskRequest();
tAgentTaskRequest.setProtocolVersion(TAgentServiceVersion.V1);
Expand Down
Loading