Commit

fix
924060929 committed Nov 29, 2024
1 parent 51ad321 commit 0f18510
Showing 7 changed files with 142 additions and 79 deletions.
118 changes: 118 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/AbstractJobProcessor.java
@@ -0,0 +1,118 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.qe;

import org.apache.doris.common.Status;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.qe.runtime.BackendFragmentId;
import org.apache.doris.qe.runtime.MultiFragmentsPipelineTask;
import org.apache.doris.qe.runtime.PipelineExecutionTask;
import org.apache.doris.qe.runtime.SingleFragmentPipelineTask;
import org.apache.doris.thrift.TReportExecStatusParams;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;

/** Base class for {@link JobProcessor}: shared task registration and exec-status report handling. */
public abstract class AbstractJobProcessor implements JobProcessor {
    private final Logger logger = LogManager.getLogger(getClass());

    protected final CoordinatorContext coordinatorContext;
    protected volatile Optional<PipelineExecutionTask> executionTask;
    protected volatile Optional<Map<BackendFragmentId, SingleFragmentPipelineTask>> backendFragmentTasks;

    public AbstractJobProcessor(CoordinatorContext coordinatorContext) {
        this.coordinatorContext = Objects.requireNonNull(coordinatorContext, "coordinatorContext can not be null");
        this.executionTask = Optional.empty();
        this.backendFragmentTasks = Optional.empty();
    }

    protected abstract void doProcessReportExecStatus(
            TReportExecStatusParams params, SingleFragmentPipelineTask fragmentTask);

    @Override
    public final void setPipelineExecutionTask(PipelineExecutionTask pipelineExecutionTask) {
        Preconditions.checkArgument(pipelineExecutionTask != null, "pipelineExecutionTask can not be null");

        this.executionTask = Optional.of(pipelineExecutionTask);
        Map<BackendFragmentId, SingleFragmentPipelineTask> backendFragmentTasks
                = buildBackendFragmentTasks(pipelineExecutionTask);
        this.backendFragmentTasks = Optional.of(backendFragmentTasks);

        afterSetPipelineExecutionTask(pipelineExecutionTask);
    }

    protected void afterSetPipelineExecutionTask(PipelineExecutionTask pipelineExecutionTask) {}

    @Override
    public final void updateFragmentExecStatus(TReportExecStatusParams params) {
        SingleFragmentPipelineTask fragmentTask = backendFragmentTasks.get().get(
                new BackendFragmentId(params.getBackendId(), params.getFragmentId()));
        if (fragmentTask == null || !fragmentTask.processReportExecStatus(params)) {
            return;
        }

        TUniqueId queryId = coordinatorContext.queryId;
        Status status = new Status(params.status);
        // For now, abort the query on any error, unless the error is a cancellation
        // and returned_all_results_ is true.
        // (updateStatusIfOk() initiates cancellation if it has not already been initiated.)
        if (!status.ok()) {
            if (coordinatorContext.isEos() && status.isCancelled()) {
                logger.warn("Query {} has returned all results, fragment_id={} instance_id={}, be={}"
                                + " is reporting failed status {}",
                        DebugUtil.printId(queryId), params.getFragmentId(),
                        DebugUtil.printId(params.getFragmentInstanceId()),
                        params.getBackendId(),
                        status.toString());
            } else {
                logger.warn("one instance reported a failure, query_id={} fragment_id={} instance_id={}, be={},"
                                + " error message: {}",
                        DebugUtil.printId(queryId), params.getFragmentId(),
                        DebugUtil.printId(params.getFragmentInstanceId()),
                        params.getBackendId(), status.toString());
                coordinatorContext.updateStatusIfOk(status);
            }
        }
        doProcessReportExecStatus(params, fragmentTask);
    }

    private Map<BackendFragmentId, SingleFragmentPipelineTask> buildBackendFragmentTasks(
            PipelineExecutionTask executionTask) {
        ImmutableMap.Builder<BackendFragmentId, SingleFragmentPipelineTask> backendFragmentTasks
                = ImmutableMap.builder();
        for (Entry<Long, MultiFragmentsPipelineTask> backendTask : executionTask.getChildrenTasks().entrySet()) {
            Long backendId = backendTask.getKey();
            for (Entry<Integer, SingleFragmentPipelineTask> fragmentIdToTask : backendTask.getValue()
                    .getChildrenTasks().entrySet()) {
                Integer fragmentId = fragmentIdToTask.getKey();
                SingleFragmentPipelineTask fragmentTask = fragmentIdToTask.getValue();
                backendFragmentTasks.put(new BackendFragmentId(backendId, fragmentId), fragmentTask);
            }
        }
        return backendFragmentTasks.build();
    }
}
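Taken together, the new class is a template method over both concrete processors: setPipelineExecutionTask does the shared bookkeeping (null-check, store the task, index every fragment by BackendFragmentId), updateFragmentExecStatus does the shared report handling (deduplication via processReportExecStatus, error logging, updateStatusIfOk), and subclasses plug in through the two hooks. A minimal sketch of what a subclass must supply; LoggingJobProcessor and its method bodies are illustrative only, not part of this commit (imports as in AbstractJobProcessor.java above):

// Illustrative subclass: the two hooks, plus the cancel() still required by JobProcessor.
public class LoggingJobProcessor extends AbstractJobProcessor {
    public LoggingJobProcessor(CoordinatorContext coordinatorContext) {
        super(coordinatorContext); // base class null-checks and stores the context
    }

    @Override
    protected void afterSetPipelineExecutionTask(PipelineExecutionTask task) {
        // backendFragmentTasks is already populated when this hook runs
    }

    @Override
    protected void doProcessReportExecStatus(TReportExecStatusParams params,
            SingleFragmentPipelineTask fragmentTask) {
        // reached only for non-duplicate reports, after the shared status handling
    }

    @Override
    public void cancel(Status cancelReason) {
        executionTask.ifPresent(task -> {
            for (MultiFragmentsPipelineTask child : task.getChildrenTasks().values()) {
                child.cancelExecute(cancelReason);
            }
        });
    }
}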

JobProcessor.java
@@ -19,9 +19,12 @@
 
 import org.apache.doris.common.Status;
 import org.apache.doris.qe.runtime.PipelineExecutionTask;
+import org.apache.doris.thrift.TReportExecStatusParams;
 
 public interface JobProcessor {
-    void setSqlPipelineTask(PipelineExecutionTask pipelineExecutionTask);
+    void setPipelineExecutionTask(PipelineExecutionTask pipelineExecutionTask);
 
     void cancel(Status cancelReason);
+
+    void updateFragmentExecStatus(TReportExecStatusParams params);
 }
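Both interface changes line up with the base class above: setSqlPipelineTask is renamed to setPipelineExecutionTask to match the PipelineExecutionTask type, and updateFragmentExecStatus is promoted into the interface so callers can dispatch without downcasting; the coordinator hunk below drops its instanceof LoadProcessor special case for exactly this reason. A one-line illustration (the handler method is assumed for illustration, not part of this commit):

// Illustrative call site: uniform for load and query jobs alike.
void onReportExecStatus(CoordinatorContext ctx, TReportExecStatusParams params) {
    ctx.getJobProcessor().updateFragmentExecStatus(params); // virtual dispatch, no instanceof
}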

@@ -233,10 +233,7 @@ public boolean isDone() {
 
     @Override
     public void updateFragmentExecStatus(TReportExecStatusParams params) {
-        JobProcessor jobProcessor = coordinatorContext.getJobProcessor();
-        if (jobProcessor instanceof LoadProcessor) {
-            coordinatorContext.asLoadProcessor().updateFragmentExecStatus(params);
-        }
+        coordinatorContext.getJobProcessor().updateFragmentExecStatus(params);
     }
 
     @Override

LoadProcessor.java
@@ -24,31 +24,26 @@
 import org.apache.doris.datasource.hive.HMSTransaction;
 import org.apache.doris.datasource.iceberg.IcebergTransaction;
 import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.qe.AbstractJobProcessor;
 import org.apache.doris.qe.CoordinatorContext;
-import org.apache.doris.qe.JobProcessor;
 import org.apache.doris.qe.LoadContext;
 import org.apache.doris.thrift.TFragmentInstanceReport;
 import org.apache.doris.thrift.TReportExecStatusParams;
 import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.thrift.TUniqueId;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
-public class LoadProcessor implements JobProcessor {
+public class LoadProcessor extends AbstractJobProcessor {
     private static final Logger LOG = LogManager.getLogger(LoadProcessor.class);
 
-    public final CoordinatorContext coordinatorContext;
     public final LoadContext loadContext;
     public final long jobId;
 
@@ -61,7 +56,8 @@ public class LoadProcessor implements JobProcessor {
     private volatile List<SingleFragmentPipelineTask> topFragmentTasks;
 
     public LoadProcessor(CoordinatorContext coordinatorContext, long jobId) {
-        this.coordinatorContext = Objects.requireNonNull(coordinatorContext, "coordinatorContext can not be null");
+        super(coordinatorContext);
+
         this.loadContext = new LoadContext();
         this.executionTask = Optional.empty();
         this.latch = Optional.empty();
@@ -87,14 +83,8 @@ public LoadProcessor(CoordinatorContext coordinatorContext, long jobId) {
     }
 
     @Override
-    public void setSqlPipelineTask(PipelineExecutionTask pipelineExecutionTask) {
-        Preconditions.checkArgument(pipelineExecutionTask != null, "sqlPipelineTask can not be null");
-
-        this.executionTask = Optional.of(pipelineExecutionTask);
-        Map<BackendFragmentId, SingleFragmentPipelineTask> backendFragmentTasks
-                = buildBackendFragmentTasks(pipelineExecutionTask);
-        this.backendFragmentTasks = Optional.of(backendFragmentTasks);
-
+    protected void afterSetPipelineExecutionTask(PipelineExecutionTask pipelineExecutionTask) {
+        Map<BackendFragmentId, SingleFragmentPipelineTask> backendFragmentTasks = this.backendFragmentTasks.get();
         MarkedCountDownLatch<Integer, Long> latch = new MarkedCountDownLatch<>(backendFragmentTasks.size());
         for (BackendFragmentId backendFragmentId : backendFragmentTasks.keySet()) {
            latch.addMark(backendFragmentId.fragmentId, backendFragmentId.backendId);
@@ -168,34 +158,9 @@ public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
         return latch.get().await(timeout, unit);
     }
 
-    public void updateFragmentExecStatus(TReportExecStatusParams params) {
-        SingleFragmentPipelineTask fragmentTask = backendFragmentTasks.get().get(
-                new BackendFragmentId(params.getBackendId(), params.getFragmentId()));
-        if (fragmentTask == null || !fragmentTask.processReportExecStatus(params)) {
-            return;
-        }
-        TUniqueId queryId = coordinatorContext.queryId;
-        Status status = new Status(params.status);
-        // for now, abort the query if we see any error except if the error is cancelled
-        // and returned_all_results_ is true.
-        // (UpdateStatus() initiates cancellation, if it hasn't already been initiated)
-        if (!status.ok()) {
-            if (coordinatorContext.isEos() && status.isCancelled()) {
-                LOG.warn("Query {} has returned all results, fragment_id={} instance_id={}, be={}"
-                        + " is reporting failed status {}",
-                        DebugUtil.printId(queryId), params.getFragmentId(),
-                        DebugUtil.printId(params.getFragmentInstanceId()),
-                        params.getBackendId(),
-                        status.toString());
-            } else {
-                LOG.warn("one instance report fail, query_id={} fragment_id={} instance_id={}, be={},"
-                        + " error message: {}",
-                        DebugUtil.printId(queryId), params.getFragmentId(),
-                        DebugUtil.printId(params.getFragmentInstanceId()),
-                        params.getBackendId(), status.toString());
-                coordinatorContext.updateStatusIfOk(status);
-            }
-        }
-
+    @Override
+    protected void doProcessReportExecStatus(TReportExecStatusParams params, SingleFragmentPipelineTask fragmentTask) {
         LoadContext loadContext = coordinatorContext.asLoadProcessor().loadContext;
         if (params.isSetDeltaUrls()) {
             loadContext.updateDeltaUrls(params.getDeltaUrls());
@@ -234,7 +199,7 @@ public void updateFragmentExecStatus(TReportExecStatusParams params) {
         if (fragmentTask.isDone()) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Query {} fragment {} is marked done",
-                        DebugUtil.printId(queryId), params.getFragmentId());
+                        DebugUtil.printId(coordinatorContext.queryId), params.getFragmentId());
             }
             latch.get().markedCountDown(params.getFragmentId(), params.getBackendId());
         }
@@ -258,22 +223,6 @@ public void updateFragmentExecStatus(TReportExecStatusParams params) {
         }
     }
 
-    private Map<BackendFragmentId, SingleFragmentPipelineTask> buildBackendFragmentTasks(
-            PipelineExecutionTask executionTask) {
-        ImmutableMap.Builder<BackendFragmentId, SingleFragmentPipelineTask> backendFragmentTasks
-                = ImmutableMap.builder();
-        for (Entry<Long, MultiFragmentsPipelineTask> backendTask : executionTask.getChildrenTasks().entrySet()) {
-            Long backendId = backendTask.getKey();
-            for (Entry<Integer, SingleFragmentPipelineTask> fragmentIdToTask : backendTask.getValue()
-                    .getChildrenTasks().entrySet()) {
-                Integer fragmentId = fragmentIdToTask.getKey();
-                SingleFragmentPipelineTask fragmentTask = fragmentIdToTask.getValue();
-                backendFragmentTasks.put(new BackendFragmentId(backendId, fragmentId), fragmentTask);
-            }
-        }
-        return backendFragmentTasks.build();
-    }
-
     /*
      * Check the state of backends in needCheckBackendExecStates.
      * return true if all of them are OK. Otherwise, return false.

PipelineExecutionTask.java
@@ -102,7 +102,7 @@ public void execute() throws Exception {
 
     @Override
     public String toString() {
-        return "SqlPipelineTask(\n"
+        return "PipelineExecutionTask(\n"
                 + childrenTasks.allTasks()
                         .stream()
                         .map(multiFragmentsPipelineTask -> " " + multiFragmentsPipelineTask)

@@ -61,7 +61,7 @@ private PipelineExecutionTask buildTask(CoordinatorContext coordinatorContext,
                 backendServiceProxy,
                 buildMultiFragmentTasks(coordinatorContext, backendServiceProxy, workerToFragmentsParam)
         );
-        coordinatorContext.getJobProcessor().setSqlPipelineTask(pipelineExecutionTask);
+        coordinatorContext.getJobProcessor().setPipelineExecutionTask(pipelineExecutionTask);
         return pipelineExecutionTask;
     }

QueryProcessor.java
@@ -25,13 +25,14 @@
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJob;
 import org.apache.doris.planner.DataSink;
 import org.apache.doris.planner.ResultSink;
+import org.apache.doris.qe.AbstractJobProcessor;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.CoordinatorContext;
-import org.apache.doris.qe.JobProcessor;
 import org.apache.doris.qe.ResultReceiver;
 import org.apache.doris.qe.RowBatch;
 import org.apache.doris.rpc.RpcException;
 import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TReportExecStatusParams;
 import org.apache.doris.thrift.TStatusCode;
 
 import com.google.common.base.Strings;
@@ -44,33 +45,28 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.concurrent.CopyOnWriteArrayList;
 
-public class QueryProcessor implements JobProcessor {
+public class QueryProcessor extends AbstractJobProcessor {
     private static final Logger LOG = LogManager.getLogger(QueryProcessor.class);
 
     // constant fields
     private final long limitRows;
 
     // mutable field
-    private Optional<PipelineExecutionTask> sqlPipelineTask;
-    private final CoordinatorContext coordinatorContext;
     private final List<ResultReceiver> runningReceivers;
     private int receiverOffset;
     private long numReceivedRows;
 
     public QueryProcessor(CoordinatorContext coordinatorContext, List<ResultReceiver> runningReceivers) {
-        this.coordinatorContext = Objects.requireNonNull(coordinatorContext, "coordinatorContext can not be null");
+        super(coordinatorContext);
         this.runningReceivers = new CopyOnWriteArrayList<>(
                 Objects.requireNonNull(runningReceivers, "runningReceivers can not be null")
         );
 
         this.limitRows = coordinatorContext.fragments.get(coordinatorContext.fragments.size() - 1)
                 .getPlanRoot()
                 .getLimit();
-
-        this.sqlPipelineTask = Optional.empty();
     }
 
     public static QueryProcessor build(CoordinatorContext coordinatorContext) {
@@ -109,8 +105,8 @@ public static QueryProcessor build(CoordinatorContext coordinatorContext) {
     }
 
     @Override
-    public void setSqlPipelineTask(PipelineExecutionTask pipelineExecutionTask) {
-        this.sqlPipelineTask = Optional.ofNullable(pipelineExecutionTask);
+    protected void doProcessReportExecStatus(TReportExecStatusParams params, SingleFragmentPipelineTask fragmentTask) {
+
     }
 
     public boolean isEos() {
@@ -178,7 +174,7 @@ public void cancel(Status cancelReason) {
             receiver.cancel(cancelReason);
         }
 
-        this.sqlPipelineTask.ifPresent(sqlPipelineTask -> {
+        this.executionTask.ifPresent(sqlPipelineTask -> {
             for (MultiFragmentsPipelineTask fragmentsTask : sqlPipelineTask.getChildrenTasks().values()) {
                 fragmentsTask.cancelExecute(cancelReason);
             }
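QueryProcessor now inherits executionTask from AbstractJobProcessor instead of keeping its own sqlPipelineTask Optional, and its doProcessReportExecStatus body is empty because the query path needs nothing beyond the shared status handling. A sketch of the lifecycle this refactor standardizes; the driver method is assumed for illustration, only the JobProcessor calls come from this diff:

// Illustrative wiring, assuming a CoordinatorContext whose JobProcessor is already chosen.
static void runAndCancel(CoordinatorContext ctx, PipelineExecutionTask task,
        TReportExecStatusParams report, Status reason) {
    JobProcessor processor = ctx.getJobProcessor();
    processor.setPipelineExecutionTask(task);    // base class indexes (backendId, fragmentId) pairs
    processor.updateFragmentExecStatus(report);  // shared dedup and error handling, then subclass hook
    processor.cancel(reason);                    // fans out to each backend's MultiFragmentsPipelineTask
}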
