diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AbstractJobProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/AbstractJobProcessor.java
new file mode 100644
index 00000000000000..2858de25d578fc
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AbstractJobProcessor.java
@@ -0,0 +1,118 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.common.Status;
+import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.qe.runtime.BackendFragmentId;
+import org.apache.doris.qe.runtime.MultiFragmentsPipelineTask;
+import org.apache.doris.qe.runtime.PipelineExecutionTask;
+import org.apache.doris.qe.runtime.SingleFragmentPipelineTask;
+import org.apache.doris.thrift.TReportExecStatusParams;
+import org.apache.doris.thrift.TUniqueId;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+
+/** AbstractJobProcessor */
+public abstract class AbstractJobProcessor implements JobProcessor {
+    private final Logger logger = LogManager.getLogger(getClass());
+
+    protected final CoordinatorContext coordinatorContext;
+    protected volatile Optional<PipelineExecutionTask> executionTask;
+    protected volatile Optional<Map<BackendFragmentId, SingleFragmentPipelineTask>> backendFragmentTasks;
+
+    public AbstractJobProcessor(CoordinatorContext coordinatorContext) {
+        this.coordinatorContext = Objects.requireNonNull(coordinatorContext, "coordinatorContext can not be null");
+        this.executionTask = Optional.empty();
+        this.backendFragmentTasks = Optional.empty();
+    }
+
+    protected abstract void doProcessReportExecStatus(
+            TReportExecStatusParams params, SingleFragmentPipelineTask fragmentTask);
+
+    @Override
+    public final void setPipelineExecutionTask(PipelineExecutionTask pipelineExecutionTask) {
+        Preconditions.checkArgument(pipelineExecutionTask != null, "sqlPipelineTask can not be null");
+
+        this.executionTask = Optional.of(pipelineExecutionTask);
+        Map<BackendFragmentId, SingleFragmentPipelineTask> backendFragmentTasks
+                = buildBackendFragmentTasks(pipelineExecutionTask);
+        this.backendFragmentTasks = Optional.of(backendFragmentTasks);
+
+        afterSetPipelineExecutionTask(pipelineExecutionTask);
+    }
+
+    protected void afterSetPipelineExecutionTask(PipelineExecutionTask pipelineExecutionTask) {}
+
+    @Override
+    public final void updateFragmentExecStatus(TReportExecStatusParams params) {
+        SingleFragmentPipelineTask fragmentTask = backendFragmentTasks.get().get(
+                new BackendFragmentId(params.getBackendId(), params.getFragmentId()));
+        if (fragmentTask == null || !fragmentTask.processReportExecStatus(params)) {
+            return;
+        }
+
+        TUniqueId queryId = coordinatorContext.queryId;
+        Status status = new Status(params.status);
+        // for now, abort the query if we see any error except if the error is cancelled
+        // and returned_all_results_ is true.
+        // (UpdateStatus() initiates cancellation, if it hasn't already been initiated)
+        if (!status.ok()) {
+            if (coordinatorContext.isEos() && status.isCancelled()) {
+                logger.warn("Query {} has returned all results, fragment_id={} instance_id={}, be={}"
+                                + " is reporting failed status {}",
+                        DebugUtil.printId(queryId), params.getFragmentId(),
+                        DebugUtil.printId(params.getFragmentInstanceId()),
+                        params.getBackendId(),
+                        status.toString());
+            } else {
+                logger.warn("one instance report fail, query_id={} fragment_id={} instance_id={}, be={},"
+                                + " error message: {}",
+                        DebugUtil.printId(queryId), params.getFragmentId(),
+                        DebugUtil.printId(params.getFragmentInstanceId()),
+                        params.getBackendId(), status.toString());
+                coordinatorContext.updateStatusIfOk(status);
+            }
+        }
+        doProcessReportExecStatus(params, fragmentTask);
+    }
+
+    private Map<BackendFragmentId, SingleFragmentPipelineTask> buildBackendFragmentTasks(
+            PipelineExecutionTask executionTask) {
+        ImmutableMap.Builder<BackendFragmentId, SingleFragmentPipelineTask> backendFragmentTasks
+                = ImmutableMap.builder();
+        for (Entry<Long, MultiFragmentsPipelineTask> backendTask : executionTask.getChildrenTasks().entrySet()) {
+            Long backendId = backendTask.getKey();
+            for (Entry<Integer, SingleFragmentPipelineTask> fragmentIdToTask : backendTask.getValue()
+                    .getChildrenTasks().entrySet()) {
+                Integer fragmentId = fragmentIdToTask.getKey();
+                SingleFragmentPipelineTask fragmentTask = fragmentIdToTask.getValue();
+                backendFragmentTasks.put(new BackendFragmentId(backendId, fragmentId), fragmentTask);
+            }
+        }
+        return backendFragmentTasks.build();
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/JobProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/JobProcessor.java
index ede218848c7221..7e4042dde3c3bd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/JobProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/JobProcessor.java
@@ -19,9 +19,12 @@
 
 import org.apache.doris.common.Status;
 import org.apache.doris.qe.runtime.PipelineExecutionTask;
+import org.apache.doris.thrift.TReportExecStatusParams;
 
 public interface JobProcessor {
-    void setSqlPipelineTask(PipelineExecutionTask pipelineExecutionTask);
+    void setPipelineExecutionTask(PipelineExecutionTask pipelineExecutionTask);
 
     void cancel(Status cancelReason);
+
+    void updateFragmentExecStatus(TReportExecStatusParams params);
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java
index a9d6becc7fafe3..a6f24806ed74aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java
@@ -233,10 +233,7 @@ public boolean isDone() {
 
     @Override
     public void updateFragmentExecStatus(TReportExecStatusParams params) {
-        JobProcessor jobProcessor = coordinatorContext.getJobProcessor();
-        if (jobProcessor instanceof LoadProcessor) {
-            coordinatorContext.asLoadProcessor().updateFragmentExecStatus(params);
-        }
+        coordinatorContext.getJobProcessor().updateFragmentExecStatus(params);
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/LoadProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/LoadProcessor.java
index 3a448521fca0bf..fb32919d834a54 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/LoadProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/LoadProcessor.java
@@ -24,46 +24,39 @@
 import org.apache.doris.datasource.hive.HMSTransaction;
 import org.apache.doris.datasource.iceberg.IcebergTransaction;
 import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.qe.AbstractJobProcessor;
 import org.apache.doris.qe.CoordinatorContext;
-import org.apache.doris.qe.JobProcessor;
 import org.apache.doris.qe.LoadContext;
 import org.apache.doris.thrift.TFragmentInstanceReport;
 import org.apache.doris.thrift.TReportExecStatusParams;
 import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.thrift.TUniqueId;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
-public class LoadProcessor implements JobProcessor {
+public class LoadProcessor extends AbstractJobProcessor {
     private static final Logger LOG = LogManager.getLogger(LoadProcessor.class);
 
-    public final CoordinatorContext coordinatorContext;
     public final LoadContext loadContext;
     public final long jobId;
 
     // this latch is used to wait finish for load, for example, insert into statement
     // MarkedCountDownLatch:
     //   key: fragmentId, value: backendId
-    private volatile Optional<PipelineExecutionTask> executionTask;
     private volatile Optional<MarkedCountDownLatch<Integer, Long>> latch;
-    private volatile Optional<Map<BackendFragmentId, SingleFragmentPipelineTask>> backendFragmentTasks;
     private volatile List<SingleFragmentPipelineTask> topFragmentTasks;
 
     public LoadProcessor(CoordinatorContext coordinatorContext, long jobId) {
-        this.coordinatorContext = Objects.requireNonNull(coordinatorContext, "coordinatorContext can not be null");
+        super(coordinatorContext);
+
         this.loadContext = new LoadContext();
-        this.executionTask = Optional.empty();
         this.latch = Optional.empty();
         this.backendFragmentTasks = Optional.empty();
@@ -87,14 +80,8 @@ public LoadProcessor(CoordinatorContext coordinatorContext, long jobId) {
     }
 
     @Override
-    public void setSqlPipelineTask(PipelineExecutionTask pipelineExecutionTask) {
-        Preconditions.checkArgument(pipelineExecutionTask != null, "sqlPipelineTask can not be null");
-
-        this.executionTask = Optional.of(pipelineExecutionTask);
-        Map<BackendFragmentId, SingleFragmentPipelineTask> backendFragmentTasks
-                = buildBackendFragmentTasks(pipelineExecutionTask);
-        this.backendFragmentTasks = Optional.of(backendFragmentTasks);
-
+    protected void afterSetPipelineExecutionTask(PipelineExecutionTask pipelineExecutionTask) {
+        Map<BackendFragmentId, SingleFragmentPipelineTask> backendFragmentTasks = this.backendFragmentTasks.get();
         MarkedCountDownLatch<Integer, Long> latch = new MarkedCountDownLatch<>(backendFragmentTasks.size());
         for (BackendFragmentId backendFragmentId : backendFragmentTasks.keySet()) {
             latch.addMark(backendFragmentId.fragmentId, backendFragmentId.backendId);
@@ -168,34 +155,9 @@ public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
         return latch.get().await(timeout, unit);
     }
 
-    public void updateFragmentExecStatus(TReportExecStatusParams params) {
-        SingleFragmentPipelineTask fragmentTask = backendFragmentTasks.get().get(
-                new BackendFragmentId(params.getBackendId(), params.getFragmentId()));
-        if (fragmentTask == null || !fragmentTask.processReportExecStatus(params)) {
-            return;
-        }
-        TUniqueId queryId = coordinatorContext.queryId;
-        Status status = new Status(params.status);
-        // for now, abort the query if we see any error except if the error is cancelled
-        // and returned_all_results_ is true.
-        // (UpdateStatus() initiates cancellation, if it hasn't already been initiated)
-        if (!status.ok()) {
-            if (coordinatorContext.isEos() && status.isCancelled()) {
-                LOG.warn("Query {} has returned all results, fragment_id={} instance_id={}, be={}"
-                        + " is reporting failed status {}",
-                        DebugUtil.printId(queryId), params.getFragmentId(),
-                        DebugUtil.printId(params.getFragmentInstanceId()),
-                        params.getBackendId(),
-                        status.toString());
-            } else {
-                LOG.warn("one instance report fail, query_id={} fragment_id={} instance_id={}, be={},"
-                        + " error message: {}",
-                        DebugUtil.printId(queryId), params.getFragmentId(),
-                        DebugUtil.printId(params.getFragmentInstanceId()),
-                        params.getBackendId(), status.toString());
-                coordinatorContext.updateStatusIfOk(status);
-            }
-        }
+
+    @Override
+    protected void doProcessReportExecStatus(TReportExecStatusParams params, SingleFragmentPipelineTask fragmentTask) {
         LoadContext loadContext = coordinatorContext.asLoadProcessor().loadContext;
         if (params.isSetDeltaUrls()) {
             loadContext.updateDeltaUrls(params.getDeltaUrls());
@@ -234,7 +196,7 @@ public void updateFragmentExecStatus(TReportExecStatusParams params) {
         if (fragmentTask.isDone()) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Query {} fragment {} is marked done",
-                        DebugUtil.printId(queryId), params.getFragmentId());
+                        DebugUtil.printId(coordinatorContext.queryId), params.getFragmentId());
             }
             latch.get().markedCountDown(params.getFragmentId(), params.getBackendId());
         }
@@ -258,22 +220,6 @@ public void updateFragmentExecStatus(TReportExecStatusParams params) {
         }
     }
 
-    private Map<BackendFragmentId, SingleFragmentPipelineTask> buildBackendFragmentTasks(
-            PipelineExecutionTask executionTask) {
-        ImmutableMap.Builder<BackendFragmentId, SingleFragmentPipelineTask> backendFragmentTasks
-                = ImmutableMap.builder();
-        for (Entry<Long, MultiFragmentsPipelineTask> backendTask : executionTask.getChildrenTasks().entrySet()) {
-            Long backendId = backendTask.getKey();
-            for (Entry<Integer, SingleFragmentPipelineTask> fragmentIdToTask : backendTask.getValue()
-                    .getChildrenTasks().entrySet()) {
-                Integer fragmentId = fragmentIdToTask.getKey();
-                SingleFragmentPipelineTask fragmentTask = fragmentIdToTask.getValue();
-                backendFragmentTasks.put(new BackendFragmentId(backendId, fragmentId), fragmentTask);
-            }
-        }
-        return backendFragmentTasks.build();
-    }
-
     /*
      * Check the state of backends in needCheckBackendExecStates.
      * return true if all of them are OK. Otherwise, return false.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTask.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTask.java
index 8c1b9714c35db8..ae87d59d075d12 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTask.java
@@ -102,7 +102,7 @@ public void execute() throws Exception {
 
     @Override
     public String toString() {
-        return "SqlPipelineTask(\n"
+        return "PipelineExecutionTask(\n"
                 + childrenTasks.allTasks()
                 .stream()
                 .map(multiFragmentsPipelineTask -> "  " + multiFragmentsPipelineTask)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTaskBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTaskBuilder.java
index fd00bf0e3e8536..0da6f4a5fe2e43 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTaskBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTaskBuilder.java
@@ -61,7 +61,7 @@ private PipelineExecutionTask buildTask(CoordinatorContext coordinatorContext,
                 backendServiceProxy,
                 buildMultiFragmentTasks(coordinatorContext, backendServiceProxy, workerToFragmentsParam)
         );
-        coordinatorContext.getJobProcessor().setSqlPipelineTask(pipelineExecutionTask);
+        coordinatorContext.getJobProcessor().setPipelineExecutionTask(pipelineExecutionTask);
         return pipelineExecutionTask;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java
index 2ec38e8cc8e3ea..a5a5100faece1a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java
@@ -25,13 +25,14 @@
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJob;
 import org.apache.doris.planner.DataSink;
 import org.apache.doris.planner.ResultSink;
+import org.apache.doris.qe.AbstractJobProcessor;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.CoordinatorContext;
-import org.apache.doris.qe.JobProcessor;
 import org.apache.doris.qe.ResultReceiver;
 import org.apache.doris.qe.RowBatch;
 import org.apache.doris.rpc.RpcException;
 import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TReportExecStatusParams;
 import org.apache.doris.thrift.TStatusCode;
 
 import com.google.common.base.Strings;
@@ -44,24 +45,21 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.concurrent.CopyOnWriteArrayList;
 
-public class QueryProcessor implements JobProcessor {
+public class QueryProcessor extends AbstractJobProcessor {
     private static final Logger LOG = LogManager.getLogger(QueryProcessor.class);
 
     // constant fields
     private final long limitRows;
 
     // mutable field
-    private Optional<PipelineExecutionTask> sqlPipelineTask;
-    private final CoordinatorContext coordinatorContext;
     private final List<ResultReceiver> runningReceivers;
     private int receiverOffset;
     private long numReceivedRows;
 
     public QueryProcessor(CoordinatorContext coordinatorContext, List<ResultReceiver> runningReceivers) {
-        this.coordinatorContext = Objects.requireNonNull(coordinatorContext, "coordinatorContext can not be null");
+        super(coordinatorContext);
         this.runningReceivers = new CopyOnWriteArrayList<>(
                 Objects.requireNonNull(runningReceivers, "runningReceivers can not be null")
         );
@@ -69,8 +67,6 @@ public QueryProcessor(CoordinatorContext coordinatorContext, List<ResultReceiver> runningReceivers) {
-        this.sqlPipelineTask.ifPresent(sqlPipelineTask -> {
+        this.executionTask.ifPresent(sqlPipelineTask -> {
             for (MultiFragmentsPipelineTask fragmentsTask : sqlPipelineTask.getChildrenTasks().values()) {
                 fragmentsTask.cancelExecute(cancelReason);
             }
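
For context, the patch is a template-method refactor: `JobProcessor`'s two implementations (`QueryProcessor`, `LoadProcessor`) now share bookkeeping in `AbstractJobProcessor`, whose final `setPipelineExecutionTask` and `updateFragmentExecStatus` call the `afterSetPipelineExecutionTask` and `doProcessReportExecStatus` hooks, which in turn lets `NereidsCoordinator` drop its `instanceof LoadProcessor` check. The sketch below is a minimal, self-contained illustration of that shape, not code from the patch; `Report`, `BackendFragmentKey`, `FragmentTask`, and the processor names are hypothetical stand-ins for `TReportExecStatusParams`, `BackendFragmentId`, `SingleFragmentPipelineTask`, and the real processors.

```java
import java.util.Map;
import java.util.Optional;

public class JobProcessorSketch {
    // Hypothetical stand-ins for the Doris types named in the patch.
    record Report(long backendId, int fragmentId, boolean ok) {}
    record BackendFragmentKey(long backendId, int fragmentId) {}
    record FragmentTask(String name) {}

    abstract static class AbstractProcessor {
        // Shared state lives in the base class, as in AbstractJobProcessor.
        protected volatile Optional<Map<BackendFragmentKey, FragmentTask>> tasks = Optional.empty();

        // Template method: final, so every processor shares the same bookkeeping.
        public final void setTasks(Map<BackendFragmentKey, FragmentTask> tasks) {
            this.tasks = Optional.of(tasks);
            afterSetTasks();                  // subclass hook
        }

        protected void afterSetTasks() {}     // default: no-op

        // Template method: common lookup and error handling, then the hook.
        public final void updateStatus(Report report) {
            FragmentTask task = tasks.get()
                    .get(new BackendFragmentKey(report.backendId(), report.fragmentId()));
            if (task == null) {
                return;                       // unknown fragment: ignore the report
            }
            if (!report.ok()) {
                System.out.println("instance report failed, aborting");
            }
            doProcessReport(report, task);    // subclass-specific handling
        }

        protected abstract void doProcessReport(Report report, FragmentTask task);
    }

    // Load-flavoured subclass: adds latch-style bookkeeping in the hooks,
    // the way LoadProcessor overrides afterSetPipelineExecutionTask.
    static class LoadLikeProcessor extends AbstractProcessor {
        @Override
        protected void afterSetTasks() {
            System.out.println("latch initialized for " + tasks.get().size() + " tasks");
        }

        @Override
        protected void doProcessReport(Report report, FragmentTask task) {
            System.out.println("count down " + task.name());
        }
    }

    public static void main(String[] args) {
        AbstractProcessor processor = new LoadLikeProcessor();
        processor.setTasks(Map.of(new BackendFragmentKey(1L, 0), new FragmentTask("scan")));
        processor.updateStatus(new Report(1L, 0, true));
    }
}
```

Making both entry points final in the base class forces every processor through one report path, which is why the coordinator-side type dispatch above could be deleted.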