Skip to content

Commit

Permalink
[improve](restore) Split creating replica task by table id (#42239)
Browse files (browse the repository at this point in the history)
pick #42235
  • Loading branch information
w41ter authored Oct 23, 2024
1 parent a21bb80 commit c1bc061
Showing 1 changed file with 26 additions and 10 deletions.
36 changes (26 additions & 10 deletions): fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
Original file line number | Diff line number | Diff line change
Expand Up @@ -611,7 +611,7 @@ private void checkAndPrepareMeta() {
Map<Long, TabletRef> tabletBases = new HashMap<>();

// Check and prepare meta objects.
AgentBatchTask batchTask = new AgentBatchTask();
Map<Long, AgentBatchTask> batchTaskPerTable = new HashMap<>();
db.readLock();
try {
for (Map.Entry<String, BackupOlapTableInfo> olapTableEntry : jobInfo.backupOlapTableObjects.entrySet()) {
Expand Down Expand Up @@ -846,6 +846,11 @@ private void checkAndPrepareMeta() {
BackupPartitionInfo backupPartitionInfo
= jobInfo.getOlapTableInfo(entry.first).getPartInfo(restorePart.getName());

AgentBatchTask batchTask = batchTaskPerTable.get(localTbl.getId());
if (batchTask == null) {
batchTask = new AgentBatchTask();
batchTaskPerTable.put(localTbl.getId(), batchTask);
}
createReplicas(db, batchTask, localTbl, restorePart);

genFileMapping(localTbl, restorePart, remoteTbl.getId(), backupPartitionInfo,
Expand All @@ -857,6 +862,11 @@ private void checkAndPrepareMeta() {
if (restoreTbl.getType() == TableType.OLAP) {
OlapTable restoreOlapTable = (OlapTable) restoreTbl;
for (Partition restorePart : restoreOlapTable.getPartitions()) {
AgentBatchTask batchTask = batchTaskPerTable.get(restoreTbl.getId());
if (batchTask == null) {
batchTask = new AgentBatchTask();
batchTaskPerTable.put(restoreTbl.getId(), batchTask);
}
createReplicas(db, batchTask, restoreOlapTable, restorePart, tabletBases);
BackupOlapTableInfo backupOlapTableInfo = jobInfo.getOlapTableInfo(restoreOlapTable.getName());
genFileMapping(restoreOlapTable, restorePart, backupOlapTableInfo.id,
Expand Down Expand Up @@ -890,20 +900,26 @@ private void checkAndPrepareMeta() {

// Send create replica task to BE outside the db lock
boolean ok = false;
MarkedCountDownLatch<Long, Long> latch = new MarkedCountDownLatch<Long, Long>(batchTask.getTaskNum());
if (batchTask.getTaskNum() > 0) {
for (AgentTask task : batchTask.getAllTasks()) {
latch.addMark(task.getBackendId(), task.getTabletId());
((CreateReplicaTask) task).setLatch(latch);
AgentTaskQueue.addTask(task);
int numBatchTasks = batchTaskPerTable.values()
.stream()
.mapToInt(AgentBatchTask::getTaskNum)
.sum();
MarkedCountDownLatch<Long, Long> latch = new MarkedCountDownLatch<Long, Long>(numBatchTasks);
if (batchTaskPerTable.size() > 0) {
for (AgentBatchTask batchTask : batchTaskPerTable.values()) {
for (AgentTask task : batchTask.getAllTasks()) {
latch.addMark(task.getBackendId(), task.getTabletId());
((CreateReplicaTask) task).setLatch(latch);
AgentTaskQueue.addTask(task);
}
AgentTaskExecutor.submit(batchTask);
}
AgentTaskExecutor.submit(batchTask);

// estimate timeout
long timeout = DbUtil.getCreateReplicasTimeoutMs(batchTask.getTaskNum());
long timeout = DbUtil.getCreateReplicasTimeoutMs(numBatchTasks);
try {
LOG.info("begin to send create replica tasks to BE for restore. total {} tasks. timeout: {}",
batchTask.getTaskNum(), timeout);
numBatchTasks, timeout);
ok = latch.await(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.warn("InterruptedException: ", e);
Expand Down

0 comments on commit c1bc061

Please sign in to comment.